Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B1556200B25 for ; Wed, 8 Jun 2016 15:30:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AFC0C160A35; Wed, 8 Jun 2016 13:30:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 340F9160A0E for ; Wed, 8 Jun 2016 15:30:56 +0200 (CEST) Received: (qmail 62234 invoked by uid 500); 8 Jun 2016 13:30:55 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 62225 invoked by uid 99); 8 Jun 2016 13:30:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jun 2016 13:30:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 45FA9DFFAB; Wed, 8 Jun 2016 13:30:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mjwall@apache.org To: commits@accumulo.apache.org Date: Wed, 08 Jun 2016 13:30:55 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] accumulo git commit: ACCUMULO-4157 Bug fix for removing WALs to quickly archived-at: Wed, 08 Jun 2016 13:30:57 -0000 Repository: accumulo Updated Branches: refs/heads/1.7 0eab0ecff -> 5f02d564e ACCUMULO-4157 Bug fix for removing WALs too quickly. Keep track of the first time a tserver is seen down, and only remove WALs for that server once it has been down longer than the configured threshold. Trying to keep the changes small to fix the bug. 
I'll create another ticket to refactor and cleanup Includes an end to end test calling the collect method simulating a dead tserver. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e0426c51 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e0426c51 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e0426c51 Branch: refs/heads/1.7 Commit: e0426c51cd6991741e9be321aaa1e4f5361e0e3e Parents: beb69cd Author: Michael Wall Authored: Wed Jun 8 08:06:27 2016 -0400 Committer: Michael Wall Committed: Wed Jun 8 08:07:47 2016 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../apache/accumulo/core/conf/PropertyTest.java | 5 + .../gc/GarbageCollectWriteAheadLogs.java | 308 ++++++++++++++---- .../gc/GarbageCollectWriteAheadLogsTest.java | 320 ++++++++++++++++++- 4 files changed, 567 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 2149ad9..5fff17f 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -305,6 +305,8 @@ public enum Property { GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"), GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"), GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"), + 
GC_WAL_DEAD_SERVER_WAIT("gc.wal.dead.server.wait", "1h", PropertyType.TIMEDURATION, + "Time to wait after a tserver is first seen as dead before removing associated WAL files"), // properties that are specific to the monitor server behavior MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the monitor web server."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java index bca2e22..4d1dc70 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java @@ -147,4 +147,9 @@ public class PropertyTest { } } } + + @Test + public void testGCDeadServerWaitSecond() { + assertEquals("1h", Property.GC_WAL_DEAD_SERVER_WAIT.getDefaultValue()); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 06ace49..b7d8d92 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.gc; +import com.google.common.annotations.VisibleForTesting; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -55,12 +56,16 @@ import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import 
com.google.common.net.HostAndPort; +import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.conf.Property; public class GarbageCollectWriteAheadLogs { private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class); private final Instance instance; private final VolumeManager fs; + private final Map firstSeenDead = new HashMap(); + private AccumuloConfiguration config; private boolean useTrash; @@ -107,6 +112,15 @@ public class GarbageCollectWriteAheadLogs { return useTrash; } + /** + * Removes all the WAL files that are no longer used. + *

+ * + * This method is not Threadsafe. SimpleGarbageCollector#run does not invoke collect in a concurrent manner. + * + * @param status + * GCStatus object + */ public void collect(GCStatus status) { Span span = Trace.start("scanServers"); @@ -170,76 +184,202 @@ public class GarbageCollectWriteAheadLogs { } } - private int removeFiles(Map nameToFileMap, Map> serverToFileMap, Map sortedWALogs, final GCStatus status) { - AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(instance); + private AccumuloConfiguration getConfig() { + return ServerConfiguration.getSystemConfiguration(instance); + } + + /** + * Top level method for removing WAL files. + *

+ * Loops over all the gathered WAL and sortedWAL entries and calls the appropriate methods for removal + * + * @param nameToFileMap + * Map of filename to Path + * @param serverToFileMap + * Map of HostAndPort string to a list of Paths + * @param sortedWALogs + * Map of sorted WAL names to Path + * @param status + * GCStatus object for tracking what is done + * @return 0 always + */ + @VisibleForTesting + int removeFiles(Map nameToFileMap, Map> serverToFileMap, Map sortedWALogs, final GCStatus status) { + // TODO: remove nameToFileMap from method signature, not used here I don't think + AccumuloConfiguration conf = getConfig(); for (Entry> entry : serverToFileMap.entrySet()) { if (entry.getKey().isEmpty()) { - // old-style log entry, just remove it - for (Path path : entry.getValue()) { - log.debug("Removing old-style WAL " + path); - try { - if (!useTrash || !fs.moveToTrash(path)) - fs.deleteRecursively(path); - status.currentLog.deleted++; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal " + path + ": " + ex); - } - } + removeOldStyleWAL(entry, status); } else { - HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false); - if (!holdsLock(address)) { - for (Path path : entry.getValue()) { - log.debug("Removing WAL for offline server " + path); - try { - if (!useTrash || !fs.moveToTrash(path)) - fs.deleteRecursively(path); - status.currentLog.deleted++; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal " + path + ": " + ex); - } - } - continue; - } else { - Client tserver = null; - try { - tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue())); - log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); - status.currentLog.deleted += entry.getValue().size(); - } 
catch (TException e) { - log.warn("Error talking to " + address + ": " + e); - } finally { - if (tserver != null) - ThriftUtil.returnClient(tserver); - } - } + removeWALFile(entry, conf, status); } } - for (Path swalog : sortedWALogs.values()) { - log.debug("Removing sorted WAL " + swalog); + removeSortedWAL(swalog); + } + return 0; + } + + /** + * Removes sortedWALs. + *

+ * Sorted WALs are WALs that are in the recovery directory and have already been used. + * + * @param swalog + * Path to the WAL + */ + @VisibleForTesting + void removeSortedWAL(Path swalog) { + log.debug("Removing sorted WAL " + swalog); + try { + if (!useTrash || !fs.moveToTrash(swalog)) { + fs.deleteRecursively(swalog); + } + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ioe) { try { - if (!useTrash || !fs.moveToTrash(swalog)) { - fs.deleteRecursively(swalog); + if (fs.exists(swalog)) { + log.error("Unable to delete sorted walog " + swalog + ": " + ioe); } - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ioe) { + } catch (IOException ex) { + log.error("Unable to check for the existence of " + swalog, ex); + } + } + } + + /** + * A wrapper method to check if the tserver using the WAL is still alive + *

+ * Delegates to the deletion to #removeWALfromDownTserver if the ZK lock is gone or #askTserverToRemoveWAL if the server is known to still be alive + * + * @param entry + * WAL information gathered + * @param conf + * AccumuloConfiguration object + * @param status + * GCStatus object + */ + void removeWALFile(Entry> entry, AccumuloConfiguration conf, final GCStatus status) { + HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false); + if (!holdsLock(address)) { + removeWALfromDownTserver(address, conf, entry, status); + } else { + askTserverToRemoveWAL(address, conf, entry, status); + } + } + + /** + * Asks a currently running tserver to remove it's WALs. + *

+ * A tserver has more information about whether a WAL is still being used for current mutations. It is safer to ask the tserver to remove the file instead of + * just relying on information in the metadata table. + * + * @param address + * HostAndPort of the tserver + * @param conf + * AccumuloConfiguration entry + * @param entry + * WAL information gathered + * @param status + * GCStatus object + */ + @VisibleForTesting + void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + firstSeenDead.remove(address); + Client tserver = null; + try { + tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); + tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue())); + log.debug("asked tserver to delete " + entry.getValue() + " from " + entry.getKey()); + status.currentLog.deleted += entry.getValue().size(); + } catch (TException e) { + log.warn("Error talking to " + address + ": " + e); + } finally { + if (tserver != null) + ThriftUtil.returnClient(tserver); + } + } + + /** + * Get the configured wait period a server has to be dead. + *

+ * The property is "gc.wal.dead.server.wait" defined in Property.GC_WAL_DEAD_SERVER_WAIT and is duration. Valid values include a unit with no space like + * 3600s, 5m or 2h. + * + * @param conf + * AccumuloConfiguration + * @return long that represents the millis to wait + */ + @VisibleForTesting + long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) { + return conf.getTimeInMillis(Property.GC_WAL_DEAD_SERVER_WAIT); + } + + /** + * Remove walogs associated with a tserver that no longer has a look. + *

+ * There is configuration option, see #getGCWALDeadServerWaitTime, that defines how long a server must be "dead" before removing the associated write ahead + * log files. The intent to ensure that recovery succeeds for the tablet that were host on that tserver. + * + * @param address + * HostAndPort of the tserver with no lock + * @param conf + * AccumuloConfiguration to get that gc.wal.dead.server.wait info + * @param entry + * The WALOG path + * @param status + * GCStatus for tracking changes + */ + @VisibleForTesting + void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + // tserver is down, only delete once configured time has passed + if (timeToDelete(address, getGCWALDeadServerWaitTime(conf))) { + for (Path path : entry.getValue()) { + log.debug("Removing WAL for offline server " + address + " at " + path); try { - if (fs.exists(swalog)) { - log.error("Unable to delete sorted walog " + swalog + ": " + ioe); + if (!useTrash || !fs.moveToTrash(path)) { + fs.deleteRecursively(path); } + status.currentLog.deleted++; + } catch (FileNotFoundException ex) { + // ignored } catch (IOException ex) { - log.error("Unable to check for the existence of " + swalog, ex); + log.error("Unable to delete wal " + path + ": " + ex); } } + firstSeenDead.remove(address); + } else { + log.debug("Not removing " + entry.getValue().size() + " WAL(s) for offline server since it has not be long enough: " + address); } + } - return 0; + /** + * Removes old style WAL entries. + *

+ * The format for storing WAL info in the metadata table changed at some point, maybe the 1.5 release. Once that is known for sure and we no longer support + * upgrading from that version, this code should be removed + * + * @param entry + * Map of empty server address to List of Paths + * @param status + * GCStatus object + */ + @VisibleForTesting + void removeOldStyleWAL(Entry> entry, final GCStatus status) { + // old-style log entry, just remove it + for (Path path : entry.getValue()) { + log.debug("Removing old-style WAL " + path); + try { + if (!useTrash || !fs.moveToTrash(path)) + fs.deleteRecursively(path); + status.currentLog.deleted++; + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete wal " + path + ": " + ex); + } + } } /** @@ -281,7 +421,8 @@ public class GarbageCollectWriteAheadLogs { return result; } - private int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, + @VisibleForTesting + int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, InterruptedException { int count = 0; Iterator iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get()); @@ -307,19 +448,22 @@ public class GarbageCollectWriteAheadLogs { return count; } - private int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { + @VisibleForTesting + int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap); } // TODO Remove deprecation warning suppression when Hadoop1 support is dropped @SuppressWarnings("deprecation") /** - * Scans write-ahead log directories for logs. The maps passed in are - * populated with scan information. + * Scans write-ahead log directories for logs. The maps passed in are populated with scan information. 
* - * @param walDirs write-ahead log directories - * @param fileToServerMap map of file paths to servers - * @param nameToFileMap map of file names to paths + * @param walDirs + * write-ahead log directories + * @param fileToServerMap + * map of file paths to servers + * @param nameToFileMap + * map of file names to paths * @return number of servers located (including those with no logs present) */ int scanServers(String[] walDirs, Map fileToServerMap, Map nameToFileMap) throws Exception { @@ -360,7 +504,8 @@ public class GarbageCollectWriteAheadLogs { return servers.size(); } - private Map getSortedWALogs() throws IOException { + @VisibleForTesting + Map getSortedWALogs() throws IOException { return getSortedWALogs(ServerConstants.getRecoveryDirs()); } @@ -410,4 +555,41 @@ public class GarbageCollectWriteAheadLogs { } } + /** + * Determine if TServer has been dead long enough to remove associated WALs. + *

+ * Uses a map where the key is the address and the value is the time first seen dead. If the address is not in the map, it is added with the current system + * nanoTime. When the passed in wait time has elapsed, this method returns true and removes the key and value from the map. + * + * @param address + * HostAndPort of dead tserver + * @param wait + * long value of elapsed millis to wait + * @return boolean whether enough time elapsed since the server was first seen as dead. + */ + @VisibleForTesting + protected boolean timeToDelete(HostAndPort address, long wait) { + // check whether the tserver has been dead long enough + Long firstSeen = firstSeenDead.get(address); + if (firstSeen != null) { + long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstSeen); + log.trace("Elapsed milliseconds since " + address + " first seen dead: " + elapsedTime); + return elapsedTime > wait; + } else { + log.trace("Adding server to firstSeenDead map " + address); + firstSeenDead.put(address, System.nanoTime()); + return false; + } + } + + /** + * Method to clear the map used in timeToDelete. + *

+ * Useful for testing. + */ + @VisibleForTesting + void clearFirstSeenDead() { + firstSeenDead.clear(); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 76579f8..03f5c96 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -16,27 +16,53 @@ */ package org.apache.accumulo.gc; -import static org.easymock.EasyMock.createMock; +import com.google.common.net.HostAndPort; + import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; +import java.io.IOException; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; + import org.junit.Before; import org.junit.Test; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.zookeeper.KeeperException; + +import java.io.File; +import java.util.Arrays; +import java.util.LinkedHashMap; +import 
java.util.Map.Entry; + +import static org.easymock.EasyMock.createMock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static java.lang.Thread.sleep; + +import java.io.FileOutputStream; + +import org.apache.commons.io.FileUtils; + +import java.util.concurrent.TimeUnit; + public class GarbageCollectWriteAheadLogsTest { private static final long BLOCK_SIZE = 64000000L; @@ -234,4 +260,288 @@ public class GarbageCollectWriteAheadLogsTest { assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString())); assertFalse(GarbageCollectWriteAheadLogs.isUUID(null)); } + + @Test + public void testTimeToDeleteTrue() throws InterruptedException { + HostAndPort address = HostAndPort.fromString("tserver1:9998"); + long wait = AccumuloConfiguration.getTimeInMillis("1s"); + gcwal.clearFirstSeenDead(); + assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait)); + sleep(wait * 2); + assertTrue(gcwal.timeToDelete(address, wait)); + } + + @Test + public void testTimeToDeleteFalse() { + HostAndPort address = HostAndPort.fromString("tserver1:9998"); + long wait = AccumuloConfiguration.getTimeInMillis("1h"); + long t1, t2; + boolean ttd; + do { + t1 = System.nanoTime(); + gcwal.clearFirstSeenDead(); + assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait)); + ttd = gcwal.timeToDelete(address, wait); + t2 = System.nanoTime(); + } while (TimeUnit.NANOSECONDS.toMillis(t2 - t1) > (wait / 2)); // as long as it took less than half of the configured wait + + assertFalse(ttd); + } + + @Test + public void testTimeToDeleteWithNullAddress() { + assertFalse(gcwal.timeToDelete(null, 123l)); + } + + /** + * Wrapper class with some helper methods + *

+ * Just a wrapper around a LinkedHashMap that store method name and argument information. Also includes some convenience methods to make usage cleaner. + */ + class MethodCalls { + + private LinkedHashMap> mapWrapper; + + public MethodCalls() { + mapWrapper = new LinkedHashMap>(); + } + + public void put(String methodName, Object... args) { + mapWrapper.put(methodName, Arrays.asList(args)); + } + + public int size() { + return mapWrapper.size(); + } + + public boolean hasOneEntry() { + return size() == 1; + } + + public Map.Entry> getFirstEntry() { + return mapWrapper.entrySet().iterator().next(); + } + + public String getFirstEntryMethod() { + return getFirstEntry().getKey(); + } + + public List getFirstEntryArgs() { + return getFirstEntry().getValue(); + } + + public Object getFirstEntryArg(int number) { + return getFirstEntryArgs().get(number); + } + } + + /** + * Partial mock of the GarbageCollectWriteAheadLogs for testing the removeFile method + *

+ * There is a map named methodCalls that can be used to assert parameters on methods called inside the removeFile method + */ + class GCWALPartialMock extends GarbageCollectWriteAheadLogs { + + private boolean holdsLockBool = false; + + public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException { + super(i, vm, useTrash); + this.holdsLockBool = holdLock; + } + + public MethodCalls methodCalls = new MethodCalls(); + + @Override + boolean holdsLock(HostAndPort addr) { + return holdsLockBool; + } + + @Override + void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + methodCalls.put("removeWALFromDownTserver", address, conf, entry, status); + } + + @Override + void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + methodCalls.put("askTserverToRemoveWAL", address, conf, entry, status); + } + + @Override + void removeOldStyleWAL(Entry> entry, final GCStatus status) { + methodCalls.put("removeOldStyleWAL", entry, status); + } + + @Override + void removeSortedWAL(Path swalog) { + methodCalls.put("removeSortedWAL", swalog); + } + } + + private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException { + return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, locked); + } + + private Map getEmptyMap() { + return new HashMap(); + } + + private Map> getServerToFileMap1(String key, Path singlePath) { + Map> serverToFileMap = new HashMap>(); + serverToFileMap.put(key, new ArrayList(Arrays.asList(singlePath))); + return serverToFileMap; + } + + @Test + public void testRemoveFilesWithOldStyle() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true); + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + 
UUID.randomUUID().toString()); + Map> serverToFileMap = getServerToFileMap1("", p1); + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status); + + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be removeOldStyleWAL", "removeOldStyleWAL", calls.getFirstEntryMethod()); + Entry> firstServerToFileMap = serverToFileMap.entrySet().iterator().next(); + assertEquals("First param should be empty", firstServerToFileMap, calls.getFirstEntryArg(0)); + assertEquals("Second param should be the status", status, calls.getFirstEntryArg(1)); + } + + @Test + public void testRemoveFilesWithDeadTservers() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, false); + String server = "tserver1+9997"; + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString()); + Map> serverToFileMap = getServerToFileMap1(server, p1); + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status); + + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be removeWALfromDownTserver", "removeWALFromDownTserver", calls.getFirstEntryMethod()); + assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0)); + assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration); + Entry> firstServerToFileMap = serverToFileMap.entrySet().iterator().next(); + assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2)); + assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3)); + } + + @Test + public void 
testRemoveFilesWithLiveTservers() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true); + String server = "tserver1+9997"; + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString()); + Map> serverToFileMap = getServerToFileMap1(server, p1); + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status); + + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be askTserverToRemoveWAL", "askTserverToRemoveWAL", calls.getFirstEntryMethod()); + assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0)); + assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration); + Entry> firstServerToFileMap = serverToFileMap.entrySet().iterator().next(); + assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2)); + assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3)); + } + + @Test + public void testRemoveFilesRemovesSortedWALs() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true); + Map> serverToFileMap = new HashMap>(); + Map sortedWALogs = new HashMap(); + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString()); + sortedWALogs.put("junk", p1); // TODO: see if this key is actually used here, maybe can be removed + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, sortedWALogs, status); + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be removeSortedWAL", 
"removeSortedWAL", calls.getFirstEntryMethod()); + assertEquals("First param should be the Path", p1, calls.getFirstEntryArg(0)); + + } + + static String GCWAL_DEAD_DIR = "gcwal-collect-deadtserver"; + static String GCWAL_DEAD_TSERVER = "tserver1"; + static String GCWAL_DEAD_TSERVER_PORT = "9995"; + static String GCWAL_DEAD_TSERVER_COLLECT_FILE = UUID.randomUUID().toString(); + + class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs { + + public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws IOException { + super(i, vm, useTrash); + } + + @Override + boolean holdsLock(HostAndPort addr) { + // tries use zookeeper + return false; + } + + @Override + Map getSortedWALogs() { + return new HashMap(); + } + + @Override + int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { + String sep = File.separator; + Path p = new Path(System.getProperty("user.dir") + sep + "target" + sep + GCWAL_DEAD_DIR + sep + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT + sep + + GCWAL_DEAD_TSERVER_COLLECT_FILE); + fileToServerMap.put(p, GCWAL_DEAD_TSERVER + ":" + GCWAL_DEAD_TSERVER_PORT); + nameToFileMap.put(GCWAL_DEAD_TSERVER_COLLECT_FILE, p); + return 1; + } + + @Override + int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, + InterruptedException { + return 0; + } + + long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) { + // tries to use zookeeper + return 1000l; + } + } + + @Test + public void testCollectWithDeadTserver() throws IOException, InterruptedException { + Instance i = new MockInstance(); + File walDir = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + GCWAL_DEAD_DIR); + File walFileDir = new File(walDir + File.separator + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT); + File walFile = new File(walFileDir + File.separator + GCWAL_DEAD_TSERVER_COLLECT_FILE); + if (!walFileDir.exists()) { 
+ walFileDir.mkdirs(); + new FileOutputStream(walFile).close(); + } + + try { + VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString()); + GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false); + GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); + + gcwal2.collect(status); + + assertTrue("File should not be deleted", walFile.exists()); + assertEquals("Should have one candidate", 1, status.lastLog.getCandidates()); + assertEquals("Should not have deleted that file", 0, status.lastLog.getDeleted()); + + sleep(2000); + gcwal2.collect(status); + + assertFalse("File should be gone", walFile.exists()); + assertEquals("Should have one candidate", 1, status.lastLog.getCandidates()); + assertEquals("Should have deleted that file", 1, status.lastLog.getDeleted()); + + } finally { + if (walDir.exists()) { + FileUtils.deleteDirectory(walDir); + } + } + } }