accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [15/19] accumulo git commit: Revert "ACCUMULO-3423 optimize WAL metadata table updates"
Date Sun, 10 May 2015 21:06:01 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 40acf8b..1735c0d 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -18,7 +18,7 @@ package org.apache.accumulo.gc;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -28,56 +28,49 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
+import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.LiveTServerSet;
-import org.apache.accumulo.server.master.LiveTServerSet.Listener;
-import org.apache.accumulo.server.master.state.MetaDataStateStore;
-import org.apache.accumulo.server.master.state.RootTabletStateStore;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.net.HostAndPort;
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -86,8 +79,8 @@ public class GarbageCollectWriteAheadLogs {
 
   private final AccumuloServerContext context;
   private final VolumeManager fs;
-  private final boolean useTrash;
-  private final LiveTServerSet liveServers;
+
+  private boolean useTrash;
 
   /**
    * Creates a new GC WAL object.
@@ -103,35 +96,56 @@ public class GarbageCollectWriteAheadLogs {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
-    this.liveServers = new LiveTServerSet(context, new Listener() {
-      @Override
-      public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
-        log.debug("New tablet servers noticed: " + added);
-        log.debug("Tablet servers removed: " + deleted);
-      }
-    });
-    liveServers.startListeningForTabletServerChanges();
+  }
+
+  /**
+   * Gets the instance used by this object.
+   *
+   * @return instance
+   */
+  Instance getInstance() {
+    return context.getInstance();
+  }
+
+  /**
+   * Gets the volume manager used by this object.
+   *
+   * @return volume manager
+   */
+  VolumeManager getVolumeManager() {
+    return fs;
+  }
+
+  /**
+   * Checks if the volume manager should move files to the trash rather than delete them.
+   *
+   * @return true if trash is used
+   */
+  boolean isUsingTrash() {
+    return useTrash;
   }
 
   public void collect(GCStatus status) {
 
-    Span span = Trace.start("getCandidates");
+    Span span = Trace.start("scanServers");
     try {
-      Set<TServerInstance> currentServers = liveServers.getCurrentServers();
+
+      Map<String,Path> sortedWALogs = getSortedWALogs();
 
       status.currentLog.started = System.currentTimeMillis();
 
-      Map<TServerInstance,Set<Path>> candidates = new HashMap<>();
-      long count = getCurrent(candidates, currentServers);
+      Map<Path,String> fileToServerMap = new HashMap<Path,String>();
+      Map<String,Path> nameToFileMap = new HashMap<String,Path>();
+      int count = scanServers(fileToServerMap, nameToFileMap);
       long fileScanStop = System.currentTimeMillis();
-
-      log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(), (fileScanStop - status.currentLog.started) / 1000.));
-      status.currentLog.candidates = count;
+      log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
+          (fileScanStop - status.currentLog.started) / 1000.));
+      status.currentLog.candidates = fileToServerMap.size();
       span.stop();
 
-      span = Trace.start("removeEntriesInUse");
+      span = Trace.start("removeMetadataEntries");
       try {
-        count = removeEntriesInUse(candidates, status, currentServers);
+        count = removeMetadataEntries(nameToFileMap, sortedWALogs, status);
       } catch (Exception ex) {
         log.error("Unable to scan metadata table", ex);
         return;
@@ -144,7 +158,7 @@ public class GarbageCollectWriteAheadLogs {
 
       span = Trace.start("removeReplicationEntries");
       try {
-        count = removeReplicationEntries(candidates, status);
+        count = removeReplicationEntries(nameToFileMap, sortedWALogs, status);
       } catch (Exception ex) {
         log.error("Unable to scan replication table", ex);
         return;
@@ -156,22 +170,16 @@ public class GarbageCollectWriteAheadLogs {
       log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
 
       span = Trace.start("removeFiles");
+      Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap);
 
-      count = removeFiles(candidates, status);
+      count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status);
 
       long removeStop = System.currentTimeMillis();
-      log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
-      span.stop();
-
-      span = Trace.start("removeMarkers");
-      count = removeTabletServerMarkers(candidates);
-      long removeMarkersStop = System.currentTimeMillis();
-      log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
-      span.stop();
-
+      log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
       status.currentLog.finished = removeStop;
       status.lastLog = status.currentLog;
       status.currentLog = new GcCycleStats();
+      span.stop();
 
     } catch (Exception e) {
       log.error("exception occured while garbage collecting write ahead logs", e);
@@ -180,100 +188,161 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
-  private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
-    long result = 0;
+  boolean holdsLock(HostAndPort addr) {
     try {
-      BatchWriter root = null;
-      BatchWriter meta = null;
-      try {
-        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
-        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-        for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
-          Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
-          for (Path path : entry.getValue()) {
-            m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
-            result++;
+      String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString();
+      List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
+      return !(children == null || children.isEmpty());
+    } catch (KeeperException.NoNodeException ex) {
+      return false;
+    } catch (Exception ex) {
+      log.debug(ex.toString(), ex);
+      return true;
+    }
+  }
+
+  private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+    for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
+      if (entry.getKey().isEmpty()) {
+        // old-style log entry, just remove it
+        for (Path path : entry.getValue()) {
+          log.debug("Removing old-style WAL " + path);
+          try {
+            if (!useTrash || !fs.moveToTrash(path))
+              fs.deleteRecursively(path);
+            status.currentLog.deleted++;
+          } catch (FileNotFoundException ex) {
+            // ignored
+          } catch (IOException ex) {
+            log.error("Unable to delete wal " + path + ": " + ex);
           }
-          root.addMutation(m);
-          meta.addMutation(m);
         }
-      } finally {
-        if (meta != null) {
-          meta.close();
-        }
-        if (root != null) {
-          root.close();
+      } else {
+        HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
+        if (!holdsLock(address)) {
+          for (Path path : entry.getValue()) {
+            log.debug("Removing WAL for offline server " + path);
+            try {
+              if (!useTrash || !fs.moveToTrash(path))
+                fs.deleteRecursively(path);
+              status.currentLog.deleted++;
+            } catch (FileNotFoundException ex) {
+              // ignored
+            } catch (IOException ex) {
+              log.error("Unable to delete wal " + path + ": " + ex);
+            }
+          }
+          continue;
+        } else {
+          Client tserver = null;
+          try {
+            tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
+            tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
+            log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
+            status.currentLog.deleted += entry.getValue().size();
+          } catch (TException e) {
+            log.warn("Error talking to " + address + ": " + e);
+          } finally {
+            if (tserver != null)
+              ThriftUtil.returnClient(tserver);
+          }
         }
       }
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
     }
-    return result;
-  }
 
-  private long removeFiles(Map<TServerInstance,Set<Path>> candidates, final GCStatus status) {
-    for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
-      for (Path path : entry.getValue()) {
-        log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path);
+    for (Path swalog : sortedWALogs.values()) {
+      log.debug("Removing sorted WAL " + swalog);
+      try {
+        if (!useTrash || !fs.moveToTrash(swalog)) {
+          fs.deleteRecursively(swalog);
+        }
+      } catch (FileNotFoundException ex) {
+        // ignored
+      } catch (IOException ioe) {
         try {
-          if (!useTrash || !fs.moveToTrash(path))
-            fs.deleteRecursively(path);
-          status.currentLog.deleted++;
-        } catch (FileNotFoundException ex) {
-          // ignored
+          if (fs.exists(swalog)) {
+            log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
+          }
         } catch (IOException ex) {
-          log.error("Unable to delete wal " + path + ": " + ex);
+          log.error("Unable to check for the existence of " + swalog, ex);
         }
       }
     }
-    return status.currentLog.deleted;
-  }
 
-  private UUID path2uuid(Path path) {
-    return UUID.fromString(path.getName());
+    return 0;
   }
 
-  private long removeEntriesInUse(Map<TServerInstance,Set<Path>> candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException,
-      KeeperException, InterruptedException {
-
-    // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
+  /**
+   * Converts a list of paths to their corresponding strings.
+   *
+   * @param paths
+   *          list of paths
+   * @return string forms of paths
+   */
+  static List<String> paths2strings(List<Path> paths) {
+    List<String> result = new ArrayList<String>(paths.size());
+    for (Path path : paths)
+      result.add(path.toString());
+    return result;
+  }
 
-    Map<UUID,TServerInstance> walToDeadServer = new HashMap<>();
-    for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
-      for (Path file : entry.getValue()) {
-        walToDeadServer.put(path2uuid(file), entry.getKey());
+  /**
+   * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
+   * mapping of file names to paths is skipped.
+   *
+   * @param fileToServerMap
+   *          map of file paths to servers
+   * @param nameToFileMap
+   *          map of file names to paths
+   * @return map of servers to lists of file paths
+   */
+  static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) {
+    Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
+    for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
+      if (!nameToFileMap.containsKey(fileServer.getKey().getName()))
+        continue;
+      ArrayList<Path> files = result.get(fileServer.getValue());
+      if (files == null) {
+        files = new ArrayList<Path>();
+        result.put(fileServer.getValue(), files);
       }
+      files.add(fileServer.getKey());
     }
-    long count = 0;
-    RootTabletStateStore root = new RootTabletStateStore(context);
-    MetaDataStateStore meta = new MetaDataStateStore(context);
-    Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
-    while (states.hasNext()) {
-      count++;
-      TabletLocationState state = states.next();
-      if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
-        candidates.remove(state.current);
-      }
-      for (Collection<String> wals : state.walogs) {
-        for (String wal : wals) {
-          UUID walUUID = path2uuid(new Path(wal));
-          TServerInstance dead = walToDeadServer.get(walUUID);
-          if (dead != null) {
-            Iterator<Path> iter = candidates.get(dead).iterator();
-            while (iter.hasNext()) {
-              if (path2uuid(iter.next()).equals(walUUID)) {
-                iter.remove();
-                break;
-              }
-            }
-          }
+    return result;
+  }
+
+  protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+      InterruptedException {
+    int count = 0;
+    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
+
+    // For each WAL reference in the metadata table
+    while (iterator.hasNext()) {
+      // Each metadata reference has at least one WAL file
+      for (String entry : iterator.next().logSet) {
+        // Old-style WALs will have the IP:Port of their logger, and new-style WALs will be a Path, either absolute or relative; in all cases
+        // the text after the last "/" is expected to be a UUID file name.
+        String uuid = entry.substring(entry.lastIndexOf("/") + 1);
+        if (!isUUID(uuid)) {
+          // fully expect this to be a uuid; if it's not, then something is wrong and walog GC should not proceed!
+          throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
         }
+
+        Path pathFromNN = nameToFileMap.remove(uuid);
+        if (pathFromNN != null) {
+          status.currentLog.inUse++;
+          sortedWALogs.remove(uuid);
+        }
+
+        count++;
       }
     }
+
     return count;
   }
 
-  protected int removeReplicationEntries(Map<TServerInstance,Set<Path>> candidates, GCStatus status) throws IOException, KeeperException, InterruptedException {
+  protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+      InterruptedException {
     Connector conn;
     try {
       conn = context.getConnector();
@@ -284,25 +353,21 @@ public class GarbageCollectWriteAheadLogs {
 
     int count = 0;
 
-    Iterator<Entry<TServerInstance,Set<Path>>> walIter = candidates.entrySet().iterator();
+    Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
 
     while (walIter.hasNext()) {
-      Entry<TServerInstance,Set<Path>> wal = walIter.next();
-      Iterator<Path> paths = wal.getValue().iterator();
-      while (paths.hasNext()) {
-        Path fullPath = paths.next();
-        if (neededByReplication(conn, fullPath)) {
-          log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
-          // If we haven't already removed it, check to see if this WAL is
-          // "in use" by replication (needed for replication purposes)
-          status.currentLog.inUse++;
-          paths.remove();
-        } else {
-          log.debug("WAL not needed for replication {}", fullPath);
-        }
-      }
-      if (wal.getValue().isEmpty()) {
+      Entry<String,Path> wal = walIter.next();
+      String fullPath = wal.getValue().toString();
+      if (neededByReplication(conn, fullPath)) {
+        log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
+        // If we haven't already removed it, check to see if this WAL is
+        // "in use" by replication (needed for replication purposes)
+        status.currentLog.inUse++;
+
         walIter.remove();
+        sortedWALogs.remove(wal.getKey());
+      } else {
+        log.debug("WAL not needed for replication {}", fullPath);
       }
       count++;
     }
@@ -317,7 +382,7 @@ public class GarbageCollectWriteAheadLogs {
    *          The full path (URI)
    * @return True if the WAL is still needed by replication (not a candidate for deletion)
    */
-  protected boolean neededByReplication(Connector conn, Path wal) {
+  protected boolean neededByReplication(Connector conn, String wal) {
     log.info("Checking replication table for " + wal);
 
     Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
@@ -340,7 +405,7 @@ public class GarbageCollectWriteAheadLogs {
     return false;
   }
 
-  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, Path wal) {
+  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
     Scanner metaScanner;
     try {
       metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -360,7 +425,7 @@ public class GarbageCollectWriteAheadLogs {
       StatusSection.limit(replScanner);
 
       // Only look for this specific WAL
-      replScanner.setRange(Range.exact(wal.toString()));
+      replScanner.setRange(Range.exact(wal));
 
       return Iterables.concat(metaScanner, replScanner);
     } catch (ReplicationTableOfflineException e) {
@@ -370,84 +435,107 @@ public class GarbageCollectWriteAheadLogs {
     return metaScanner;
   }
 
-
-
+  private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+    return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
+  }
 
   /**
-   * Scans log markers. The map passed in is populated with the logs for dead servers.
+   * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
    *
-   * @param unusedLogs
-   *          map of dead server to log file entries
-   * @return total number of log files
+   * @param walDirs
+   *          write-ahead log directories
+   * @param fileToServerMap
+   *          map of file paths to servers
+   * @param nameToFileMap
+   *          map of file names to paths
+   * @return number of servers located (including those with no logs present)
    */
-  private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception {
-    Set<Path> rootWALs = new HashSet<>();
-    // Get entries in zookeeper:
-    String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
-    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    List<String> children = zoo.getChildren(zpath);
-    for (String child : children) {
-      LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
-      rootWALs.add(new Path(entry.filename));
-    }
-    long count = 0;
-
-    // get all the WAL markers that are not in zookeeper for dead servers
-    Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
-    rootScanner.setRange(CurrentLogsSection.getRange());
-    Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    metaScanner.setRange(CurrentLogsSection.getRange());
-    Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
-    Text hostAndPort = new Text();
-    Text sessionId = new Text();
-    Text filename = new Text();
-    while (entries.hasNext()) {
-      Entry<Key,Value> entry = entries.next();
-      CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
-      CurrentLogsSection.getPath(entry.getKey(), filename);
-      TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
-      Path path = new Path(filename.toString());
-      if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
-        Set<Path> logs = unusedLogs.get(tsi);
-        if (logs == null) {
-          unusedLogs.put(tsi, logs = new HashSet<Path>());
-        }
-        if (logs.add(path)) {
-          count++;
+  int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+    Set<String> servers = new HashSet<String>();
+    for (String walDir : walDirs) {
+      Path walRoot = new Path(walDir);
+      FileStatus[] listing = null;
+      try {
+        listing = fs.listStatus(walRoot);
+      } catch (FileNotFoundException e) {
+        // ignore dir
+      }
+
+      if (listing == null)
+        continue;
+      for (FileStatus status : listing) {
+        String server = status.getPath().getName();
+        if (status.isDirectory()) {
+          servers.add(server);
+          for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
+            if (isUUID(file.getPath().getName())) {
+              fileToServerMap.put(file.getPath(), server);
+              nameToFileMap.put(file.getPath().getName(), file.getPath());
+            } else {
+              log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+            }
+          }
+        } else if (isUUID(server)) {
+          // old-style WALs are not under a directory
+          servers.add("");
+          fileToServerMap.put(status.getPath(), "");
+          nameToFileMap.put(server, status.getPath());
+        } else {
+          log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
         }
       }
     }
+    return servers.size();
+  }
 
-    // scan HDFS for logs for dead servers
-    for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
-      RemoteIterator<LocatedFileStatus> iter =  volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
-      while (iter.hasNext()) {
-        LocatedFileStatus next = iter.next();
-        // recursive listing returns directories, too
-        if (next.isDirectory()) {
-          continue;
-        }
-        // make sure we've waited long enough for zookeeper propagation
-        if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
-          continue;
-        }
-        Path path = next.getPath();
-        String hostPlusPort = path.getParent().getName();
-        // server is still alive, or has a replacement
-        TServerInstance instance = liveServers.find(hostPlusPort);
-        if (instance != null) {
-          continue;
-        }
-        TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
-        Set<Path> paths = unusedLogs.get(fake);
-        if (paths == null) {
-          paths = new HashSet<>();
+  private Map<String,Path> getSortedWALogs() throws IOException {
+    return getSortedWALogs(ServerConstants.getRecoveryDirs());
+  }
+
+  /**
+   * Looks for write-ahead logs in recovery directories.
+   *
+   * @param recoveryDirs
+   *          recovery directories
+   * @return map of log file names to paths
+   */
+  Map<String,Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
+    Map<String,Path> result = new HashMap<String,Path>();
+
+    for (String dir : recoveryDirs) {
+      Path recoveryDir = new Path(dir);
+
+      if (fs.exists(recoveryDir)) {
+        for (FileStatus status : fs.listStatus(recoveryDir)) {
+          String name = status.getPath().getName();
+          if (isUUID(name)) {
+            result.put(name, status.getPath());
+          } else {
+            log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+          }
         }
-        paths.add(path);
-        unusedLogs.put(fake, paths);
       }
     }
-    return count;
+    return result;
+  }
+
+  /**
+   * Checks if a string is a valid UUID.
+   *
+   * @param name
+   *          string to check
+   * @return true if string is a UUID
+   */
+  static boolean isUUID(String name) {
+    if (name == null || name.length() != 36) {
+      return false;
+    }
+    try {
+      UUID.fromString(name);
+      return true;
+    } catch (IllegalArgumentException ex) {
+      return false;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 4f64c15..037023a 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -569,6 +569,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
         replSpan.stop();
       }
 
+      // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
         GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index cb4b341..78ac4ac 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -37,11 +37,13 @@ import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
@@ -182,21 +184,20 @@ public class CloseWriteAheadLogReferences implements Runnable {
     try {
       // TODO Configurable number of threads
       bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
-      bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-      bs.fetchColumnFamily(CurrentLogsSection.COLF);
+      bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+      bs.fetchColumnFamily(LogColumnFamily.NAME);
 
       // For each log key/value in the metadata table
       for (Entry<Key,Value> entry : bs) {
-        if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
-          continue;
-        }
-        Text tpath = new Text();
-        CurrentLogsSection.getPath(entry.getKey(), tpath);
-        String path = new Path(tpath.toString()).toString();
-        log.debug("Found WAL " + path.toString());
+        // The value may contain multiple WALs
+        LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+
+        log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
 
         // Normalize each log file (using Path) and add it to the set
-        referencedWals.add(normalizedWalPaths.get(path));
+        for (String logFile : logEntry.logSet) {
+          referencedWals.add(normalizedWalPaths.get(logFile));
+        }
       }
     } catch (TableNotFoundException e) {
       // uhhhh

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
new file mode 100644
index 0000000..5801faa
--- /dev/null
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.gc;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+public class GarbageCollectWriteAheadLogsTest {
+  private static final long BLOCK_SIZE = 64000000L;
+
+  private static final Path DIR_1_PATH = new Path("/dir1");
+  private static final Path DIR_2_PATH = new Path("/dir2");
+  private static final Path DIR_3_PATH = new Path("/dir3");
+  private static final String UUID1 = UUID.randomUUID().toString();
+  private static final String UUID2 = UUID.randomUUID().toString();
+  private static final String UUID3 = UUID.randomUUID().toString();
+
+  private Instance instance;
+  private AccumuloConfiguration systemConfig;
+  private VolumeManager volMgr;
+  private GarbageCollectWriteAheadLogs gcwal;
+  private long modTime;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
+    instance = createMock(Instance.class);
+    expect(instance.getInstanceID()).andReturn("mock").anyTimes();
+    expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
+    expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+    systemConfig = new ConfigurationCopy(new HashMap<String,String>());
+    volMgr = createMock(VolumeManager.class);
+    ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
+    expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
+    expect(factory.getInstance()).andReturn(instance).anyTimes();
+    expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+    // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+    // Presently, we only need get(Property), getBoolean(Property) and iterator().
+    EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+      @Override
+      public String answer() {
+        Object[] args = EasyMock.getCurrentArguments();
+        return systemConfig.get((Property) args[0]);
+      }
+    }).anyTimes();
+    EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
+      @Override
+      public Boolean answer() {
+        Object[] args = EasyMock.getCurrentArguments();
+        return systemConfig.getBoolean((Property) args[0]);
+      }
+    }).anyTimes();
+
+    EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+      @Override
+      public Iterator<Entry<String,String>> answer() {
+        return systemConfig.iterator();
+      }
+    }).anyTimes();
+
+    replay(instance, factory, siteConfig);
+    AccumuloServerContext context = new AccumuloServerContext(factory);
+    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    modTime = System.currentTimeMillis();
+  }
+
+  @Test
+  public void testGetters() {
+    assertSame(instance, gcwal.getInstance());
+    assertSame(volMgr, gcwal.getVolumeManager());
+    assertFalse(gcwal.isUsingTrash());
+  }
+
+  @Test
+  public void testPathsToStrings() {
+    ArrayList<Path> paths = new ArrayList<Path>();
+    paths.add(new Path(DIR_1_PATH, "file1"));
+    paths.add(DIR_2_PATH);
+    paths.add(new Path(DIR_3_PATH, "file3"));
+    List<String> strings = GarbageCollectWriteAheadLogs.paths2strings(paths);
+    int len = 3;
+    assertEquals(len, strings.size());
+    for (int i = 0; i < len; i++) {
+      assertEquals(paths.get(i).toString(), strings.get(i));
+    }
+  }
+
+  @Test
+  public void testMapServersToFiles() {
+    // @formatter:off
+    /*
+     * Test fileToServerMap:
+     * /dir1/server1/uuid1 -> server1 (new-style)
+     * /dir1/uuid2 -> "" (old-style)
+     * /dir3/server3/uuid3 -> server3 (new-style)
+     */
+    // @formatter:on
+    Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
+    Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1);
+    fileToServerMap.put(path1, "server1"); // new-style
+    Path path2 = new Path(DIR_1_PATH, UUID2);
+    fileToServerMap.put(path2, ""); // old-style
+    Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3);
+    fileToServerMap.put(path3, "server3"); // new-style
+    // @formatter:off
+    /*
+     * Test nameToFileMap:
+     * uuid1 -> /dir1/server1/uuid1
+     * uuid3 -> /dir3/server3/uuid3
+     */
+    // @formatter:on
+    Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
+    nameToFileMap.put(UUID1, path1);
+    nameToFileMap.put(UUID3, path3);
+
+    // @formatter:off
+    /*
+     * Expected map:
+     * server1 -> [ /dir1/server1/uuid1 ]
+     * server3 -> [ /dir3/server3/uuid3 ]
+     */
+    // @formatter:on
+    Map<String,ArrayList<Path>> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap);
+    assertEquals(2, result.size());
+    ArrayList<Path> list1 = result.get("server1");
+    assertEquals(1, list1.size());
+    assertTrue(list1.contains(path1));
+    ArrayList<Path> list3 = result.get("server3");
+    assertEquals(1, list3.size());
+    assertTrue(list3.contains(path3));
+  }
+
+  private FileStatus makeFileStatus(int size, Path path) {
+    boolean isDir = (size == 0);
+    return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path);
+  }
+
+  private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception {
+    expect(volMgr.listStatus(dir)).andReturn(fileStatuses);
+  }
+
+  @Test
+  public void testScanServers_NewStyle() throws Exception {
+    String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"};
+    // @formatter:off
+    /*
+     * Test directory layout:
+     * /dir1/
+     *   server1/
+     *     uuid1
+     *     file2
+     *   subdir2/
+     * /dir2/ missing
+     * /dir3/
+     *   server3/
+     *     uuid3
+     */
+    // @formatter:on
+    Path serverDir1Path = new Path(DIR_1_PATH, "server1");
+    FileStatus serverDir1 = makeFileStatus(0, serverDir1Path);
+    Path subDir2Path = new Path(DIR_1_PATH, "subdir2");
+    FileStatus serverDir2 = makeFileStatus(0, subDir2Path);
+    mockListStatus(DIR_1_PATH, serverDir1, serverDir2);
+    Path path1 = new Path(serverDir1Path, UUID1);
+    FileStatus file1 = makeFileStatus(100, path1);
+    FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2"));
+    mockListStatus(serverDir1Path, file1, file2);
+    mockListStatus(subDir2Path);
+    expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException());
+    Path serverDir3Path = new Path(DIR_3_PATH, "server3");
+    FileStatus serverDir3 = makeFileStatus(0, serverDir3Path);
+    mockListStatus(DIR_3_PATH, serverDir3);
+    Path path3 = new Path(serverDir3Path, UUID3);
+    FileStatus file3 = makeFileStatus(300, path3);
+    mockListStatus(serverDir3Path, file3);
+    replay(volMgr);
+
+    Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
+    Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
+    int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
+    assertEquals(3, count);
+    // @formatter:off
+    /*
+     * Expected fileToServerMap:
+     * /dir1/server1/uuid1 -> server1
+     * /dir3/server3/uuid3 -> server3
+     */
+    // @formatter:on
+    assertEquals(2, fileToServerMap.size());
+    assertEquals("server1", fileToServerMap.get(path1));
+    assertEquals("server3", fileToServerMap.get(path3));
+    // @formatter:off
+    /*
+     * Expected nameToFileMap:
+     * uuid1 -> /dir1/server1/uuid1
+     * uuid3 -> /dir3/server3/uuid3
+     */
+    // @formatter:on
+    assertEquals(2, nameToFileMap.size());
+    assertEquals(path1, nameToFileMap.get(UUID1));
+    assertEquals(path3, nameToFileMap.get(UUID3));
+  }
+
+  @Test
+  public void testScanServers_OldStyle() throws Exception {
+    // @formatter:off
+    /*
+     * Test directory layout:
+     * /dir1/
+     *   uuid1
+     * /dir3/
+     *   uuid3
+     */
+    // @formatter:on
+    String[] walDirs = new String[] {"/dir1", "/dir3"};
+    Path serverFile1Path = new Path(DIR_1_PATH, UUID1);
+    FileStatus serverFile1 = makeFileStatus(100, serverFile1Path);
+    mockListStatus(DIR_1_PATH, serverFile1);
+    Path serverFile3Path = new Path(DIR_3_PATH, UUID3);
+    FileStatus serverFile3 = makeFileStatus(300, serverFile3Path);
+    mockListStatus(DIR_3_PATH, serverFile3);
+    replay(volMgr);
+
+    Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
+    Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
+    int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
+    /*
+     * Expect only a single server, the non-server entry for upgrade WALs
+     */
+    assertEquals(1, count);
+    // @formatter:off
+    /*
+     * Expected fileToServerMap:
+     * /dir1/uuid1 -> ""
+     * /dir3/uuid3 -> ""
+     */
+    // @formatter:on
+    assertEquals(2, fileToServerMap.size());
+    assertEquals("", fileToServerMap.get(serverFile1Path));
+    assertEquals("", fileToServerMap.get(serverFile3Path));
+    // @formatter:off
+    /*
+     * Expected nameToFileMap:
+     * uuid1 -> /dir1/uuid1
+     * uuid3 -> /dir3/uuid3
+     */
+    // @formatter:on
+    assertEquals(2, nameToFileMap.size());
+    assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
+    assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
+  }
+
+  @Test
+  public void testGetSortedWALogs() throws Exception {
+    String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"};
+    // @formatter:off
+    /*
+     * Test directory layout:
+     * /dir1/
+     *   uuid1
+     *   file2
+     * /dir2/ missing
+     * /dir3/
+     *   uuid3
+     */
+    // @formatter:on
+    expect(volMgr.exists(DIR_1_PATH)).andReturn(true);
+    expect(volMgr.exists(DIR_2_PATH)).andReturn(false);
+    expect(volMgr.exists(DIR_3_PATH)).andReturn(true);
+    Path path1 = new Path(DIR_1_PATH, UUID1);
+    FileStatus file1 = makeFileStatus(100, path1);
+    FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2"));
+    mockListStatus(DIR_1_PATH, file1, file2);
+    Path path3 = new Path(DIR_3_PATH, UUID3);
+    FileStatus file3 = makeFileStatus(300, path3);
+    mockListStatus(DIR_3_PATH, file3);
+    replay(volMgr);
+
+    Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs);
+    // @formatter:off
+    /*
+     * Expected map:
+     * uuid1 -> /dir1/uuid1
+     * uuid3 -> /dir3/uuid3
+     */
+    // @formatter:on
+    assertEquals(2, sortedWalogs.size());
+    assertEquals(path1, sortedWalogs.get(UUID1));
+    assertEquals(path3, sortedWalogs.get(UUID3));
+  }
+
+  @Test
+  public void testIsUUID() {
+    assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString()));
+    assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo"));
+    assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
+    assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
+  }
+
+  // It was easier to do this than get the mocking working for me
+  private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
+
+    private List<Entry<Key,Value>> replData;
+
+    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
+      super(context, fs, useTrash);
+      this.replData = replData;
+    }
+
+    @Override
+    protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
+      return this.replData;
+    }
+  }
+
+  @Test
+  public void replicationEntriesAffectGC() throws Exception {
+    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+    Connector conn = createMock(Connector.class);
+
+    // Write a Status record which should prevent file1 from being deleted
+    LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
+    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
+
+    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
+
+    replay(conn);
+
+    // Open (not-closed) file must be retained
+    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
+
+    // No replication data, not needed
+    replData.clear();
+    assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
+
+    // The file is closed but not replicated, must be retained
+    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
+    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
+
+    // File is closed and fully replicated, can be deleted
+    replData.clear();
+    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
+        ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
+    assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
+  }
+
+  @Test
+  public void removeReplicationEntries() throws Exception {
+    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+
+    Instance inst = new MockInstance(testName.getMethodName());
+    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+    long file1CreateTime = System.currentTimeMillis();
+    long file2CreateTime = file1CreateTime + 50;
+    BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
+    Mutation m = new Mutation("/wals/" + file1);
+    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
+    bw.addMutation(m);
+    m = new Mutation("/wals/" + file2);
+    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
+    bw.addMutation(m);
+
+    // These WALs are potential candidates for deletion from fs
+    Map<String,Path> nameToFileMap = new HashMap<>();
+    nameToFileMap.put(file1, new Path("/wals/" + file1));
+    nameToFileMap.put(file2, new Path("/wals/" + file2));
+
+    Map<String,Path> sortedWALogs = Collections.emptyMap();
+
+    // Make the GCStatus and GcCycleStats
+    GCStatus status = new GCStatus();
+    GcCycleStats cycleStats = new GcCycleStats();
+    status.currentLog = cycleStats;
+
+    // We should iterate over two entries
+    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
+
+    // We should have noted that two files were still in use
+    Assert.assertEquals(2l, cycleStats.inUse);
+
+    // Both should have been removed from the map of deletion candidates
+    Assert.assertEquals(0, nameToFileMap.size());
+  }
+
+  @Test
+  public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
+    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+
+    Instance inst = new MockInstance(testName.getMethodName());
+    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+
+    Connector conn = context.getConnector();
+
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+    long file1CreateTime = System.currentTimeMillis();
+    long file2CreateTime = file1CreateTime + 50;
+    // Write some records to the metadata table, we haven't yet written status records to the replication table
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
+    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
+    bw.addMutation(m);
+
+    m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
+    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
+    bw.addMutation(m);
+
+    // These WALs are potential candidates for deletion from fs
+    Map<String,Path> nameToFileMap = new HashMap<>();
+    nameToFileMap.put(file1, new Path("/wals/" + file1));
+    nameToFileMap.put(file2, new Path("/wals/" + file2));
+
+    Map<String,Path> sortedWALogs = Collections.emptyMap();
+
+    // Make the GCStatus and GcCycleStats objects
+    GCStatus status = new GCStatus();
+    GcCycleStats cycleStats = new GcCycleStats();
+    status.currentLog = cycleStats;
+
+    // We should iterate over two entries
+    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
+
+    // We should have noted that two files were still in use
+    Assert.assertEquals(2l, cycleStats.inUse);
+
+    // Both should have been removed from the map of deletion candidates
+    Assert.assertEquals(0, nameToFileMap.size());
+  }
+
+  @Test
+  public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
+    Instance inst = new MockInstance(testName.getMethodName());
+    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+    Connector conn = context.getConnector();
+
+    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+    bw.addMutation(m);
+    bw.close();
+
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+    Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
+    Entry<Key,Value> entry = Iterables.getOnlyElement(data);
+
+    Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
+  }
+
+  @Test
+  public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
+    Instance inst = new MockInstance(testName.getMethodName());
+    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+    Connector conn = context.getConnector();
+
+    long walCreateTime = System.currentTimeMillis();
+    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
+    bw.addMutation(m);
+    bw.close();
+
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(wal);
+    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
+    bw.addMutation(m);
+    bw.close();
+
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+    Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
+    Map<Key,Value> data = new HashMap<>();
+    for (Entry<Key,Value> e : iter) {
+      data.put(e.getKey(), e.getValue());
+    }
+
+    Assert.assertEquals(2, data.size());
+
+    // Should get one element from each table (metadata and replication)
+    for (Key k : data.keySet()) {
+      String row = k.getRow().toString();
+      if (row.startsWith(ReplicationSection.getRowPrefix())) {
+        Assert.assertTrue(row.endsWith(wal));
+      } else {
+        Assert.assertEquals(wal, row);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 78a5bd5..3115de1 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -50,12 +50,14 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -128,16 +130,22 @@ public class CloseWriteAheadLogReferencesTest {
   public void findOneWalFromMetadata() throws Exception {
     Connector conn = createMock(Connector.class);
     BatchScanner bs = createMock(BatchScanner.class);
+
     // Fake out some data
     final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
-    data.add(entry("tserver1:9997[1234567890]", file));
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+    logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
+    logEntry.server = "tserver1";
+    logEntry.tabletId = 1;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
     expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
+    bs.fetchColumnFamily(LogColumnFamily.NAME);
     expectLastCall().once();
     expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
 
@@ -155,12 +163,54 @@ public class CloseWriteAheadLogReferencesTest {
 
     // Validate
     Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(file), wals);
+    Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
+
+    verify(conn, bs);
+  }
+
+  @Test
+  public void findManyWalFromSingleMetadata() throws Exception {
+    Connector conn = createMock(Connector.class);
+    BatchScanner bs = createMock(BatchScanner.class);
+
+    // Fake out some data
+    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+    logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
+    logEntry.server = "tserver1";
+    logEntry.tabletId = 1;
+    // Multiple DFSLoggers
+    logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID());
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    // Get a batchscanner, scan the tablets section, fetch only the logs
+    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
+    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+    expectLastCall().once();
+    bs.fetchColumnFamily(LogColumnFamily.NAME);
+    expectLastCall().once();
+    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
+
+      @Override
+      public Iterator<Entry<Key,Value>> answer() throws Throwable {
+        return data.iterator();
+      }
+
+    });
+    // Close the bs
+    bs.close();
+    expectLastCall().once();
+
+    replay(conn, bs);
+
+    // Validate
+    Set<String> wals = refs.getReferencedWals(conn);
+    Assert.assertEquals(logEntry.logSet, wals);
 
     verify(conn, bs);
   }
 
-  // This is a silly test now
   @Test
   public void findManyRefsToSingleWalFromMetadata() throws Exception {
     Connector conn = createMock(Connector.class);
@@ -170,14 +220,31 @@ public class CloseWriteAheadLogReferencesTest {
 
     // Fake out some data
     final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
-    data.add(entry("tserver1:9997[0123456789]", filename));
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+    logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid;
+    logEntry.server = "tserver1";
+    logEntry.tabletId = 1;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b"));
+    logEntry.server = "tserver1";
+    logEntry.tabletId = 2;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c"));
+    logEntry.server = "tserver1";
+    logEntry.tabletId = 3;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
     expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
+    bs.fetchColumnFamily(LogColumnFamily.NAME);
     expectLastCall().once();
     expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
 
@@ -195,7 +262,7 @@ public class CloseWriteAheadLogReferencesTest {
 
     // Validate
     Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(filename), wals);
+    Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
 
     verify(conn, bs);
   }
@@ -205,22 +272,59 @@ public class CloseWriteAheadLogReferencesTest {
     Connector conn = createMock(Connector.class);
     BatchScanner bs = createMock(BatchScanner.class);
 
-    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
-    String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
-    String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
+    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
+        + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
 
     // Fake out some data
     final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-
-    data.add(entry("tserver1:9997[1234567890]", file1));
-    data.add(entry("tserver2:9997[1234567891]", file2));
-    data.add(entry("tserver3:9997[1234567891]", file3));
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+    logEntry.filename = file1;
+    logEntry.server = "tserver1";
+    logEntry.tabletId = 1;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("5"), null, null);
+    logEntry.tabletId = 2;
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
+    logEntry.filename = file2;
+    logEntry.server = "tserver2";
+    logEntry.tabletId = 3;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
+    logEntry.tabletId = 4;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
+    logEntry.filename = file3;
+    logEntry.server = "tserver3";
+    logEntry.tabletId = 5;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
+    logEntry.server = "tserver3";
+    logEntry.tabletId = 7;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
+    logEntry.server = "tserver3";
+    logEntry.tabletId = 15;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
+    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
     expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
     expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
+    bs.fetchColumnFamily(LogColumnFamily.NAME);
     expectLastCall().once();
     expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
 
@@ -243,11 +347,6 @@ public class CloseWriteAheadLogReferencesTest {
     verify(conn, bs);
   }
 
-  private static Entry<Key,Value> entry(String session, String file) {
-    Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
-    return Maps.immutableEntry(key, new Value());
-  }
-
   @Test
   public void unusedWalsAreClosed() throws Exception {
     Set<String> wals = Collections.emptySet();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9a324fb..d00785a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -423,9 +423,6 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
           perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
         }
         perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
-
-        // add the currlog location for root tablet current logs
-        zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
         haveUpgradedZooKeeper = true;
       } catch (Exception ex) {
         // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 3f15b39..592d9ae 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -173,8 +173,7 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
           scanner.setRange(MetadataSchema.TabletsSection.getRange());
         } else {
           scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-          Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange();
-          scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
+          scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
         }
         TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
         TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 2b874f6..4c47953 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -163,7 +163,6 @@ class TabletGroupWatcher extends Daemon {
         List<Assignment> assigned = new ArrayList<Assignment>();
         List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
         Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
-        Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
 
         MasterState masterState = master.getMasterState();
         int[] counts = new int[TabletState.values().length];
@@ -176,7 +175,6 @@ class TabletGroupWatcher extends Daemon {
           if (tls == null) {
             continue;
           }
-          Master.log.debug(store.name() + " location State: " + tls);
           // ignore entries for tables that do not exist in zookeeper
           if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
             continue;
@@ -186,7 +184,7 @@ class TabletGroupWatcher extends Daemon {
 
           // Don't overwhelm the tablet servers with work
           if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
-            flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
+            flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
             assignments.clear();
             assigned.clear();
             assignedToDeadServers.clear();
@@ -206,9 +204,8 @@ class TabletGroupWatcher extends Daemon {
           TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
           TServerInstance server = tls.getServer();
           TabletState state = tls.getState(currentTServers.keySet());
-          if (Master.log.isTraceEnabled()) {
-            Master.log.trace("Goal state " + goal + " current " + state + " for " + tls.extent);
-          }
+          if (Master.log.isTraceEnabled())
+            Master.log.trace("Goal state " + goal + " current " + state);
           stats.update(tableId, state);
           mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
           sendChopRequest(mergeStats.getMergeInfo(), state, tls);
@@ -242,7 +239,7 @@ class TabletGroupWatcher extends Daemon {
                 assignedToDeadServers.add(tls);
                 if (server.equals(this.master.migrations.get(tls.extent)))
                   this.master.migrations.remove(tls.extent);
-                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+                // log.info("Current servers " + currentTServers.keySet());
                 break;
               case UNASSIGNED:
                 // maybe it's a finishing migration
@@ -276,7 +273,7 @@ class TabletGroupWatcher extends Daemon {
                 break;
               case ASSIGNED_TO_DEAD_SERVER:
                 assignedToDeadServers.add(tls);
-                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+                // log.info("Current servers " + currentTServers.keySet());
                 break;
               case HOSTED:
                 TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -295,8 +292,7 @@ class TabletGroupWatcher extends Daemon {
           counts[state.ordinal()]++;
         }
 
-        flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
-        store.markLogsAsUnused(master, logsForDeadServers);
+        flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
 
         // provide stats after flushing changes to avoid race conditions w/ delete table
         stats.end(masterState);
@@ -316,12 +312,8 @@ class TabletGroupWatcher extends Daemon {
 
         updateMergeState(mergeStatsCache);
 
-        if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
-          Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
-          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
-        } else {
-          Master.log.info("Detected change in current tserver set, re-running state machine.");
-        }
+        Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+        eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
       } catch (Exception ex) {
         Master.log.error("Error processing table state for store " + store.name(), ex);
         if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) {
@@ -739,13 +731,11 @@ class TabletGroupWatcher extends Daemon {
   }
 
   private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
-      List<TabletLocationState> assignedToDeadServers, Map<TServerInstance,List<Path>> logsForDeadServers, Map<KeyExtent,TServerInstance> unassigned)
-      throws DistributedStoreException, TException {
+      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
     if (!assignedToDeadServers.isEmpty()) {
       int maxServersToShow = min(assignedToDeadServers.size(), 100);
       Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
-      Master.log.debug("logs for dead servers: " + logsForDeadServers);
-      store.unassign(assignedToDeadServers, logsForDeadServers);
+      store.unassign(assignedToDeadServers);
       this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 74e9b78..8532e1b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -107,7 +107,6 @@ public class WorkMaker {
         // Don't create the record if we have nothing to do.
         // TODO put this into a filter on serverside
         if (!shouldCreateWork(status)) {
-          log.debug("Not creating work: " + status.toString());
           continue;
         }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index f1763be..a3c7e46 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.cli.ClientOpts;
@@ -188,7 +187,7 @@ public class MergeStats {
     Text tableId = extent.getTableId();
     Text first = KeyExtent.getMetadataEntry(tableId, start);
     Range range = new Range(first, false, null, true);
-    scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
+    scanner.setRange(range);
     KeyExtent prevExtent = null;
 
     log.debug("Scanning range " + range);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
index 3c3bc37..6790858 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.master;
 
+import java.util.Arrays;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
@@ -305,7 +306,13 @@ public class ReplicationOperationsImplTest {
     bw.addMutation(m);
     bw.close();
 
-    LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
+    LogEntry logEntry = new LogEntry();
+    logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
+    logEntry.server = "tserver";
+    logEntry.filename = file1;
+    logEntry.tabletId = 1;
+    logEntry.logSet = Arrays.asList(file1);
+    logEntry.timestamp = System.currentTimeMillis();
 
     bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
     m = new Mutation(ReplicationSection.getRowPrefix() + file1);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index 8cbea68..634ee89 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -186,7 +186,7 @@ public class TestMergeState {
     // take it offline
     m = tablet.getPrevRowUpdateMutation();
     Collection<Collection<String>> walogs = Collections.emptyList();
-    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
+    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));
 
     // now we can split
     stats = scan(state, metaDataStateStore);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index 4002da5..d2cc0cf 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -181,7 +181,7 @@ public class RootTabletStateStoreTest {
     } catch (BadLocationStateException e) {
       fail("Unexpected error " + e);
     }
-    tstore.unassign(Collections.singletonList(assigned), null);
+    tstore.unassign(Collections.singletonList(assigned));
     count = 0;
     for (TabletLocationState location : tstore) {
       assertEquals(location.extent, root);
@@ -209,7 +209,7 @@ public class RootTabletStateStoreTest {
       fail("Unexpected error " + e);
     }
     try {
-      tstore.unassign(Collections.singletonList(broken), null);
+      tstore.unassign(Collections.singletonList(broken));
       Assert.fail("should not get here");
     } catch (IllegalArgumentException ex) {}
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/findbugs/exclude-filter.xml b/server/tserver/src/main/findbugs/exclude-filter.xml
index a334163..47dd1f5 100644
--- a/server/tserver/src/main/findbugs/exclude-filter.xml
+++ b/server/tserver/src/main/findbugs/exclude-filter.xml
@@ -18,7 +18,7 @@
   <Match>
     <!-- locking is confusing, but probably correct -->
     <Class name="org.apache.accumulo.tserver.tablet.Tablet" />
-    <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,org.apache.accumulo.tserver.log.DfsLogger,boolean" returns="boolean" />
+    <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,java.util.Collection,boolean" returns="boolean" />
     <Bug code="UL" pattern="UL_UNRELEASED_LOCK" />
   </Match>
   <Match>


Mime
View raw message