Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2642E200B25 for ; Wed, 8 Jun 2016 15:32:04 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 25058160A0E; Wed, 8 Jun 2016 13:32:04 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7171E160A2E for ; Wed, 8 Jun 2016 15:32:02 +0200 (CEST) Received: (qmail 64432 invoked by uid 500); 8 Jun 2016 13:32:01 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 64384 invoked by uid 99); 8 Jun 2016 13:32:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jun 2016 13:32:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 549A7E058E; Wed, 8 Jun 2016 13:32:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mjwall@apache.org To: commits@accumulo.apache.org Date: Wed, 08 Jun 2016 13:32:02 -0000 Message-Id: <3fa41b217f724a99bf7a54c718439565@git.apache.org> In-Reply-To: <0272b2790b754b39ba6b6f419825424b@git.apache.org> References: <0272b2790b754b39ba6b6f419825424b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] accumulo git commit: Merge branch '1.6' into 1.7 archived-at: Wed, 08 Jun 2016 13:32:04 -0000 Merge branch '1.6' into 1.7 Adds commit for ACCUMULO-4157 to fix bug where WALs were deleted too quickly for "Dead" Tservers Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo 
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5f02d564 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5f02d564 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5f02d564 Branch: refs/heads/master Commit: 5f02d564ec3dae626edb7091fc1a92f5fd760f97 Parents: 0eab0ec e0426c5 Author: Michael Wall Authored: Wed Jun 8 08:34:26 2016 -0400 Committer: Michael Wall Committed: Wed Jun 8 08:34:26 2016 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../apache/accumulo/core/conf/PropertyTest.java | 5 + .../gc/GarbageCollectWriteAheadLogs.java | 296 +++++++++++++---- .../gc/GarbageCollectWriteAheadLogsTest.java | 332 ++++++++++++++++++- 4 files changed, 564 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index dbb2036,5fff17f..c427610 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -364,7 -305,8 +364,9 @@@ public enum Property GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"), GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"), GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"), + GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"), + GC_WAL_DEAD_SERVER_WAIT("gc.wal.dead.server.wait", "1h", PropertyType.TIMEDURATION, + "Time to wait after a tserver is first seen as dead before removing 
associated WAL files"), // properties that are specific to the monitor server behavior MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the monitor web server."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 1735c0d,b7d8d92..a62ffb2 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@@ -51,34 -37,35 +52,39 @@@ import org.apache.accumulo.core.securit import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; +import org.apache.accumulo.core.trace.Span; +import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.core.trace.Tracer; import org.apache.accumulo.core.util.AddressUtil; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.ServerConstants; -import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.replication.StatusUtil; +import org.apache.accumulo.server.replication.proto.Replication.Status; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Trace; -import org.apache.accumulo.trace.instrument.Tracer; import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.collect.Iterables; import com.google.common.net.HostAndPort; +import com.google.protobuf.InvalidProtocolBufferException; + import java.util.concurrent.TimeUnit; ++import org.apache.accumulo.core.conf.AccumuloConfiguration; + import org.apache.accumulo.core.conf.Property; public class GarbageCollectWriteAheadLogs { - private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class); + private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class); - private final Instance instance; + private final AccumuloServerContext context; private final VolumeManager fs; + private final Map firstSeenDead = new HashMap(); + private AccumuloConfiguration config; private boolean useTrash; @@@ -201,75 -184,202 +216,202 @@@ } } - private int removeFiles(Map nameToFileMap, Map> serverToFileMap, Map sortedWALogs, final GCStatus status) { + private AccumuloConfiguration getConfig() { - return ServerConfiguration.getSystemConfiguration(instance); ++ return context.getServerConfigurationFactory().getConfiguration(); + } + + /** + * Top level method for removing WAL files. + *

+ * Loops over all the gathered WAL and sortedWAL entries and calls the appropriate methods for removal + * + * @param nameToFileMap + * Map of filename to Path + * @param serverToFileMap + * Map of HostAndPort string to a list of Paths + * @param sortedWALogs + * Map of sorted WAL names to Path + * @param status + * GCStatus object for tracking what is done + * @return 0 always + */ + @VisibleForTesting + int removeFiles(Map nameToFileMap, Map> serverToFileMap, Map sortedWALogs, final GCStatus status) { + // TODO: remove nameToFileMap from method signature, not used here I don't think + AccumuloConfiguration conf = getConfig(); for (Entry> entry : serverToFileMap.entrySet()) { if (entry.getKey().isEmpty()) { - // old-style log entry, just remove it - for (Path path : entry.getValue()) { - log.debug("Removing old-style WAL " + path); - try { - if (!useTrash || !fs.moveToTrash(path)) - fs.deleteRecursively(path); - status.currentLog.deleted++; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal " + path + ": " + ex); - } - } + removeOldStyleWAL(entry, status); } else { - HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false); - if (!holdsLock(address)) { - for (Path path : entry.getValue()) { - log.debug("Removing WAL for offline server " + path); - try { - if (!useTrash || !fs.moveToTrash(path)) - fs.deleteRecursively(path); - status.currentLog.deleted++; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal " + path + ": " + ex); - } - } - continue; - } else { - Client tserver = null; - try { - tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context); - tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue())); - log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); - status.currentLog.deleted += entry.getValue().size(); - } catch (TException e) 
{ - log.warn("Error talking to " + address + ": " + e); - } finally { - if (tserver != null) - ThriftUtil.returnClient(tserver); - } - } + removeWALFile(entry, conf, status); } } - for (Path swalog : sortedWALogs.values()) { - log.debug("Removing sorted WAL " + swalog); + removeSortedWAL(swalog); + } + return 0; + } + + /** + * Removes sortedWALs. + *

+ * Sorted WALs are WALs that are in the recovery directory and have already been used. + * + * @param swalog + * Path to the WAL + */ + @VisibleForTesting + void removeSortedWAL(Path swalog) { + log.debug("Removing sorted WAL " + swalog); + try { + if (!useTrash || !fs.moveToTrash(swalog)) { + fs.deleteRecursively(swalog); + } + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ioe) { try { - if (!useTrash || !fs.moveToTrash(swalog)) { - fs.deleteRecursively(swalog); + if (fs.exists(swalog)) { + log.error("Unable to delete sorted walog " + swalog + ": " + ioe); } - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ioe) { + } catch (IOException ex) { + log.error("Unable to check for the existence of " + swalog, ex); + } + } + } + + /** + * A wrapper method to check if the tserver using the WAL is still alive + *

+ * Delegates the deletion to #removeWALfromDownTserver if the ZK lock is gone or #askTserverToRemoveWAL if the server is known to still be alive + * + * @param entry + * WAL information gathered + * @param conf + * AccumuloConfiguration object + * @param status + * GCStatus object + */ + void removeWALFile(Entry> entry, AccumuloConfiguration conf, final GCStatus status) { + HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false); + if (!holdsLock(address)) { + removeWALfromDownTserver(address, conf, entry, status); + } else { + askTserverToRemoveWAL(address, conf, entry, status); + } + } + + /** + * Asks a currently running tserver to remove its WALs. + *

+ * A tserver has more information about whether a WAL is still being used for current mutations. It is safer to ask the tserver to remove the file instead of + * just relying on information in the metadata table. + * + * @param address + * HostAndPort of the tserver + * @param conf + * AccumuloConfiguration entry + * @param entry + * WAL information gathered + * @param status + * GCStatus object + */ + @VisibleForTesting + void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + firstSeenDead.remove(address); + Client tserver = null; + try { - tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf); - tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue())); ++ tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context); ++ tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue())); + log.debug("asked tserver to delete " + entry.getValue() + " from " + entry.getKey()); + status.currentLog.deleted += entry.getValue().size(); + } catch (TException e) { + log.warn("Error talking to " + address + ": " + e); + } finally { + if (tserver != null) + ThriftUtil.returnClient(tserver); + } + } + + /** + * Get the configured wait period a server has to be dead. + *

+ * The property is "gc.wal.dead.server.wait" defined in Property.GC_WAL_DEAD_SERVER_WAIT and is a duration. Valid values include a unit with no space like + * 3600s, 5m or 2h. + * + * @param conf + * AccumuloConfiguration + * @return long that represents the millis to wait + */ + @VisibleForTesting + long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) { + return conf.getTimeInMillis(Property.GC_WAL_DEAD_SERVER_WAIT); + } + + /** + * Remove walogs associated with a tserver that no longer has a lock. + *

+ * There is a configuration option, see #getGCWALDeadServerWaitTime, that defines how long a server must be "dead" before removing the associated write ahead + * log files. The intent is to ensure that recovery succeeds for the tablets that were hosted on that tserver. + * + * @param address + * HostAndPort of the tserver with no lock + * @param conf + * AccumuloConfiguration to get the gc.wal.dead.server.wait info + * @param entry + * The WALOG path + * @param status + * GCStatus for tracking changes + */ + @VisibleForTesting + void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + // tserver is down, only delete once configured time has passed + if (timeToDelete(address, getGCWALDeadServerWaitTime(conf))) { + for (Path path : entry.getValue()) { + log.debug("Removing WAL for offline server " + address + " at " + path); try { - if (fs.exists(swalog)) { - log.error("Unable to delete sorted walog " + swalog + ": " + ioe); + if (!useTrash || !fs.moveToTrash(path)) { + fs.deleteRecursively(path); } + status.currentLog.deleted++; + } catch (FileNotFoundException ex) { + // ignored } catch (IOException ex) { - log.error("Unable to check for the existence of " + swalog, ex); + log.error("Unable to delete wal " + path + ": " + ex); } } + firstSeenDead.remove(address); + } else { + log.debug("Not removing " + entry.getValue().size() + " WAL(s) for offline server since it has not be long enough: " + address); } + } - return 0; + /** + * Removes old style WAL entries. + *

+ * The format for storing WAL info in the metadata table changed at some point, maybe the 1.5 release. Once that is known for sure and we no longer support + * upgrading from that version, this code should be removed + * + * @param entry + * Map of empty server address to List of Paths + * @param status + * GCStatus object + */ + @VisibleForTesting + void removeOldStyleWAL(Entry> entry, final GCStatus status) { + // old-style log entry, just remove it + for (Path path : entry.getValue()) { + log.debug("Removing old-style WAL " + path); + try { + if (!useTrash || !fs.moveToTrash(path)) + fs.deleteRecursively(path); + status.currentLog.deleted++; + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete wal " + path + ": " + ex); + } + } } /** @@@ -311,14 -421,13 +453,15 @@@ return result; } - protected int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, + @VisibleForTesting + int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, InterruptedException { int count = 0; - Iterator iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get()); + Iterator iterator = MetadataTableUtil.getLogEntries(context); + // For each WAL reference in the metadata table while (iterator.hasNext()) { + // Each metadata reference has at least one WAL file for (String entry : iterator.next().logSet) { // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases // the last "/" will mark a UUID file name. 
@@@ -341,101 -448,8 +484,102 @@@ return count; } + protected int removeReplicationEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, + InterruptedException { + Connector conn; + try { + conn = context.getConnector(); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Failed to get connector", e); + throw new IllegalArgumentException(e); + } + + int count = 0; + + Iterator> walIter = nameToFileMap.entrySet().iterator(); + + while (walIter.hasNext()) { + Entry wal = walIter.next(); + String fullPath = wal.getValue().toString(); + if (neededByReplication(conn, fullPath)) { + log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath); + // If we haven't already removed it, check to see if this WAL is + // "in use" by replication (needed for replication purposes) + status.currentLog.inUse++; + + walIter.remove(); + sortedWALogs.remove(wal.getKey()); + } else { + log.debug("WAL not needed for replication {}", fullPath); + } + count++; + } + + return count; + } + + /** + * Determine if the given WAL is needed for replication + * + * @param wal + * The full path (URI) + * @return True if the WAL is still needed by replication (not a candidate for deletion) + */ + protected boolean neededByReplication(Connector conn, String wal) { + log.info("Checking replication table for " + wal); + + Iterable> iter = getReplicationStatusForFile(conn, wal); + + // TODO Push down this filter to the tserver to only return records + // that are not completely replicated and convert this loop into a + // `return s.iterator.hasNext()` statement + for (Entry entry : iter) { + try { + Status status = Status.parseFrom(entry.getValue().get()); + log.info("Checking if {} is safe for removal with {}", wal, ProtobufUtil.toString(status)); + if (!StatusUtil.isSafeForRemoval(status)) { + return true; + } + } catch (InvalidProtocolBufferException e) { + log.error("Could not deserialize 
Status protobuf for " + entry.getKey(), e); + } + } + + return false; + } + + protected Iterable> getReplicationStatusForFile(Connector conn, String wal) { + Scanner metaScanner; + try { + metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + + // Need to add in the replication section prefix + metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal)); + // Limit the column family to be sure + metaScanner.fetchColumnFamily(ReplicationSection.COLF); + + try { + Scanner replScanner = ReplicationTable.getScanner(conn); + + // Scan only the Status records + StatusSection.limit(replScanner); + + // Only look for this specific WAL + replScanner.setRange(Range.exact(wal)); + + return Iterables.concat(metaScanner, replScanner); + } catch (ReplicationTableOfflineException e) { + // do nothing + } + + return metaScanner; + } + - private int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { + @VisibleForTesting + int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --cc server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 5801faa,03f5c96..bc9fca3 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@@ -22,58 -23,46 +23,75 @@@ import static org.easymock.EasyMock.rep import java.io.FileNotFoundException; import java.io.IOException; - import java.util.ArrayList; - +import java.util.Collections; - import java.util.HashMap; +import java.util.Iterator; 
+import java.util.LinkedList; + import java.util.ArrayList; + import java.util.HashMap; import java.util.List; import java.util.Map; - import java.util.Map.Entry; import java.util.UUID; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; - import org.apache.accumulo.core.client.mock.MockInstance; - import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; - import org.apache.accumulo.core.gc.thrift.GCStatus; - import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.replication.StatusUtil; +import org.apache.accumulo.server.replication.proto.Replication.Status; - import org.apache.hadoop.fs.FileStatus; - import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; + import org.apache.accumulo.core.conf.AccumuloConfiguration; + import org.apache.accumulo.core.gc.thrift.GCStatus; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.Path; + import org.junit.Before; 
+import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + import org.apache.accumulo.core.client.mock.MockInstance; + import org.apache.accumulo.core.gc.thrift.GcCycleStats; + import org.apache.accumulo.server.fs.VolumeManagerImpl; + import org.apache.zookeeper.KeeperException; + + import java.io.File; + import java.util.Arrays; + import java.util.LinkedHashMap; + import java.util.Map.Entry; + + import static org.easymock.EasyMock.createMock; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertSame; + import static org.junit.Assert.assertTrue; + import static java.lang.Thread.sleep; + + import java.io.FileOutputStream; + + import org.apache.commons.io.FileUtils; + + import java.util.concurrent.TimeUnit; + public class GarbageCollectWriteAheadLogsTest { private static final long BLOCK_SIZE = 64000000L; @@@ -370,198 -261,287 +388,484 @@@ assertFalse(GarbageCollectWriteAheadLogs.isUUID(null)); } + // It was easier to do this than get the mocking working for me + private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs { + + private List> replData; + + ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List> replData) throws IOException { + super(context, fs, useTrash); + this.replData = replData; + } + + @Override + protected Iterable> getReplicationStatusForFile(Connector conn, String wal) { + return this.replData; + } + } + + @Test + public void replicationEntriesAffectGC() throws Exception { + String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); + Connector conn = createMock(Connector.class); + + // Write a Status record which should prevent file1 from being deleted + LinkedList> replData = new LinkedList<>(); + replData.add(Maps.immutableEntry(new Key("/wals/" + file1, 
StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()))); + + ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData); + + replay(conn); + + // Open (not-closed) file must be retained + assertTrue(replGC.neededByReplication(conn, "/wals/" + file1)); + + // No replication data, not needed + replData.clear(); + assertFalse(replGC.neededByReplication(conn, "/wals/" + file2)); + + // The file is closed but not replicated, must be retained + replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue())); + assertTrue(replGC.neededByReplication(conn, "/wals/" + file1)); + + // File is closed and fully replicated, can be deleted + replData.clear(); + replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), + ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build()))); + assertFalse(replGC.neededByReplication(conn, "/wals/" + file1)); + } + + @Test + public void removeReplicationEntries() throws Exception { + String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); + + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + long file1CreateTime = System.currentTimeMillis(); + long file2CreateTime = file1CreateTime + 50; + BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector()); + Mutation m = new Mutation("/wals/" + file1); + StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime)); + bw.addMutation(m); + m = new Mutation("/wals/" + file2); + StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime)); + bw.addMutation(m); + + // These WALs are potential candidates for 
deletion from fs + Map nameToFileMap = new HashMap<>(); + nameToFileMap.put(file1, new Path("/wals/" + file1)); + nameToFileMap.put(file2, new Path("/wals/" + file2)); + + Map sortedWALogs = Collections.emptyMap(); + + // Make the GCStatus and GcCycleStats + GCStatus status = new GCStatus(); + GcCycleStats cycleStats = new GcCycleStats(); + status.currentLog = cycleStats; + + // We should iterate over two entries + Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status)); + + // We should have noted that two files were still in use + Assert.assertEquals(2l, cycleStats.inUse); + + // Both should have been deleted + Assert.assertEquals(0, nameToFileMap.size()); + } + + @Test + public void replicationEntriesOnlyInMetaPreventGC() throws Exception { + String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); + + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + + Connector conn = context.getConnector(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + long file1CreateTime = System.currentTimeMillis(); + long file2CreateTime = file1CreateTime + 50; + // Write some records to the metadata table, we haven't yet written status records to the replication table + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime)); + bw.addMutation(m); + + m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime)); + bw.addMutation(m); + + // These WALs are potential candidates for deletion from fs + Map nameToFileMap = new HashMap<>(); + 
nameToFileMap.put(file1, new Path("/wals/" + file1)); + nameToFileMap.put(file2, new Path("/wals/" + file2)); + + Map sortedWALogs = Collections.emptyMap(); + + // Make the GCStatus and GcCycleStats objects + GCStatus status = new GCStatus(); + GcCycleStats cycleStats = new GcCycleStats(); + status.currentLog = cycleStats; + + // We should iterate over two entries + Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status)); + + // We should have noted that two files were still in use + Assert.assertEquals(2l, cycleStats.inUse); + + // Both should have been deleted + Assert.assertEquals(0, nameToFileMap.size()); + } + + @Test + public void noReplicationTableDoesntLimitMetatdataResults() throws Exception { + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + Connector conn = context.getConnector(); + + String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())); + bw.addMutation(m); + bw.close(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + Iterable> data = gcWALs.getReplicationStatusForFile(conn, wal); + Entry entry = Iterables.getOnlyElement(data); + + Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString()); + } + + @Test + public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception { + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + Connector conn = context.getConnector(); + + long walCreateTime = 
System.currentTimeMillis(); + String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime)); + bw.addMutation(m); + bw.close(); + + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(wal); + StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime)); + bw.addMutation(m); + bw.close(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + Iterable> iter = gcWALs.getReplicationStatusForFile(conn, wal); + Map data = new HashMap<>(); + for (Entry e : iter) { + data.put(e.getKey(), e.getValue()); + } + + Assert.assertEquals(2, data.size()); + + // Should get one element from each table (metadata and replication) + for (Key k : data.keySet()) { + String row = k.getRow().toString(); + if (row.startsWith(ReplicationSection.getRowPrefix())) { + Assert.assertTrue(row.endsWith(wal)); + } else { + Assert.assertEquals(wal, row); + } + } + } ++ + @Test + public void testTimeToDeleteTrue() throws InterruptedException { + HostAndPort address = HostAndPort.fromString("tserver1:9998"); + long wait = AccumuloConfiguration.getTimeInMillis("1s"); + gcwal.clearFirstSeenDead(); + assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait)); + sleep(wait * 2); + assertTrue(gcwal.timeToDelete(address, wait)); + } + + @Test + public void testTimeToDeleteFalse() { + HostAndPort address = HostAndPort.fromString("tserver1:9998"); + long wait = AccumuloConfiguration.getTimeInMillis("1h"); + long t1, t2; + boolean ttd; + do { + t1 = System.nanoTime(); + gcwal.clearFirstSeenDead(); + assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, 
wait)); + ttd = gcwal.timeToDelete(address, wait); + t2 = System.nanoTime(); + } while (TimeUnit.NANOSECONDS.toMillis(t2 - t1) > (wait / 2)); // as long as it took less than half of the configured wait + + assertFalse(ttd); + } + + @Test + public void testTimeToDeleteWithNullAddress() { + assertFalse(gcwal.timeToDelete(null, 123l)); + } + + /** + * Wrapper class with some helper methods + *

+ * Just a wrapper around a LinkedHashMap that store method name and argument information. Also includes some convenience methods to make usage cleaner. + */ + class MethodCalls { + + private LinkedHashMap> mapWrapper; + + public MethodCalls() { + mapWrapper = new LinkedHashMap>(); + } + + public void put(String methodName, Object... args) { + mapWrapper.put(methodName, Arrays.asList(args)); + } + + public int size() { + return mapWrapper.size(); + } + + public boolean hasOneEntry() { + return size() == 1; + } + + public Map.Entry> getFirstEntry() { + return mapWrapper.entrySet().iterator().next(); + } + + public String getFirstEntryMethod() { + return getFirstEntry().getKey(); + } + + public List getFirstEntryArgs() { + return getFirstEntry().getValue(); + } + + public Object getFirstEntryArg(int number) { + return getFirstEntryArgs().get(number); + } + } + + /** + * Partial mock of the GarbageCollectWriteAheadLogs for testing the removeFile method + *

+ * There is a map named methodCalls that can be used to assert parameters on methods called inside the removeFile method + */ + class GCWALPartialMock extends GarbageCollectWriteAheadLogs { + + private boolean holdsLockBool = false; + - public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException { - super(i, vm, useTrash); ++ public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException { ++ super(ctx, vm, useTrash); + this.holdsLockBool = holdLock; + } + + public MethodCalls methodCalls = new MethodCalls(); + + @Override + boolean holdsLock(HostAndPort addr) { + return holdsLockBool; + } + + @Override + void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + methodCalls.put("removeWALFromDownTserver", address, conf, entry, status); + } + + @Override + void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry> entry, final GCStatus status) { + methodCalls.put("askTserverToRemoveWAL", address, conf, entry, status); + } + + @Override + void removeOldStyleWAL(Entry> entry, final GCStatus status) { + methodCalls.put("removeOldStyleWAL", entry, status); + } + + @Override + void removeSortedWAL(Path swalog) { + methodCalls.put("removeSortedWAL", swalog); + } + } + + private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException { - return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, locked); ++ AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo"))); ++ return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked); + } + + private Map getEmptyMap() { + return new HashMap(); + } + + private Map> getServerToFileMap1(String key, Path singlePath) { + Map> serverToFileMap = new HashMap>(); + serverToFileMap.put(key, new 
ArrayList(Arrays.asList(singlePath))); + return serverToFileMap; + } + + @Test + public void testRemoveFilesWithOldStyle() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true); + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString()); + Map> serverToFileMap = getServerToFileMap1("", p1); + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status); + + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be removeOldStyleWAL", "removeOldStyleWAL", calls.getFirstEntryMethod()); + Entry> firstServerToFileMap = serverToFileMap.entrySet().iterator().next(); + assertEquals("First param should be empty", firstServerToFileMap, calls.getFirstEntryArg(0)); + assertEquals("Second param should be the status", status, calls.getFirstEntryArg(1)); + } + + @Test + public void testRemoveFilesWithDeadTservers() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, false); + String server = "tserver1+9997"; + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString()); + Map> serverToFileMap = getServerToFileMap1(server, p1); + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status); + + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be removeWALfromDownTserver", "removeWALFromDownTserver", calls.getFirstEntryMethod()); + assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0)); + assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof 
AccumuloConfiguration); + Entry> firstServerToFileMap = serverToFileMap.entrySet().iterator().next(); + assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2)); + assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3)); + } + + @Test + public void testRemoveFilesWithLiveTservers() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true); + String server = "tserver1+9997"; + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString()); + Map> serverToFileMap = getServerToFileMap1(server, p1); + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status); + + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be askTserverToRemoveWAL", "askTserverToRemoveWAL", calls.getFirstEntryMethod()); + assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0)); + assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration); + Entry> firstServerToFileMap = serverToFileMap.entrySet().iterator().next(); + assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2)); + assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3)); + } + + @Test + public void testRemoveFilesRemovesSortedWALs() throws IOException { + GCStatus status = new GCStatus(); + GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true); + Map> serverToFileMap = new HashMap>(); + Map sortedWALogs = new HashMap(); + Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString()); + sortedWALogs.put("junk", p1); // TODO: see if this key is 
actually used here, maybe can be removed + + realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, sortedWALogs, status); + MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls; + assertEquals("Only one method should have been called", 1, calls.size()); + assertEquals("Method should be removeSortedWAL", "removeSortedWAL", calls.getFirstEntryMethod()); + assertEquals("First param should be the Path", p1, calls.getFirstEntryArg(0)); + + } + + static String GCWAL_DEAD_DIR = "gcwal-collect-deadtserver"; + static String GCWAL_DEAD_TSERVER = "tserver1"; + static String GCWAL_DEAD_TSERVER_PORT = "9995"; + static String GCWAL_DEAD_TSERVER_COLLECT_FILE = UUID.randomUUID().toString(); + + class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs { + - public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws IOException { - super(i, vm, useTrash); ++ public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException { ++ super(ctx, vm, useTrash); + } + + @Override + boolean holdsLock(HostAndPort addr) { + // tries use zookeeper + return false; + } + + @Override + Map getSortedWALogs() { + return new HashMap(); + } + + @Override + int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { + String sep = File.separator; + Path p = new Path(System.getProperty("user.dir") + sep + "target" + sep + GCWAL_DEAD_DIR + sep + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT + sep + + GCWAL_DEAD_TSERVER_COLLECT_FILE); + fileToServerMap.put(p, GCWAL_DEAD_TSERVER + ":" + GCWAL_DEAD_TSERVER_PORT); + nameToFileMap.put(GCWAL_DEAD_TSERVER_COLLECT_FILE, p); + return 1; + } + + @Override + int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, + InterruptedException { + return 0; + } + + long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) { + // tries to use zookeeper + return 1000l; + } + } + + @Test + 
public void testCollectWithDeadTserver() throws IOException, InterruptedException { + Instance i = new MockInstance(); ++ AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(i)); + File walDir = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + GCWAL_DEAD_DIR); + File walFileDir = new File(walDir + File.separator + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT); + File walFile = new File(walFileDir + File.separator + GCWAL_DEAD_TSERVER_COLLECT_FILE); + if (!walFileDir.exists()) { - walFileDir.mkdirs(); ++ assertTrue("Directory was made", walFileDir.mkdirs()); + new FileOutputStream(walFile).close(); + } + + try { + VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString()); - GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false); ++ GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false); + GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); + + gcwal2.collect(status); + + assertTrue("File should not be deleted", walFile.exists()); + assertEquals("Should have one candidate", 1, status.lastLog.getCandidates()); + assertEquals("Should not have deleted that file", 0, status.lastLog.getDeleted()); + + sleep(2000); + gcwal2.collect(status); + + assertFalse("File should be gone", walFile.exists()); + assertEquals("Should have one candidate", 1, status.lastLog.getCandidates()); + assertEquals("Should have deleted that file", 1, status.lastLog.getDeleted()); + + } finally { + if (walDir.exists()) { + FileUtils.deleteDirectory(walDir); + } + } + } }