From: elserj@apache.org
To: commits@accumulo.apache.org
Date: Sun, 10 May 2015 21:05:57 -0000
Subject: [11/19] accumulo git commit: Revert "ACCUMULO-3423 optimize WAL metadata table updates"

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 40acf8b..1735c0d 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -18,7 +18,7 @@ package org.apache.accumulo.gc; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.Collection; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -28,56 +28,49 @@ import java.util.Map.Entry; import java.util.Set; import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTableOfflineException; +import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; -import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.fs.VolumeManagerImpl; -import org.apache.accumulo.server.master.LiveTServerSet; -import org.apache.accumulo.server.master.LiveTServerSet.Listener; -import org.apache.accumulo.server.master.state.MetaDataStateStore; -import org.apache.accumulo.server.master.state.RootTabletStateStore; -import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.master.state.TabletLocationState; -import org.apache.accumulo.server.master.state.TabletState; import org.apache.accumulo.server.replication.StatusUtil; import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.net.HostAndPort; import com.google.protobuf.InvalidProtocolBufferException; @@ -86,8 +79,8 @@ public class GarbageCollectWriteAheadLogs { private final AccumuloServerContext context; private final VolumeManager fs; - private final boolean useTrash; - private final LiveTServerSet liveServers; + + private boolean useTrash; /** * Creates a new GC WAL object. @@ -103,35 +96,56 @@ public class GarbageCollectWriteAheadLogs { this.context = context; this.fs = fs; this.useTrash = useTrash; - this.liveServers = new LiveTServerSet(context, new Listener() { - @Override - public void update(LiveTServerSet current, Set deleted, Set added) { - log.debug("New tablet servers noticed: " + added); - log.debug("Tablet servers removed: " + deleted); - } - }); - liveServers.startListeningForTabletServerChanges(); + } + + /** + * Gets the instance used by this object. + * + * @return instance + */ + Instance getInstance() { + return context.getInstance(); + } + + /** + * Gets the volume manager used by this object. + * + * @return volume manager + */ + VolumeManager getVolumeManager() { + return fs; + } + + /** + * Checks if the volume manager should move files to the trash rather than delete them. 
+ * + * @return true if trash is used + */ + boolean isUsingTrash() { + return useTrash; } public void collect(GCStatus status) { - Span span = Trace.start("getCandidates"); + Span span = Trace.start("scanServers"); try { - Set currentServers = liveServers.getCurrentServers(); + + Map sortedWALogs = getSortedWALogs(); status.currentLog.started = System.currentTimeMillis(); - Map> candidates = new HashMap<>(); - long count = getCurrent(candidates, currentServers); + Map fileToServerMap = new HashMap(); + Map nameToFileMap = new HashMap(); + int count = scanServers(fileToServerMap, nameToFileMap); long fileScanStop = System.currentTimeMillis(); - - log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(), (fileScanStop - status.currentLog.started) / 1000.)); - status.currentLog.candidates = count; + log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count, + (fileScanStop - status.currentLog.started) / 1000.)); + status.currentLog.candidates = fileToServerMap.size(); span.stop(); - span = Trace.start("removeEntriesInUse"); + span = Trace.start("removeMetadataEntries"); try { - count = removeEntriesInUse(candidates, status, currentServers); + count = removeMetadataEntries(nameToFileMap, sortedWALogs, status); } catch (Exception ex) { log.error("Unable to scan metadata table", ex); return; @@ -144,7 +158,7 @@ public class GarbageCollectWriteAheadLogs { span = Trace.start("removeReplicationEntries"); try { - count = removeReplicationEntries(candidates, status); + count = removeReplicationEntries(nameToFileMap, sortedWALogs, status); } catch (Exception ex) { log.error("Unable to scan replication table", ex); return; @@ -156,22 +170,16 @@ public class GarbageCollectWriteAheadLogs { log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.)); span = Trace.start("removeFiles"); + Map> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap); - count = removeFiles(candidates, status); + count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status); long removeStop = System.currentTimeMillis(); - log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.)); - span.stop(); - - span = Trace.start("removeMarkers"); - count = removeTabletServerMarkers(candidates); - long removeMarkersStop = System.currentTimeMillis(); - log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.)); - span.stop(); - + log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.)); status.currentLog.finished = removeStop; status.lastLog = status.currentLog; status.currentLog = new GcCycleStats(); + span.stop(); } catch (Exception e) { log.error("exception occured while garbage collecting write ahead logs", e); @@ -180,100 +188,161 @@ public class GarbageCollectWriteAheadLogs { } } - private long removeTabletServerMarkers(Map> candidates) { - long result = 0; + boolean holdsLock(HostAndPort addr) { try { - BatchWriter root = null; - BatchWriter meta = null; - try { - root = context.getConnector().createBatchWriter(RootTable.NAME, null); - meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null); - for (Entry> entry : candidates.entrySet()) { - Mutation m = new 
Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString()); - for (Path path : entry.getValue()) { - m.putDelete(CurrentLogsSection.COLF, new Text(path.toString())); - result++; + String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString(); + List children = ZooReaderWriter.getInstance().getChildren(zpath); + return !(children == null || children.isEmpty()); + } catch (KeeperException.NoNodeException ex) { + return false; + } catch (Exception ex) { + log.debug(ex.toString(), ex); + return true; + } + } + + private int removeFiles(Map nameToFileMap, Map> serverToFileMap, Map sortedWALogs, final GCStatus status) { + for (Entry> entry : serverToFileMap.entrySet()) { + if (entry.getKey().isEmpty()) { + // old-style log entry, just remove it + for (Path path : entry.getValue()) { + log.debug("Removing old-style WAL " + path); + try { + if (!useTrash || !fs.moveToTrash(path)) + fs.deleteRecursively(path); + status.currentLog.deleted++; + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete wal " + path + ": " + ex); } - root.addMutation(m); - meta.addMutation(m); } - } finally { - if (meta != null) { - meta.close(); - } - if (root != null) { - root.close(); + } else { + HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false); + if (!holdsLock(address)) { + for (Path path : entry.getValue()) { + log.debug("Removing WAL for offline server " + path); + try { + if (!useTrash || !fs.moveToTrash(path)) + fs.deleteRecursively(path); + status.currentLog.deleted++; + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ex) { + log.error("Unable to delete wal " + path + ": " + ex); + } + } + continue; + } else { + Client tserver = null; + try { + tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context); + tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue())); + log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); + status.currentLog.deleted += entry.getValue().size(); + } catch (TException e) { + log.warn("Error talking to " + address + ": " + e); + } finally { + if (tserver != null) + ThriftUtil.returnClient(tserver); + } } } - } catch (Exception ex) { - throw new RuntimeException(ex); } - return result; - } - private long removeFiles(Map> candidates, final GCStatus status) { - for (Entry> entry : candidates.entrySet()) { - for (Path path : entry.getValue()) { - log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path); + for (Path swalog : sortedWALogs.values()) { + log.debug("Removing sorted WAL " + swalog); + try { + if (!useTrash || !fs.moveToTrash(swalog)) { + fs.deleteRecursively(swalog); + } + } catch (FileNotFoundException ex) { + // ignored + } catch (IOException ioe) { try { - if (!useTrash || !fs.moveToTrash(path)) - fs.deleteRecursively(path); - status.currentLog.deleted++; - } catch (FileNotFoundException ex) { - // ignored + if (fs.exists(swalog)) { + log.error("Unable to delete sorted walog " + swalog + ": " + ioe); + } } catch (IOException ex) { - log.error("Unable to delete wal " + path + ": " + ex); + log.error("Unable to check for the existence of " + swalog, ex); } } } - return status.currentLog.deleted; - } - private UUID path2uuid(Path path) { - return UUID.fromString(path.getName()); + return 0; } - private long removeEntriesInUse(Map> candidates, GCStatus status, Set liveServers) throws IOException, - KeeperException, 
InterruptedException { - - // remove any entries if there's a log reference, or a tablet is still assigned to the dead server + /** + * Converts a list of paths to their corresponding strings. + * + * @param paths + * list of paths + * @return string forms of paths + */ + static List paths2strings(List paths) { + List result = new ArrayList(paths.size()); + for (Path path : paths) + result.add(path.toString()); + return result; + } - Map walToDeadServer = new HashMap<>(); - for (Entry> entry : candidates.entrySet()) { - for (Path file : entry.getValue()) { - walToDeadServer.put(path2uuid(file), entry.getKey()); + /** + * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the + * mapping of file names to paths is skipped. + * + * @param fileToServerMap + * map of file paths to servers + * @param nameToFileMap + * map of file names to paths + * @return map of servers to lists of file paths + */ + static Map> mapServersToFiles(Map fileToServerMap, Map nameToFileMap) { + Map> result = new HashMap>(); + for (Entry fileServer : fileToServerMap.entrySet()) { + if (!nameToFileMap.containsKey(fileServer.getKey().getName())) + continue; + ArrayList files = result.get(fileServer.getValue()); + if (files == null) { + files = new ArrayList(); + result.put(fileServer.getValue(), files); } + files.add(fileServer.getKey()); } - long count = 0; - RootTabletStateStore root = new RootTabletStateStore(context); - MetaDataStateStore meta = new MetaDataStateStore(context); - Iterator states = Iterators.concat(root.iterator(), meta.iterator()); - while (states.hasNext()) { - count++; - TabletLocationState state = states.next(); - if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) { - candidates.remove(state.current); - } - for (Collection wals : state.walogs) { - for (String wal : wals) { - UUID walUUID = path2uuid(new Path(wal)); - TServerInstance dead = walToDeadServer.get(walUUID); - if (dead != null) { - Iterator iter = candidates.get(dead).iterator(); - while (iter.hasNext()) { - if (path2uuid(iter.next()).equals(walUUID)) { - iter.remove(); - break; - } - } - } + return result; + } + + protected int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, + InterruptedException { + int count = 0; + Iterator iterator = MetadataTableUtil.getLogEntries(context); + + // For each WAL reference in the metadata table + while (iterator.hasNext()) { + // Each metadata reference has at least one WAL file + for (String entry : iterator.next().logSet) { + // Old-style WALs have the IP:Port of their logger, while new-style WALs are a Path (absolute or relative); in all cases + // the last "/" precedes a UUID file name. + String uuid = entry.substring(entry.lastIndexOf("/") + 1); + if (!isUUID(uuid)) { + // We fully expect this to be a UUID; if it's not, then something is wrong and walog GC should not proceed!
+ throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry); } + + Path pathFromNN = nameToFileMap.remove(uuid); + if (pathFromNN != null) { + status.currentLog.inUse++; + sortedWALogs.remove(uuid); + } + + count++; } } + return count; } - protected int removeReplicationEntries(Map> candidates, GCStatus status) throws IOException, KeeperException, InterruptedException { + protected int removeReplicationEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, + InterruptedException { Connector conn; try { conn = context.getConnector(); @@ -284,25 +353,21 @@ public class GarbageCollectWriteAheadLogs { int count = 0; - Iterator>> walIter = candidates.entrySet().iterator(); + Iterator> walIter = nameToFileMap.entrySet().iterator(); while (walIter.hasNext()) { - Entry> wal = walIter.next(); - Iterator paths = wal.getValue().iterator(); - while (paths.hasNext()) { - Path fullPath = paths.next(); - if (neededByReplication(conn, fullPath)) { - log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath); - // If we haven't already removed it, check to see if this WAL is - // "in use" by replication (needed for replication purposes) - status.currentLog.inUse++; - paths.remove(); - } else { - log.debug("WAL not needed for replication {}", fullPath); - } - } - if (wal.getValue().isEmpty()) { + Entry wal = walIter.next(); + String fullPath = wal.getValue().toString(); + if (neededByReplication(conn, fullPath)) { + log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath); + // If we haven't already removed it, check to see if this WAL is + // "in use" by replication (needed for replication purposes) + status.currentLog.inUse++; + walIter.remove(); + sortedWALogs.remove(wal.getKey()); + } else { + log.debug("WAL not needed for replication {}", fullPath); } count++; } @@ -317,7 +382,7 @@ public class GarbageCollectWriteAheadLogs { * The full path (URI) * @return True if the WAL is still needed by replication (not a candidate for deletion) */ - protected boolean neededByReplication(Connector conn, Path wal) { + protected boolean neededByReplication(Connector conn, String wal) { log.info("Checking replication table for " + wal); Iterable> iter = getReplicationStatusForFile(conn, wal); @@ -340,7 +405,7 @@ public class GarbageCollectWriteAheadLogs { return false; } - protected Iterable> getReplicationStatusForFile(Connector conn, Path wal) { + protected Iterable> getReplicationStatusForFile(Connector conn, String wal) { Scanner metaScanner; try { metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); @@ -360,7 +425,7 @@ public class GarbageCollectWriteAheadLogs { StatusSection.limit(replScanner); // Only look for this specific WAL - replScanner.setRange(Range.exact(wal.toString())); + replScanner.setRange(Range.exact(wal)); return Iterables.concat(metaScanner, replScanner); } catch (ReplicationTableOfflineException e) { @@ -370,84 +435,107 @@ public class GarbageCollectWriteAheadLogs { return metaScanner; } - - + private int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { + return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap); + } /** - * Scans log markers. The map passed in is populated with the logs for dead servers. + * Scans write-ahead log directories for logs. The maps passed in are populated with scan information. 
* - * @param unusedLogs - * map of dead server to log file entries - * @return total number of log files + * @param walDirs + * write-ahead log directories + * @param fileToServerMap + * map of file paths to servers + * @param nameToFileMap + * map of file names to paths + * @return number of servers located (including those with no logs present) */ - private long getCurrent(Map > unusedLogs, Set currentServers) throws Exception { - Set rootWALs = new HashSet<>(); - // Get entries in zookeeper: - String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS; - ZooReaderWriter zoo = ZooReaderWriter.getInstance(); - List children = zoo.getChildren(zpath); - for (String child : children) { - LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null)); - rootWALs.add(new Path(entry.filename)); - } - long count = 0; - - // get all the WAL markers that are not in zookeeper for dead servers - Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY); - rootScanner.setRange(CurrentLogsSection.getRange()); - Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY); - metaScanner.setRange(CurrentLogsSection.getRange()); - Iterator> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator()); - Text hostAndPort = new Text(); - Text sessionId = new Text(); - Text filename = new Text(); - while (entries.hasNext()) { - Entry entry = entries.next(); - CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId); - CurrentLogsSection.getPath(entry.getKey(), filename); - TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString()); - Path path = new Path(filename.toString()); - if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) { - Set logs = unusedLogs.get(tsi); - if (logs == null) { - unusedLogs.put(tsi, logs = new HashSet()); - } - if (logs.add(path)) { - count++; + int scanServers(String[] walDirs, Map fileToServerMap, Map nameToFileMap) throws Exception { + Set servers = new HashSet(); + for (String walDir : walDirs) { + Path walRoot = new Path(walDir); + FileStatus[] listing = null; + try { + listing = fs.listStatus(walRoot); + } catch (FileNotFoundException e) { + // ignore dir + } + + if (listing == null) + continue; + for (FileStatus status : listing) { + String server = status.getPath().getName(); + if (status.isDirectory()) { + servers.add(server); + for (FileStatus file : fs.listStatus(new Path(walRoot, server))) { + if (isUUID(file.getPath().getName())) { + fileToServerMap.put(file.getPath(), server); + nameToFileMap.put(file.getPath().getName(), file.getPath()); + } else { + log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid"); + } + } + } else if (isUUID(server)) { + // old-style WAL are not under a directory + servers.add(""); + fileToServerMap.put(status.getPath(), ""); + nameToFileMap.put(server, status.getPath()); + } else { + log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); } } } + return servers.size(); + } - // scan HDFS for logs for dead servers - for (Volume volume : VolumeManagerImpl.get().getVolumes()) { - RemoteIterator iter = volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true); - while (iter.hasNext()) { - LocatedFileStatus next = iter.next(); - // recursive listing returns directories, too - if 
(next.isDirectory()) { - continue; - } - // make sure we've waited long enough for zookeeper propagation - if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) { - continue; - } - Path path = next.getPath(); - String hostPlusPort = path.getParent().getName(); - // server is still alive, or has a replacement - TServerInstance instance = liveServers.find(hostPlusPort); - if (instance != null) { - continue; - } - TServerInstance fake = new TServerInstance(hostPlusPort, 0L); - Set paths = unusedLogs.get(fake); - if (paths == null) { - paths = new HashSet<>(); + private Map getSortedWALogs() throws IOException { + return getSortedWALogs(ServerConstants.getRecoveryDirs()); + } + + /** + * Looks for write-ahead logs in recovery directories. + * + * @param recoveryDirs + * recovery directories + * @return map of log file names to paths + */ + Map getSortedWALogs(String[] recoveryDirs) throws IOException { + Map result = new HashMap(); + + for (String dir : recoveryDirs) { + Path recoveryDir = new Path(dir); + + if (fs.exists(recoveryDir)) { + for (FileStatus status : fs.listStatus(recoveryDir)) { + String name = status.getPath().getName(); + if (isUUID(name)) { + result.put(name, status.getPath()); + } else { + log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); + } } - paths.add(path); - unusedLogs.put(fake, paths); } } - return count; + return result; + } + + /** + * Checks if a string is a valid UUID. + * + * @param name + * string to check + * @return true if string is a UUID + */ + static boolean isUUID(String name) { + if (name == null || name.length() != 36) { + return false; + } + try { + UUID.fromString(name); + return true; + } catch (IllegalArgumentException ex) { + return false; + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 4f64c15..037023a 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -569,6 +569,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa replSpan.stop(); } + // Clean up any unused write-ahead logs Span waLogs = Trace.start("walogs"); try { GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index cb4b341..78ac4ac 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -37,11 +37,13 @@ import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.master.thrift.MasterClientService; import 
org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; @@ -182,21 +184,20 @@ public class CloseWriteAheadLogReferences implements Runnable { try { // TODO Configurable number of threads bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4); - bs.setRanges(Collections.singleton(CurrentLogsSection.getRange())); - bs.fetchColumnFamily(CurrentLogsSection.COLF); + bs.setRanges(Collections.singleton(TabletsSection.getRange())); + bs.fetchColumnFamily(LogColumnFamily.NAME); // For each log key/value in the metadata table for (Entry entry : bs) { - if (entry.getValue().equals(CurrentLogsSection.UNUSED)) { - continue; - } - Text tpath = new Text(); - CurrentLogsSection.getPath(entry.getKey(), tpath); - String path = new Path(tpath.toString()).toString(); - log.debug("Found WAL " + path.toString()); + // The value may contain multiple WALs + LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue()); + + log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet); // Normalize each log file (using Path) and add it to the set - referencedWals.add(normalizedWalPaths.get(path)); + for (String logFile : logEntry.logSet) { + referencedWals.add(normalizedWalPaths.get(logFile)); + } } } catch (TableNotFoundException e) { // uhhhh http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java new file mode 100644 index 0000000..5801faa --- /dev/null +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -0,0 +1,567 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.gc; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.gc.thrift.GCStatus; +import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.replication.StatusUtil; +import org.apache.accumulo.server.replication.proto.Replication.Status; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +public class GarbageCollectWriteAheadLogsTest { + private static final long BLOCK_SIZE = 64000000L; + + private static final Path DIR_1_PATH = new Path("/dir1"); + private static final Path DIR_2_PATH = new Path("/dir2"); + private static final Path DIR_3_PATH = new Path("/dir3"); + private static final String UUID1 = UUID.randomUUID().toString(); + private static final String UUID2 = UUID.randomUUID().toString(); + private static final String UUID3 = UUID.randomUUID().toString(); + + private Instance instance; + private AccumuloConfiguration systemConfig; + private VolumeManager volMgr; + private GarbageCollectWriteAheadLogs gcwal; + private long modTime; + + @Rule + public TestName testName = new TestName(); + + @Before + public void setUp() throws Exception { + SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class); + instance = createMock(Instance.class); + expect(instance.getInstanceID()).andReturn("mock").anyTimes(); + expect(instance.getZooKeepers()).andReturn("localhost").anyTimes(); + 
expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes(); + systemConfig = new ConfigurationCopy(new HashMap()); + volMgr = createMock(VolumeManager.class); + ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class); + expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes(); + expect(factory.getInstance()).andReturn(instance).anyTimes(); + expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes(); + + // Just make the SiteConfiguration delegate to our AccumuloConfiguration + // Presently, we only need get(Property) and iterator(). + EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer() { + @Override + public String answer() { + Object[] args = EasyMock.getCurrentArguments(); + return systemConfig.get((Property) args[0]); + } + }).anyTimes(); + EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer() { + @Override + public Boolean answer() { + Object[] args = EasyMock.getCurrentArguments(); + return systemConfig.getBoolean((Property) args[0]); + } + }).anyTimes(); + + EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer>>() { + @Override + public Iterator> answer() { + return systemConfig.iterator(); + } + }).anyTimes(); + + replay(instance, factory, siteConfig); + AccumuloServerContext context = new AccumuloServerContext(factory); + gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false); + modTime = System.currentTimeMillis(); + } + + @Test + public void testGetters() { + assertSame(instance, gcwal.getInstance()); + assertSame(volMgr, gcwal.getVolumeManager()); + assertFalse(gcwal.isUsingTrash()); + } + + @Test + public void testPathsToStrings() { + ArrayList paths = new ArrayList(); + paths.add(new Path(DIR_1_PATH, "file1")); + paths.add(DIR_2_PATH); + paths.add(new Path(DIR_3_PATH, "file3")); + List strings = GarbageCollectWriteAheadLogs.paths2strings(paths); + int len = 3; + assertEquals(len, strings.size()); + for (int i = 0; i < len; i++) { + assertEquals(paths.get(i).toString(), strings.get(i)); + } + } + + @Test + public void testMapServersToFiles() { + // @formatter:off + /* + * Test fileToServerMap: + * /dir1/server1/uuid1 -> server1 (new-style) + * /dir1/uuid2 -> "" (old-style) + * /dir3/server3/uuid3 -> server3 (new-style) + */ + // @formatter:on + Map fileToServerMap = new java.util.HashMap(); + Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1); + fileToServerMap.put(path1, "server1"); // new-style + Path path2 = new Path(DIR_1_PATH, UUID2); + fileToServerMap.put(path2, ""); // old-style + Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3); + fileToServerMap.put(path3, "server3"); // old-style + // @formatter:off + /* + * Test nameToFileMap: + * uuid1 -> /dir1/server1/uuid1 + * uuid3 -> /dir3/server3/uuid3 + */ + // @formatter:on + Map nameToFileMap = new java.util.HashMap(); + nameToFileMap.put(UUID1, path1); + nameToFileMap.put(UUID3, path3); + + // @formatter:off + /* + * Expected map: + * server1 -> [ /dir1/server1/uuid1 ] + * server3 -> [ /dir3/server3/uuid3 ] + */ + // @formatter:on + Map> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap); + assertEquals(2, result.size()); + ArrayList list1 = result.get("server1"); + assertEquals(1, list1.size()); + assertTrue(list1.contains(path1)); + ArrayList list3 = result.get("server3"); + assertEquals(1, list3.size()); + assertTrue(list3.contains(path3)); + } + + private FileStatus makeFileStatus(int size, 
Path path) { + boolean isDir = (size == 0); + return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path); + } + + private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception { + expect(volMgr.listStatus(dir)).andReturn(fileStatuses); + } + + @Test + public void testScanServers_NewStyle() throws Exception { + String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"}; + // @formatter:off + /* + * Test directory layout: + * /dir1/ + * server1/ + * uuid1 + * file2 + * subdir2/ + * /dir2/ missing + * /dir3/ + * server3/ + * uuid3 + */ + // @formatter:on + Path serverDir1Path = new Path(DIR_1_PATH, "server1"); + FileStatus serverDir1 = makeFileStatus(0, serverDir1Path); + Path subDir2Path = new Path(DIR_1_PATH, "subdir2"); + FileStatus serverDir2 = makeFileStatus(0, subDir2Path); + mockListStatus(DIR_1_PATH, serverDir1, serverDir2); + Path path1 = new Path(serverDir1Path, UUID1); + FileStatus file1 = makeFileStatus(100, path1); + FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2")); + mockListStatus(serverDir1Path, file1, file2); + mockListStatus(subDir2Path); + expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException()); + Path serverDir3Path = new Path(DIR_3_PATH, "server3"); + FileStatus serverDir3 = makeFileStatus(0, serverDir3Path); + mockListStatus(DIR_3_PATH, serverDir3); + Path path3 = new Path(serverDir3Path, UUID3); + FileStatus file3 = makeFileStatus(300, path3); + mockListStatus(serverDir3Path, file3); + replay(volMgr); + + Map fileToServerMap = new java.util.HashMap(); + Map nameToFileMap = new java.util.HashMap(); + int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap); + assertEquals(3, count); + // @formatter:off + /* + * Expected fileToServerMap: + * /dir1/server1/uuid1 -> server1 + * /dir3/server3/uuid3 -> server3 + */ + // @formatter:on + assertEquals(2, fileToServerMap.size()); + assertEquals("server1", fileToServerMap.get(path1)); + assertEquals("server3", fileToServerMap.get(path3)); + // @formatter:off + /* + * Expected nameToFileMap: + * uuid1 -> /dir1/server1/uuid1 + * uuid3 -> /dir3/server3/uuid3 + */ + // @formatter:on + assertEquals(2, nameToFileMap.size()); + assertEquals(path1, nameToFileMap.get(UUID1)); + assertEquals(path3, nameToFileMap.get(UUID3)); + } + + @Test + public void testScanServers_OldStyle() throws Exception { + // @formatter:off + /* + * Test directory layout: + * /dir1/ + * uuid1 + * /dir3/ + * uuid3 + */ + // @formatter:on + String[] walDirs = new String[] {"/dir1", "/dir3"}; + Path serverFile1Path = new Path(DIR_1_PATH, UUID1); + FileStatus serverFile1 = makeFileStatus(100, serverFile1Path); + mockListStatus(DIR_1_PATH, serverFile1); + Path serverFile3Path = new Path(DIR_3_PATH, UUID3); + FileStatus serverFile3 = makeFileStatus(300, serverFile3Path); + mockListStatus(DIR_3_PATH, serverFile3); + replay(volMgr); + + Map fileToServerMap = new java.util.HashMap(); + Map nameToFileMap = new java.util.HashMap(); + int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap); + /* + * Expect only a single server, the non-server entry for upgrade WALs + */ + assertEquals(1, count); + // @formatter:off + /* + * Expected fileToServerMap: + * /dir1/uuid1 -> "" + * /dir3/uuid3 -> "" + */ + // @formatter:on + assertEquals(2, fileToServerMap.size()); + assertEquals("", fileToServerMap.get(serverFile1Path)); + assertEquals("", fileToServerMap.get(serverFile3Path)); + // @formatter:off + /* + * Expected nameToFileMap: + * uuid1 -> /dir1/uuid1 + * uuid3 -> /dir3/uuid3 + 
*/ + // @formatter:on + assertEquals(2, nameToFileMap.size()); + assertEquals(serverFile1Path, nameToFileMap.get(UUID1)); + assertEquals(serverFile3Path, nameToFileMap.get(UUID3)); + } + + @Test + public void testGetSortedWALogs() throws Exception { + String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"}; + // @formatter:off + /* + * Test directory layout: + * /dir1/ + * uuid1 + * file2 + * /dir2/ missing + * /dir3/ + * uuid3 + */ + // @formatter:on + expect(volMgr.exists(DIR_1_PATH)).andReturn(true); + expect(volMgr.exists(DIR_2_PATH)).andReturn(false); + expect(volMgr.exists(DIR_3_PATH)).andReturn(true); + Path path1 = new Path(DIR_1_PATH, UUID1); + FileStatus file1 = makeFileStatus(100, path1); + FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2")); + mockListStatus(DIR_1_PATH, file1, file2); + Path path3 = new Path(DIR_3_PATH, UUID3); + FileStatus file3 = makeFileStatus(300, path3); + mockListStatus(DIR_3_PATH, file3); + replay(volMgr); + + Map sortedWalogs = gcwal.getSortedWALogs(recoveryDirs); + // @formatter:off + /* + * Expected map: + * uuid1 -> /dir1/uuid1 + * uuid3 -> /dir3/uuid3 + */ + // @formatter:on + assertEquals(2, sortedWalogs.size()); + assertEquals(path1, sortedWalogs.get(UUID1)); + assertEquals(path3, sortedWalogs.get(UUID3)); + } + + @Test + public void testIsUUID() { + assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString())); + assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo")); + assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString())); + assertFalse(GarbageCollectWriteAheadLogs.isUUID(null)); + } + + // It was easier to do this than get the mocking working for me + private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs { + + private List> replData; + + ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List> replData) throws IOException { + super(context, fs, useTrash); + this.replData = replData; + } + + @Override + protected Iterable> getReplicationStatusForFile(Connector conn, String wal) { + return this.replData; + } + } + + @Test + public void replicationEntriesAffectGC() throws Exception { + String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); + Connector conn = createMock(Connector.class); + + // Write a Status record which should prevent file1 from being deleted + LinkedList> replData = new LinkedList<>(); + replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()))); + + ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData); + + replay(conn); + + // Open (not-closed) file must be retained + assertTrue(replGC.neededByReplication(conn, "/wals/" + file1)); + + // No replication data, not needed + replData.clear(); + assertFalse(replGC.neededByReplication(conn, "/wals/" + file2)); + + // The file is closed but not replicated, must be retained + replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue())); + assertTrue(replGC.neededByReplication(conn, "/wals/" + file1)); + + // File is closed and fully replicated, can be deleted + replData.clear(); + replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), + ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build()))); + assertFalse(replGC.neededByReplication(conn, "/wals/" + 
file1)); + } + + @Test + public void removeReplicationEntries() throws Exception { + String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); + + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + long file1CreateTime = System.currentTimeMillis(); + long file2CreateTime = file1CreateTime + 50; + BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector()); + Mutation m = new Mutation("/wals/" + file1); + StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime)); + bw.addMutation(m); + m = new Mutation("/wals/" + file2); + StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime)); + bw.addMutation(m); + + // These WALs are potential candidates for deletion from fs + Map nameToFileMap = new HashMap<>(); + nameToFileMap.put(file1, new Path("/wals/" + file1)); + nameToFileMap.put(file2, new Path("/wals/" + file2)); + + Map sortedWALogs = Collections.emptyMap(); + + // Make the GCStatus and GcCycleStats + GCStatus status = new GCStatus(); + GcCycleStats cycleStats = new GcCycleStats(); + status.currentLog = cycleStats; + + // We should iterate over two entries + Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status)); + + // We should have noted that two files were still in use + Assert.assertEquals(2l, cycleStats.inUse); + + // Both should have been deleted + Assert.assertEquals(0, nameToFileMap.size()); + } + + @Test + public void replicationEntriesOnlyInMetaPreventGC() throws Exception { + String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); + + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + + Connector conn = context.getConnector(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + long file1CreateTime = System.currentTimeMillis(); + long file2CreateTime = file1CreateTime + 50; + // Write some records to the metadata table, we haven't yet written status records to the replication table + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime)); + bw.addMutation(m); + + m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime)); + bw.addMutation(m); + + // These WALs are potential candidates for deletion from fs + Map nameToFileMap = new HashMap<>(); + nameToFileMap.put(file1, new Path("/wals/" + file1)); + nameToFileMap.put(file2, new Path("/wals/" + file2)); + + Map sortedWALogs = Collections.emptyMap(); + + // Make the GCStatus and GcCycleStats objects + GCStatus status = new GCStatus(); + GcCycleStats cycleStats = new GcCycleStats(); + status.currentLog = cycleStats; + + // We should iterate over two entries + Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status)); + + // We should have noted that two files were still in use + Assert.assertEquals(2l, cycleStats.inUse); + + // Both should have been deleted + 
Assert.assertEquals(0, nameToFileMap.size()); + } + + @Test + public void noReplicationTableDoesntLimitMetatdataResults() throws Exception { + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + Connector conn = context.getConnector(); + + String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())); + bw.addMutation(m); + bw.close(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + Iterable> data = gcWALs.getReplicationStatusForFile(conn, wal); + Entry entry = Iterables.getOnlyElement(data); + + Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString()); + } + + @Test + public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception { + Instance inst = new MockInstance(testName.getMethodName()); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); + Connector conn = context.getConnector(); + + long walCreateTime = System.currentTimeMillis(); + String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); + m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime)); + bw.addMutation(m); + bw.close(); + + bw = ReplicationTable.getBatchWriter(conn); + m = new Mutation(wal); + StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime)); + bw.addMutation(m); + bw.close(); + + GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); + + Iterable> iter = gcWALs.getReplicationStatusForFile(conn, wal); + Map data = new HashMap<>(); + for (Entry e : iter) { + data.put(e.getKey(), e.getValue()); + } + + Assert.assertEquals(2, data.size()); + + // Should get one element from each table (metadata and replication) + for (Key k : data.keySet()) { + String row = k.getRow().toString(); + if (row.startsWith(ReplicationSection.getRowPrefix())) { + Assert.assertTrue(row.endsWith(wal)); + } else { + Assert.assertEquals(wal, row); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java index 78a5bd5..3115de1 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java @@ -50,12 +50,14 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; -import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.conf.ServerConfigurationFactory; @@ -128,16 +130,22 @@ public class CloseWriteAheadLogReferencesTest { public void findOneWalFromMetadata() throws Exception { Connector conn = createMock(Connector.class); BatchScanner bs = createMock(BatchScanner.class); + // Fake out some data final ArrayList> data = new ArrayList<>(); - String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID(); - data.add(entry("tserver1:9997[1234567890]", file)); + LogEntry logEntry = new LogEntry(); + logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a")); + logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID(); + logEntry.server = "tserver1"; + logEntry.tabletId = 1; + logEntry.logSet = Collections.singleton(logEntry.filename); + data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); // Get a batchscanner, scan the tablets section, fetch only the logs expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs); - bs.setRanges(Collections.singleton(CurrentLogsSection.getRange())); + bs.setRanges(Collections.singleton(TabletsSection.getRange())); expectLastCall().once(); - bs.fetchColumnFamily(CurrentLogsSection.COLF); + bs.fetchColumnFamily(LogColumnFamily.NAME); expectLastCall().once(); expect(bs.iterator()).andAnswer(new IAnswer>>() { @@ -155,12 +163,54 @@ public class CloseWriteAheadLogReferencesTest { // Validate Set wals = refs.getReferencedWals(conn); - Assert.assertEquals(Collections.singleton(file), wals); + Assert.assertEquals(Collections.singleton(logEntry.filename), wals); + + verify(conn, bs); + } + + @Test + public void findManyWalFromSingleMetadata() throws Exception { + Connector conn = createMock(Connector.class); + BatchScanner bs = createMock(BatchScanner.class); + + // Fake out some data + final ArrayList> data = new ArrayList<>(); + LogEntry logEntry = new LogEntry(); + logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a")); + logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID(); + logEntry.server = "tserver1"; + logEntry.tabletId = 1; + // Multiple DFSLoggers + logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID()); + data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); + + // Get a batchscanner, scan the tablets section, fetch only the logs + expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs); + 
bs.setRanges(Collections.singleton(TabletsSection.getRange())); + expectLastCall().once(); + bs.fetchColumnFamily(LogColumnFamily.NAME); + expectLastCall().once(); + expect(bs.iterator()).andAnswer(new IAnswer>>() { + + @Override + public Iterator> answer() throws Throwable { + return data.iterator(); + } + + }); + // Close the bs + bs.close(); + expectLastCall().once(); + + replay(conn, bs); + + // Validate + Set wals = refs.getReferencedWals(conn); + Assert.assertEquals(logEntry.logSet, wals); verify(conn, bs); } - // This is a silly test now @Test public void findManyRefsToSingleWalFromMetadata() throws Exception { Connector conn = createMock(Connector.class); @@ -170,14 +220,31 @@ public class CloseWriteAheadLogReferencesTest { // Fake out some data final ArrayList> data = new ArrayList<>(); - String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid; - data.add(entry("tserver1:9997[0123456789]", filename)); + LogEntry logEntry = new LogEntry(); + logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a")); + logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid; + logEntry.server = "tserver1"; + logEntry.tabletId = 1; + logEntry.logSet = Collections.singleton(logEntry.filename); + data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); + + logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b")); + logEntry.server = "tserver1"; + logEntry.tabletId = 2; + logEntry.logSet = Collections.singleton(logEntry.filename); + data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); + + logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c")); + logEntry.server = "tserver1"; + logEntry.tabletId = 3; + logEntry.logSet = Collections.singleton(logEntry.filename); + data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); // Get a batchscanner, scan the tablets section, fetch only the logs expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs); - bs.setRanges(Collections.singleton(CurrentLogsSection.getRange())); + bs.setRanges(Collections.singleton(TabletsSection.getRange())); expectLastCall().once(); - bs.fetchColumnFamily(CurrentLogsSection.COLF); + bs.fetchColumnFamily(LogColumnFamily.NAME); expectLastCall().once(); expect(bs.iterator()).andAnswer(new IAnswer>>() { @@ -195,7 +262,7 @@ public class CloseWriteAheadLogReferencesTest { // Validate Set wals = refs.getReferencedWals(conn); - Assert.assertEquals(Collections.singleton(filename), wals); + Assert.assertEquals(Collections.singleton(logEntry.filename), wals); verify(conn, bs); } @@ -205,22 +272,59 @@ public class CloseWriteAheadLogReferencesTest { Connector conn = createMock(Connector.class); BatchScanner bs = createMock(BatchScanner.class); - String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID(); - String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID(); - String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID(); + String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/" + + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + 
@@ -205,22 +272,59 @@ public class CloseWriteAheadLogReferencesTest {
     Connector conn = createMock(Connector.class);
     BatchScanner bs = createMock(BatchScanner.class);

-    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
-    String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
-    String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
+    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
+        + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();

     // Fake out some data
     final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-
-    data.add(entry("tserver1:9997[1234567890]", file1));
-    data.add(entry("tserver2:9997[1234567891]", file2));
-    data.add(entry("tserver3:9997[1234567891]", file3));
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+    logEntry.filename = file1;
+    logEntry.server = "tserver1";
+    logEntry.tabletId = 1;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("5"), null, null);
+    logEntry.tabletId = 2;
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
+    logEntry.filename = file2;
+    logEntry.server = "tserver2";
+    logEntry.tabletId = 3;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
+    logEntry.tabletId = 4;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
+    logEntry.filename = file3;
+    logEntry.server = "tserver3";
+    logEntry.tabletId = 5;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
+    logEntry.server = "tserver3";
+    logEntry.tabletId = 7;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
+    logEntry.server = "tserver3";
+    logEntry.tabletId = 15;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));

     // Get a batchscanner, scan the tablets section, fetch only the logs
     expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
+    bs.fetchColumnFamily(LogColumnFamily.NAME);
     expectLastCall().once();
     expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -243,11 +347,6 @@ public class CloseWriteAheadLogReferencesTest {

     verify(conn, bs);
   }

-  private static Entry<Key,Value> entry(String session, String file) {
-    Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
-    return Maps.immutableEntry(key, new Value());
-  }
-
   @Test
   public void unusedWalsAreClosed() throws Exception {
     Set<String> wals = Collections.emptySet();
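The entries faked out above live in the metadata table's tablets section: the revert goes back to storing one WAL reference per tablet under the log column family, instead of per-server entries in a dedicated current-logs section. A rough, self-contained model of that key layout; the row encoding matches Accumulo's metadata convention, but the family and qualifier here are simplifications for illustration, not the real LogEntry serialization:

public class WalReferenceSketch {

  // Metadata row for a tablet: "<tableId>;<endRow>", or "<tableId><" for the
  // last tablet in the table (null end row).
  static String tabletRow(String tableId, String endRow) {
    return endRow == null ? tableId + "<" : tableId + ";" + endRow;
  }

  public static void main(String[] args) {
    String filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/some-uuid";
    // One entry per tablet that references the WAL, all under the log family;
    // this is why getReferencedWals() can collapse many rows into a Set of files,
    // as findManyRefsToSingleWalFromMetadata asserts above.
    for (String endRow : new String[] {"b", "c", null}) {
      System.out.println(tabletRow("1", endRow) + " log:" + filename);
    }
  }
}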
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9a324fb..d00785a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -423,9 +423,6 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
           perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
         }
         perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
-
-        // add the currlog location for root tablet current logs
-        zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
         haveUpgradedZooKeeper = true;
       } catch (Exception ex) {
         // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 3f15b39..592d9ae 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -173,8 +173,7 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
         scanner.setRange(MetadataSchema.TabletsSection.getRange());
       } else {
         scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-        Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange();
-        scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
+        scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
       }
       TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
       TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
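The MasterClientServiceHandler hunk above drops the clip() against the tablets-section range, restoring the pre-patch scan bounds. For reference, Range.clip() returns the intersection of two ranges, so a clipped scan can never wander into a neighboring metadata section. A small sketch of that semantics; the row values below are made up and only the clip behavior matters:

import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

public class ClipSketch {
  public static void main(String[] args) {
    // A table's metadata range and a broader section boundary
    Range tableRange = new Range(new Text("2;a"), new Text("2<"));
    Range section = new Range(new Text("!0<"), false, new Text("~"), false);

    // clip() yields the overlap of the two ranges (here, tableRange itself);
    // it throws IllegalArgumentException if the ranges do not overlap.
    System.out.println(tableRange.clip(section));
  }
}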
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 2b874f6..4c47953 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -163,7 +163,6 @@ class TabletGroupWatcher extends Daemon {
       List<Assignment> assigned = new ArrayList<Assignment>();
       List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
       Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
-      Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
       MasterState masterState = master.getMasterState();

       int[] counts = new int[TabletState.values().length];
@@ -176,7 +175,6 @@ class TabletGroupWatcher extends Daemon {
         if (tls == null) {
           continue;
         }
-        Master.log.debug(store.name() + " location State: " + tls);
         // ignore entries for tables that do not exist in zookeeper
         if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
           continue;
@@ -186,7 +184,7 @@ class TabletGroupWatcher extends Daemon {
         // Don't overwhelm the tablet servers with work
         if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
-          flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
+          flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
           assignments.clear();
           assigned.clear();
           assignedToDeadServers.clear();
@@ -206,9 +204,8 @@ class TabletGroupWatcher extends Daemon {
         TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
         TServerInstance server = tls.getServer();
         TabletState state = tls.getState(currentTServers.keySet());
-        if (Master.log.isTraceEnabled()) {
-          Master.log.trace("Goal state " + goal + " current " + state + " for " + tls.extent);
-        }
+        if (Master.log.isTraceEnabled())
+          Master.log.trace("Goal state " + goal + " current " + state);
         stats.update(tableId, state);
         mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
         sendChopRequest(mergeStats.getMergeInfo(), state, tls);
@@ -242,7 +239,7 @@ class TabletGroupWatcher extends Daemon {
               assignedToDeadServers.add(tls);
               if (server.equals(this.master.migrations.get(tls.extent)))
                 this.master.migrations.remove(tls.extent);
-              MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+              // log.info("Current servers " + currentTServers.keySet());
               break;
             case UNASSIGNED:
               // maybe it's a finishing migration
@@ -276,7 +273,7 @@ class TabletGroupWatcher extends Daemon {
               break;
             case ASSIGNED_TO_DEAD_SERVER:
               assignedToDeadServers.add(tls);
-              MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+              // log.info("Current servers " + currentTServers.keySet());
               break;
             case HOSTED:
               TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -295,8 +292,7 @@ class TabletGroupWatcher extends Daemon {
         counts[state.ordinal()]++;
       }

-      flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
-      store.markLogsAsUnused(master, logsForDeadServers);
+      flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);

       // provide stats after flushing changes to avoid race conditions w/ delete table
       stats.end(masterState);
@@ -316,12 +312,8 @@ class TabletGroupWatcher extends Daemon {
       updateMergeState(mergeStatsCache);

-      if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
-        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
-        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
-      } else {
-        Master.log.info("Detected change in current tserver set, re-running state machine.");
-      }
+      Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+      eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
     } catch (Exception ex) {
       Master.log.error("Error processing table state for store " + store.name(), ex);
       if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) {
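The last hunk above reverts to an unconditional wait between scans; the removed branch skipped the wait and re-ran the state machine as soon as the live tserver set changed. A simplified model of that wait/notify listener, assuming an Object.wait()-based event coordinator; this is a sketch of the pattern, not the actual Master implementation:

public class EventWaitSketch {
  private final Object lock = new Object();

  // Blocks for up to millis, returning early if event() fires in the meantime.
  void waitForEvents(long millis) throws InterruptedException {
    synchronized (lock) {
      lock.wait(millis);
    }
  }

  // Called by other threads to wake the scan loop immediately.
  void event() {
    synchronized (lock) {
      lock.notifyAll();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    EventWaitSketch listener = new EventWaitSketch();
    long start = System.currentTimeMillis();
    listener.waitForEvents(250); // times out; nothing calls event() here
    System.out.println("woke after ~" + (System.currentTimeMillis() - start) + " ms");
  }
}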
   private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
-      List<TabletLocationState> assignedToDeadServers, Map<TServerInstance,List<Path>> logsForDeadServers, Map<KeyExtent,TServerInstance> unassigned)
-      throws DistributedStoreException, TException {
+      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
     if (!assignedToDeadServers.isEmpty()) {
       int maxServersToShow = min(assignedToDeadServers.size(), 100);
       Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
-      Master.log.debug("logs for dead servers: " + logsForDeadServers);
-      store.unassign(assignedToDeadServers, logsForDeadServers);
+      store.unassign(assignedToDeadServers);
       this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 74e9b78..8532e1b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -107,7 +107,6 @@ public class WorkMaker {
         // Don't create the record if we have nothing to do.
         // TODO put this into a filter on serverside
         if (!shouldCreateWork(status)) {
-          log.debug("Not creating work: " + status.toString());
           continue;
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index f1763be..a3c7e46 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -30,7 +30,6 @@
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.cli.ClientOpts;
@@ -188,7 +187,7 @@ public class MergeStats {
     Text tableId = extent.getTableId();
     Text first = KeyExtent.getMetadataEntry(tableId, start);
     Range range = new Range(first, false, null, true);
-    scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
+    scanner.setRange(range);
     KeyExtent prevExtent = null;

     log.debug("Scanning range " + range);
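In the MergeStats hunk, note the endpoint flags on the restored range: the start row is exclusive (the tablet's own metadata entry is skipped) and the end is unbounded. A small sketch of those semantics; the row value below is made up:

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

public class RangeEndpointsSketch {
  public static void main(String[] args) {
    // Made-up metadata row for a tablet; only the endpoint semantics matter.
    Text first = new Text("2;m");

    // As in MergeStats above: start row exclusive, null end row (open-ended),
    // end inclusive.
    Range range = new Range(first, false, null, true);

    System.out.println(range.contains(new Key(first)));           // false: start is exclusive
    System.out.println(range.contains(new Key(new Text("2;z")))); // true: no upper bound
  }
}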
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
index 3c3bc37..6790858 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.master;

+import java.util.Arrays;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
@@ -305,7 +306,13 @@ public class ReplicationOperationsImplTest {
     bw.addMutation(m);
     bw.close();

-    LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
+    logEntry.server = "tserver";
+    logEntry.filename = file1;
+    logEntry.tabletId = 1;
+    logEntry.logSet = Arrays.asList(file1);
+    logEntry.timestamp = System.currentTimeMillis();

     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
     m = new Mutation(ReplicationSection.getRowPrefix() + file1);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index 8cbea68..634ee89 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -186,7 +186,7 @@ public class TestMergeState {
     // take it offline
     m = tablet.getPrevRowUpdateMutation();
     Collection<Collection<String>> walogs = Collections.emptyList();
-    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
+    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));

     // now we can split
     stats = scan(state, metaDataStateStore);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index 4002da5..d2cc0cf 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -181,7 +181,7 @@ public class RootTabletStateStoreTest {
     } catch (BadLocationStateException e) {
       fail("Unexpected error " + e);
     }
-    tstore.unassign(Collections.singletonList(assigned), null);
+    tstore.unassign(Collections.singletonList(assigned));
     count = 0;
     for (TabletLocationState location : tstore) {
       assertEquals(location.extent, root);
@@ -209,7 +209,7 @@ public class RootTabletStateStoreTest {
       fail("Unexpected error " + e);
     }
     try {
-      tstore.unassign(Collections.singletonList(broken), null);
+      tstore.unassign(Collections.singletonList(broken));
       Assert.fail("should not get here");
     } catch (IllegalArgumentException ex) {}
   }
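The test updates above all track the same API change: the revert returns to a mutable, field-at-a-time LogEntry in place of the single constructor call. A simplified stand-in showing the shape of that style; the field names mirror the patch, but the class itself is illustrative (the real LogEntry uses a KeyExtent, not a String, for the extent):

import java.util.Collection;
import java.util.Collections;

public class LogEntrySketch {
  // Public fields populated one at a time, as in the reverted LogEntry
  public String extent;
  public long timestamp;
  public String server;
  public String filename;
  public int tabletId;
  public Collection<String> logSet;

  public static void main(String[] args) {
    LogEntrySketch logEntry = new LogEntrySketch();
    logEntry.extent = "1;b;a";
    logEntry.server = "tserver";
    logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/some-uuid";
    logEntry.tabletId = 1;
    logEntry.logSet = Collections.singleton(logEntry.filename);
    logEntry.timestamp = System.currentTimeMillis();
    System.out.println(logEntry.server + " -> " + logEntry.filename);
  }
}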
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/findbugs/exclude-filter.xml b/server/tserver/src/main/findbugs/exclude-filter.xml
index a334163..47dd1f5 100644
--- a/server/tserver/src/main/findbugs/exclude-filter.xml
+++ b/server/tserver/src/main/findbugs/exclude-filter.xml
@@ -18,7 +18,7 @@
-
+