accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mjw...@apache.org
Subject accumulo git commit: ACCUMULO-4157 Bug fix for removing WALs too quickly
Date Wed, 08 Jun 2016 13:30:23 GMT
Repository: accumulo
Updated Branches:
  refs/heads/1.6 beb69cdfc -> e0426c51c


ACCUMULO-4157 Bug fix for removing WALs too quickly

Keep track of the first time a tserver is seen down and only remove WALs for that server once the
configured threshold has passed

Trying to keep the changes small to fix the bug.  I'll create another ticket to refactor and
clean up

Includes an end to end test calling the collect method simulating a dead tserver.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e0426c51
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e0426c51
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e0426c51

Branch: refs/heads/1.6
Commit: e0426c51cd6991741e9be321aaa1e4f5361e0e3e
Parents: beb69cd
Author: Michael Wall <mjwall@apache.org>
Authored: Wed Jun 8 08:06:27 2016 -0400
Committer: Michael Wall <mjwall@apache.org>
Committed: Wed Jun 8 08:07:47 2016 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../apache/accumulo/core/conf/PropertyTest.java |   5 +
 .../gc/GarbageCollectWriteAheadLogs.java        | 308 ++++++++++++++----
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 320 ++++++++++++++++++-
 4 files changed, 567 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 2149ad9..5fff17f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -305,6 +305,8 @@ public enum Property {
   GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads
used to delete files"),
   GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash,
even if it is configured"),
   GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories
instead of moving to the HDFS trash or deleting"),
+  GC_WAL_DEAD_SERVER_WAIT("gc.wal.dead.server.wait", "1h", PropertyType.TIMEDURATION,
+      "Time to wait after a tserver is first seen as dead before removing associated WAL
files"),
 
   // properties that are specific to the monitor server behavior
   MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect
the behavior of the monitor web server."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
index bca2e22..4d1dc70 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
@@ -147,4 +147,9 @@ public class PropertyTest {
       }
     }
   }
+
+  @Test
+  public void testGCDeadServerWaitSecond() {
+    assertEquals("1h", Property.GC_WAL_DEAD_SERVER_WAIT.getDefaultValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 06ace49..b7d8d92 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.gc;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -55,12 +56,16 @@ import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.net.HostAndPort;
+import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.conf.Property;
 
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
 
   private final Instance instance;
   private final VolumeManager fs;
+  private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+  private AccumuloConfiguration config;
 
   private boolean useTrash;
 
@@ -107,6 +112,15 @@ public class GarbageCollectWriteAheadLogs {
     return useTrash;
   }
 
+  /**
+   * Removes all the WAL files that are no longer used.
+   * <p>
+   *
+   * This method is not thread-safe. SimpleGarbageCollector#run does not invoke collect in
a concurrent manner.
+   *
+   * @param status
+   *          GCStatus object
+   */
   public void collect(GCStatus status) {
 
     Span span = Trace.start("scanServers");
@@ -170,76 +184,202 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
-  private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>>
serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
-    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(instance);
+  private AccumuloConfiguration getConfig() {
+    return ServerConfiguration.getSystemConfiguration(instance);
+  }
+
+  /**
+   * Top level method for removing WAL files.
+   * <p>
+   * Loops over all the gathered WAL and sortedWAL entries and calls the appropriate methods
for removal
+   *
+   * @param nameToFileMap
+   *          Map of filename to Path
+   * @param serverToFileMap
+   *          Map of HostAndPort string to a list of Paths
+   * @param sortedWALogs
+   *          Map of sorted WAL names to Path
+   * @param status
+   *          GCStatus object for tracking what is done
+   * @return 0 always
+   */
+  @VisibleForTesting
+  int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>>
serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+    // TODO: remove nameToFileMap from method signature, not used here I don't think
+    AccumuloConfiguration conf = getConfig();
     for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
       if (entry.getKey().isEmpty()) {
-        // old-style log entry, just remove it
-        for (Path path : entry.getValue()) {
-          log.debug("Removing old-style WAL " + path);
-          try {
-            if (!useTrash || !fs.moveToTrash(path))
-              fs.deleteRecursively(path);
-            status.currentLog.deleted++;
-          } catch (FileNotFoundException ex) {
-            // ignored
-          } catch (IOException ex) {
-            log.error("Unable to delete wal " + path + ": " + ex);
-          }
-        }
+        removeOldStyleWAL(entry, status);
       } else {
-        HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
-        if (!holdsLock(address)) {
-          for (Path path : entry.getValue()) {
-            log.debug("Removing WAL for offline server " + path);
-            try {
-              if (!useTrash || !fs.moveToTrash(path))
-                fs.deleteRecursively(path);
-              status.currentLog.deleted++;
-            } catch (FileNotFoundException ex) {
-              // ignored
-            } catch (IOException ex) {
-              log.error("Unable to delete wal " + path + ": " + ex);
-            }
-          }
-          continue;
-        } else {
-          Client tserver = null;
-          try {
-            tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address,
conf);
-            tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance),
paths2strings(entry.getValue()));
-            log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
-            status.currentLog.deleted += entry.getValue().size();
-          } catch (TException e) {
-            log.warn("Error talking to " + address + ": " + e);
-          } finally {
-            if (tserver != null)
-              ThriftUtil.returnClient(tserver);
-          }
-        }
+        removeWALFile(entry, conf, status);
       }
     }
-
     for (Path swalog : sortedWALogs.values()) {
-      log.debug("Removing sorted WAL " + swalog);
+      removeSortedWAL(swalog);
+    }
+    return 0;
+  }
+
+  /**
+   * Removes sortedWALs.
+   * <p>
+   * Sorted WALs are WALs that are in the recovery directory and have already been used.
+   *
+   * @param swalog
+   *          Path to the WAL
+   */
+  @VisibleForTesting
+  void removeSortedWAL(Path swalog) {
+    log.debug("Removing sorted WAL " + swalog);
+    try {
+      if (!useTrash || !fs.moveToTrash(swalog)) {
+        fs.deleteRecursively(swalog);
+      }
+    } catch (FileNotFoundException ex) {
+      // ignored
+    } catch (IOException ioe) {
       try {
-        if (!useTrash || !fs.moveToTrash(swalog)) {
-          fs.deleteRecursively(swalog);
+        if (fs.exists(swalog)) {
+          log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
         }
-      } catch (FileNotFoundException ex) {
-        // ignored
-      } catch (IOException ioe) {
+      } catch (IOException ex) {
+        log.error("Unable to check for the existence of " + swalog, ex);
+      }
+    }
+  }
+
+  /**
+   * A wrapper method to check if the tserver using the WAL is still alive
+   * <p>
+   * Delegates the deletion to #removeWALfromDownTserver if the ZK lock is gone, or to #askTserverToRemoveWAL
if the server is known to still be alive
+   *
+   * @param entry
+   *          WAL information gathered
+   * @param conf
+   *          AccumuloConfiguration object
+   * @param status
+   *          GCStatus object
+   */
+  void removeWALFile(Entry<String,ArrayList<Path>> entry, AccumuloConfiguration
conf, final GCStatus status) {
+    HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
+    if (!holdsLock(address)) {
+      removeWALfromDownTserver(address, conf, entry, status);
+    } else {
+      askTserverToRemoveWAL(address, conf, entry, status);
+    }
+  }
+
+  /**
+   * Asks a currently running tserver to remove its WALs.
+   * <p>
+   * A tserver has more information about whether a WAL is still being used for current mutations.
It is safer to ask the tserver to remove the file instead of
+   * just relying on information in the metadata table.
+   *
+   * @param address
+   *          HostAndPort of the tserver
+   * @param conf
+   *          AccumuloConfiguration entry
+   * @param entry
+   *          WAL information gathered
+   * @param status
+   *          GCStatus object
+   */
+  @VisibleForTesting
+  void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>>
entry, final GCStatus status) {
+    firstSeenDead.remove(address);
+    Client tserver = null;
+    try {
+      tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance),
paths2strings(entry.getValue()));
+      log.debug("asked tserver to delete " + entry.getValue() + " from " + entry.getKey());
+      status.currentLog.deleted += entry.getValue().size();
+    } catch (TException e) {
+      log.warn("Error talking to " + address + ": " + e);
+    } finally {
+      if (tserver != null)
+        ThriftUtil.returnClient(tserver);
+    }
+  }
+
+  /**
+   * Get the configured wait period a server has to be dead.
+   * <p>
+   * The property is "gc.wal.dead.server.wait" defined in Property.GC_WAL_DEAD_SERVER_WAIT
and is a duration. Valid values include a unit with no space like
+   * 3600s, 5m or 2h.
+   *
+   * @param conf
+   *          AccumuloConfiguration
+   * @return long that represents the millis to wait
+   */
+  @VisibleForTesting
+  long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+    return conf.getTimeInMillis(Property.GC_WAL_DEAD_SERVER_WAIT);
+  }
+
+  /**
+   * Remove walogs associated with a tserver that no longer has a lock.
+   * <p>
+   * There is a configuration option, see #getGCWALDeadServerWaitTime, that defines how long
a server must be "dead" before removing the associated write ahead
+   * log files. The intent is to ensure that recovery succeeds for the tablets that were hosted
on that tserver.
+   *
+   * @param address
+   *          HostAndPort of the tserver with no lock
+   * @param conf
+   *          AccumuloConfiguration to get that gc.wal.dead.server.wait info
+   * @param entry
+   *          The WALOG path
+   * @param status
+   *          GCStatus for tracking changes
+   */
+  @VisibleForTesting
+  void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>>
entry, final GCStatus status) {
+    // tserver is down, only delete once configured time has passed
+    if (timeToDelete(address, getGCWALDeadServerWaitTime(conf))) {
+      for (Path path : entry.getValue()) {
+        log.debug("Removing WAL for offline server " + address + " at " + path);
         try {
-          if (fs.exists(swalog)) {
-            log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
+          if (!useTrash || !fs.moveToTrash(path)) {
+            fs.deleteRecursively(path);
           }
+          status.currentLog.deleted++;
+        } catch (FileNotFoundException ex) {
+          // ignored
         } catch (IOException ex) {
-          log.error("Unable to check for the existence of " + swalog, ex);
+          log.error("Unable to delete wal " + path + ": " + ex);
         }
       }
+      firstSeenDead.remove(address);
+    } else {
+      log.debug("Not removing " + entry.getValue().size() + " WAL(s) for offline server since
it has not be long enough: " + address);
     }
+  }
 
-    return 0;
+  /**
+   * Removes old style WAL entries.
+   * <p>
+   * The format for storing WAL info in the metadata table changed at some point, maybe the
1.5 release. Once that is known for sure and we no longer support
+   * upgrading from that version, this code should be removed
+   *
+   * @param entry
+   *          Map of empty server address to List of Paths
+   * @param status
+   *          GCStatus object
+   */
+  @VisibleForTesting
+  void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus
status) {
+    // old-style log entry, just remove it
+    for (Path path : entry.getValue()) {
+      log.debug("Removing old-style WAL " + path);
+      try {
+        if (!useTrash || !fs.moveToTrash(path))
+          fs.deleteRecursively(path);
+        status.currentLog.deleted++;
+      } catch (FileNotFoundException ex) {
+        // ignored
+      } catch (IOException ex) {
+        log.error("Unable to delete wal " + path + ": " + ex);
+      }
+    }
   }
 
   /**
@@ -281,7 +421,8 @@ public class GarbageCollectWriteAheadLogs {
     return result;
   }
 
-  private int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path>
sortedWALogs, GCStatus status) throws IOException, KeeperException,
+  @VisibleForTesting
+  int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path>
sortedWALogs, GCStatus status) throws IOException, KeeperException,
       InterruptedException {
     int count = 0;
     Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get());
@@ -307,19 +448,22 @@ public class GarbageCollectWriteAheadLogs {
     return count;
   }
 
-  private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path>
nameToFileMap) throws Exception {
+  @VisibleForTesting
+  int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap)
throws Exception {
     return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
   }
 
   // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
   @SuppressWarnings("deprecation")
   /**
-   * Scans write-ahead log directories for logs. The maps passed in are
-   * populated with scan information.
+   * Scans write-ahead log directories for logs. The maps passed in are populated with scan
information.
    *
-   * @param walDirs write-ahead log directories
-   * @param fileToServerMap map of file paths to servers
-   * @param nameToFileMap map of file names to paths
+   * @param walDirs
+   *          write-ahead log directories
+   * @param fileToServerMap
+   *          map of file paths to servers
+   * @param nameToFileMap
+   *          map of file names to paths
    * @return number of servers located (including those with no logs present)
    */
   int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path>
nameToFileMap) throws Exception {
@@ -360,7 +504,8 @@ public class GarbageCollectWriteAheadLogs {
     return servers.size();
   }
 
-  private Map<String,Path> getSortedWALogs() throws IOException {
+  @VisibleForTesting
+  Map<String,Path> getSortedWALogs() throws IOException {
     return getSortedWALogs(ServerConstants.getRecoveryDirs());
   }
 
@@ -410,4 +555,41 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
+  /**
+   * Determine if TServer has been dead long enough to remove associated WALs.
+   * <p>
+   * Uses a map where the key is the address and the value is the time first seen dead. If
the address is not in the map, it is added with the current system
+   * nanoTime and false is returned. Once the passed in wait time has elapsed, this method returns true; it does not
remove the entry from the map itself, that is left to the caller (see #removeWALfromDownTserver).
+   *
+   * @param address
+   *          HostAndPort of dead tserver
+   * @param wait
+   *          long value of elapsed millis to wait
+   * @return boolean whether enough time elapsed since the server was first seen as dead.
+   */
+  @VisibleForTesting
+  protected boolean timeToDelete(HostAndPort address, long wait) {
+    // check whether the tserver has been dead long enough
+    Long firstSeen = firstSeenDead.get(address);
+    if (firstSeen != null) {
+      long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstSeen);
+      log.trace("Elapsed milliseconds since " + address + " first seen dead: " + elapsedTime);
+      return elapsedTime > wait;
+    } else {
+      log.trace("Adding server to firstSeenDead map " + address);
+      firstSeenDead.put(address, System.nanoTime());
+      return false;
+    }
+  }
+
+  /**
+   * Method to clear the map used in timeToDelete.
+   * <p>
+   * Useful for testing.
+   */
+  @VisibleForTesting
+  void clearFirstSeenDead() {
+    firstSeenDead.clear();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 76579f8..03f5c96 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -16,27 +16,53 @@
  */
 package org.apache.accumulo.gc;
 
-import static org.easymock.EasyMock.createMock;
+import com.google.common.net.HostAndPort;
+
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+
+import static org.easymock.EasyMock.createMock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static java.lang.Thread.sleep;
+
+import java.io.FileOutputStream;
+
+import org.apache.commons.io.FileUtils;
+
+import java.util.concurrent.TimeUnit;
+
 public class GarbageCollectWriteAheadLogsTest {
   private static final long BLOCK_SIZE = 64000000L;
 
@@ -234,4 +260,288 @@ public class GarbageCollectWriteAheadLogsTest {
     assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
     assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
   }
+
+  @Test
+  public void testTimeToDeleteTrue() throws InterruptedException {
+    HostAndPort address = HostAndPort.fromString("tserver1:9998");
+    long wait = AccumuloConfiguration.getTimeInMillis("1s");
+    gcwal.clearFirstSeenDead();
+    assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address,
wait));
+    sleep(wait * 2);
+    assertTrue(gcwal.timeToDelete(address, wait));
+  }
+
+  @Test
+  public void testTimeToDeleteFalse() {
+    HostAndPort address = HostAndPort.fromString("tserver1:9998");
+    long wait = AccumuloConfiguration.getTimeInMillis("1h");
+    long t1, t2;
+    boolean ttd;
+    do {
+      t1 = System.nanoTime();
+      gcwal.clearFirstSeenDead();
+      assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address,
wait));
+      ttd = gcwal.timeToDelete(address, wait);
+      t2 = System.nanoTime();
+    } while (TimeUnit.NANOSECONDS.toMillis(t2 - t1) > (wait / 2)); // as long as it took
less than half of the configured wait
+
+    assertFalse(ttd);
+  }
+
+  @Test
+  public void testTimeToDeleteWithNullAddress() {
+    assertFalse(gcwal.timeToDelete(null, 123l));
+  }
+
+  /**
+   * Wrapper class with some helper methods
+   * <p>
+   * Just a wrapper around a LinkedHashMap that stores method name and argument information.
Also includes some convenience methods to make usage cleaner.
+   */
+  class MethodCalls {
+
+    private LinkedHashMap<String,List<Object>> mapWrapper;
+
+    public MethodCalls() {
+      mapWrapper = new LinkedHashMap<String,List<Object>>();
+    }
+
+    public void put(String methodName, Object... args) {
+      mapWrapper.put(methodName, Arrays.asList(args));
+    }
+
+    public int size() {
+      return mapWrapper.size();
+    }
+
+    public boolean hasOneEntry() {
+      return size() == 1;
+    }
+
+    public Map.Entry<String,List<Object>> getFirstEntry() {
+      return mapWrapper.entrySet().iterator().next();
+    }
+
+    public String getFirstEntryMethod() {
+      return getFirstEntry().getKey();
+    }
+
+    public List<Object> getFirstEntryArgs() {
+      return getFirstEntry().getValue();
+    }
+
+    public Object getFirstEntryArg(int number) {
+      return getFirstEntryArgs().get(number);
+    }
+  }
+
+  /**
+   * Partial mock of the GarbageCollectWriteAheadLogs for testing the removeFiles method
+   * <p>
+   * There is a map named methodCalls that can be used to assert parameters on methods called
inside the removeFiles method
+   */
+  class GCWALPartialMock extends GarbageCollectWriteAheadLogs {
+
+    private boolean holdsLockBool = false;
+
+    public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock)
throws IOException {
+      super(i, vm, useTrash);
+      this.holdsLockBool = holdLock;
+    }
+
+    public MethodCalls methodCalls = new MethodCalls();
+
+    @Override
+    boolean holdsLock(HostAndPort addr) {
+      return holdsLockBool;
+    }
+
+    @Override
+    void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>>
entry, final GCStatus status) {
+      methodCalls.put("removeWALFromDownTserver", address, conf, entry, status);
+    }
+
+    @Override
+    void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>>
entry, final GCStatus status) {
+      methodCalls.put("askTserverToRemoveWAL", address, conf, entry, status);
+    }
+
+    @Override
+    void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus
status) {
+      methodCalls.put("removeOldStyleWAL", entry, status);
+    }
+
+    @Override
+    void removeSortedWAL(Path swalog) {
+      methodCalls.put("removeSortedWAL", swalog);
+    }
+  }
+
+  private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws
IOException {
+    return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false,
locked);
+  }
+
+  private Map<String,Path> getEmptyMap() {
+    return new HashMap<String,Path>();
+  }
+
+  private Map<String,ArrayList<Path>> getServerToFileMap1(String key, Path singlePath)
{
+    Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+    serverToFileMap.put(key, new ArrayList<Path>(Arrays.asList(singlePath)));
+    return serverToFileMap;
+  }
+
+  @Test
+  public void testRemoveFilesWithOldStyle() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+    Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1("", p1);
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be removeOldStyleWAL", "removeOldStyleWAL", calls.getFirstEntryMethod());
+    Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+    assertEquals("First param should be empty", firstServerToFileMap, calls.getFirstEntryArg(0));
+    assertEquals("Second param should be the status", status, calls.getFirstEntryArg(1));
+  }
+
+  @Test
+  public void testRemoveFilesWithDeadTservers() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, false);
+    String server = "tserver1+9997";
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+    Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server,
p1);
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be removeWALfromDownTserver", "removeWALFromDownTserver",
calls.getFirstEntryMethod());
+    assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]",
":")), calls.getFirstEntryArg(0));
+    assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1)
instanceof AccumuloConfiguration);
+    Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+    assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+    assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+  }
+
+  @Test
+  public void testRemoveFilesWithLiveTservers() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+    String server = "tserver1+9997";
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+    Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server,
p1);
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be askTserverToRemoveWAL", "askTserverToRemoveWAL", calls.getFirstEntryMethod());
+    assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]",
":")), calls.getFirstEntryArg(0));
+    assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1)
instanceof AccumuloConfiguration);
+    Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+    assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+    assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+  }
+
+  @Test
+  public void testRemoveFilesRemovesSortedWALs() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+    Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+    Map<String,Path> sortedWALogs = new HashMap<String,Path>();
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+    sortedWALogs.put("junk", p1); // TODO: see if this key is actually used here, maybe can
be removed
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, sortedWALogs, status);
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be removeSortedWAL", "removeSortedWAL", calls.getFirstEntryMethod());
+    assertEquals("First param should be the Path", p1, calls.getFirstEntryArg(0));
+
+  }
+
+  static String GCWAL_DEAD_DIR = "gcwal-collect-deadtserver";
+  static String GCWAL_DEAD_TSERVER = "tserver1";
+  static String GCWAL_DEAD_TSERVER_PORT = "9995";
+  static String GCWAL_DEAD_TSERVER_COLLECT_FILE = UUID.randomUUID().toString();
+
+  class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
+
+    public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws
IOException {
+      super(i, vm, useTrash);
+    }
+
+    @Override
+    boolean holdsLock(HostAndPort addr) {
+      // tries to use zookeeper
+      return false;
+    }
+
+    @Override
+    Map<String,Path> getSortedWALogs() {
+      return new HashMap<String,Path>();
+    }
+
+    @Override
+    int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap)
throws Exception {
+      String sep = File.separator;
+      Path p = new Path(System.getProperty("user.dir") + sep + "target" + sep + GCWAL_DEAD_DIR
+ sep + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT + sep
+          + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+      fileToServerMap.put(p, GCWAL_DEAD_TSERVER + ":" + GCWAL_DEAD_TSERVER_PORT);
+      nameToFileMap.put(GCWAL_DEAD_TSERVER_COLLECT_FILE, p);
+      return 1;
+    }
+
+    @Override
+    int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path>
sortedWALogs, GCStatus status) throws IOException, KeeperException,
+        InterruptedException {
+      return 0;
+    }
+
+    long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+      // tries to use zookeeper
+      return 1000l;
+    }
+  }
+
+  @Test
+  public void testCollectWithDeadTserver() throws IOException, InterruptedException {
+    Instance i = new MockInstance();
+    File walDir = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator
+ GCWAL_DEAD_DIR);
+    File walFileDir = new File(walDir + File.separator + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT);
+    File walFile = new File(walFileDir + File.separator + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+    if (!walFileDir.exists()) {
+      walFileDir.mkdirs();
+      new FileOutputStream(walFile).close();
+    }
+
+    try {
+      VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
+      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false);
+      GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(),
new GcCycleStats());
+
+      gcwal2.collect(status);
+
+      assertTrue("File should not be deleted", walFile.exists());
+      assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+      assertEquals("Should not have deleted that file", 0, status.lastLog.getDeleted());
+
+      sleep(2000);
+      gcwal2.collect(status);
+
+      assertFalse("File should be gone", walFile.exists());
+      assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+      assertEquals("Should have deleted that file", 1, status.lastLog.getDeleted());
+
+    } finally {
+      if (walDir.exists()) {
+        FileUtils.deleteDirectory(walDir);
+      }
+    }
+  }
 }


Mime
View raw message