accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [03/34] accumulo git commit: ACCUMULO-3625 use log markers against tservers, not tablets
Date Fri, 24 Apr 2015 23:20:49 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index ed7626e..a95cffa 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,8 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -60,6 +58,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -86,6 +85,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -120,7 +120,7 @@ public class MetadataTableUtil {
     return metadataTable;
   }
 
-  private synchronized static Writer getRootTable(ClientContext context) {
+  public synchronized static Writer getRootTable(ClientContext context) {
     Credentials credentials = context.getCredentials();
     Writer rootTable = root_tables.get(credentials);
     if (rootTable == null) {
@@ -227,7 +227,7 @@ public class MetadataTableUtil {
 
       // add before removing in case of process death
       for (LogEntry logEntry : logsToAdd)
-        addLogEntry(context, logEntry, zooLock);
+        addRootLogEntry(context, zooLock, logEntry);
 
       removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
     } else {
@@ -252,6 +252,39 @@ public class MetadataTableUtil {
     }
   }
 
+  private static interface ZooOperation {
+    void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException;
+  }
+
+  private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) {
+    while (true) {
+      try {
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+        if (zoo.isLockHeld(zooLock.getLockID())) {
+          op.run(zoo);
+        }
+        break;
+      } catch (KeeperException e) {
+        log.error(e, e);
+      } catch (InterruptedException e) {
+        log.error(e, e);
+      } catch (IOException e) {
+        log.error(e, e);
+      }
+      UtilWaitThread.sleep(1000);
+    }
+  }
+
+  private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) {
+    retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+      @Override
+      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+        String root = getZookeeperLogLocation();
+        rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+      }
+    });
+  }
+
   public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
     TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
 
@@ -451,34 +484,6 @@ public class MetadataTableUtil {
     return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
   }
 
-  public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) {
-    if (entry.extent.isRootTablet()) {
-      String root = getZookeeperLogLocation();
-      while (true) {
-        try {
-          IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-          if (zoo.isLockHeld(zooLock.getLockID())) {
-            String[] parts = entry.filename.split("/");
-            String uniqueId = parts[parts.length - 1];
-            zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
-          }
-          break;
-        } catch (KeeperException e) {
-          log.error(e, e);
-        } catch (InterruptedException e) {
-          log.error(e, e);
-        } catch (IOException e) {
-          log.error(e, e);
-        }
-        UtilWaitThread.sleep(1000);
-      }
-    } else {
-      Mutation m = new Mutation(entry.getRow());
-      m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
-      update(context, zooLock, m, entry.extent);
-    }
-  }
-
   public static void setRootTabletDir(String dir) throws IOException {
     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
@@ -569,22 +574,11 @@ public class MetadataTableUtil {
       }
     }
 
-    Collections.sort(result, new Comparator<LogEntry>() {
-      @Override
-      public int compare(LogEntry o1, LogEntry o2) {
-        long diff = o1.timestamp - o2.timestamp;
-        if (diff < 0)
-          return -1;
-        if (diff > 0)
-          return 1;
-        return 0;
-      }
-    });
     log.info("Returning logs " + result + " for extent " + extent);
     return result;
   }
 
-  static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
+  static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     String root = getZookeeperLogLocation();
     // there's a little race between getting the children and fetching
@@ -592,11 +586,10 @@ public class MetadataTableUtil {
     while (true) {
       result.clear();
       for (String child : zoo.getChildren(root)) {
-        LogEntry e = new LogEntry();
         try {
-          e.fromBytes(zoo.getData(root + "/" + child, null));
+          LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null));
           // upgrade from !0;!0<< -> +r<<
-          e.extent = RootTable.EXTENT;
+          e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
           result.add(e);
         } catch (KeeperException.NoNodeException ex) {
           continue;
@@ -666,28 +659,23 @@ public class MetadataTableUtil {
     return new LogEntryIterator(context);
   }
 
-  public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
+  public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
     if (extent.isRootTablet()) {
-      for (LogEntry entry : logEntries) {
-        String root = getZookeeperLogLocation();
-        while (true) {
-          try {
-            IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-            if (zoo.isLockHeld(zooLock.getLockID())) {
-              String parts[] = entry.filename.split("/");
-              zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP);
-            }
-            break;
-          } catch (Exception e) {
-            log.error(e, e);
+      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = getZookeeperLogLocation();
+          for (LogEntry entry : entries) {
+            String path = root + "/" + entry.getUniqueID();
+            log.debug("Removing " + path + " from zookeeper");
+            rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
           }
-          UtilWaitThread.sleep(1000);
         }
-      }
+      });
     } else {
       Mutation m = new Mutation(extent.getMetadataEntry());
-      for (LogEntry entry : logEntries) {
-        m.putDelete(LogColumnFamily.NAME, new Text(entry.getName()));
+      for (LogEntry entry : entries) {
+        m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
       }
       update(context, zooLock, m, extent);
     }
@@ -1072,4 +1060,106 @@ public class MetadataTableUtil {
     return tabletEntries;
   }
 
+  public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename, KeyExtent extent) {
+    log.debug("Adding log entry " + filename);
+    if (extent.isRootTablet()) {
+      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+          String[] parts = filename.split("/");
+          String uniqueId = parts[parts.length - 1];
+          String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
+          rw.putPersistentData(path, filename.getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+        }
+      });
+    } else {
+      Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+      m.put("log", filename, new Value(EMPTY_BYTES));
+      String tableName = MetadataTable.NAME;
+      if (extent.isMeta()) {
+        tableName = RootTable.NAME;
+      }
+      try {
+        BatchWriter bw = context.getConnector().createBatchWriter(tableName, null);
+        bw.addMutation(m);
+        bw.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename) {
+    retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+      @Override
+      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+        String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+        String[] parts = filename.split("/");
+        String uniqueId = parts[parts.length - 1];
+        String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
+        log.debug("Removing entry " + path + " from zookeeper");
+        rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
+      }
+    });
+  }
+
+  public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<String> all) throws AccumuloException {
+    try {
+      BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+      BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+      for (String fname : all) {
+        Text tname = new Text(fname.getBytes(UTF_8));
+        Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+        m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
+        root.addMutation(m);
+        log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
+        m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+        m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
+        meta.addMutation(m);
+        removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+      }
+      root.close();
+      meta.close();
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+
+  public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<String>> logsForDeadServers)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    // already cached
+    if (logsForDeadServers.containsKey(server)) {
+      return;
+    }
+    if (extent.isRootTablet()) {
+      final List<String> logs = new ArrayList<>();
+      retryZooKeeperUpdate(context, lock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+          logs.clear();
+          for (String child : rw.getChildren(root)) {
+            logs.add(new String(rw.getData(root + "/" + child, null), UTF_8));
+          }
+        }
+      });
+      logsForDeadServers.put(server, logs);
+    } else {
+      // use the correct meta table
+      String table = MetadataTable.NAME;
+      if (extent.isMeta()) {
+        table = RootTable.NAME;
+      }
+      // fetch the current logs in use, and put them in the cache
+      Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
+      scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
+      List<String> logs = new ArrayList<>();
+      for (Entry<Key,Value> entry : scanner) {
+        logs.add(entry.getKey().getColumnQualifier().toString());
+      }
+      logsForDeadServers.put(server, logs);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 344e245..0de0b0e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.server.util;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -176,20 +175,14 @@ public class ReplicationTableUtil {
   /**
    * Write replication ingest entries for each provided file with the given {@link Status}.
    */
-  public static void updateFiles(ClientContext context, KeyExtent extent, Collection<String> files, Status stat) {
+  public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) {
     if (log.isDebugEnabled()) {
-      log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
+      log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat));
     }
     // TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294
-    if (files.isEmpty()) {
-      return;
-    }
 
     Value v = ProtobufUtil.toValue(stat);
-    for (String file : files) {
-      // TODO Can preclude this addition if the extent is for a table we don't need to replicate
-      update(context, createUpdateMutation(new Path(file), v, extent), extent);
-    }
+    update(context, createUpdateMutation(new Path(file), v, extent), extent);
   }
 
   static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 355fa42..375e263 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -94,7 +94,7 @@ public class ReplicationTableUtilTest {
     String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
 
     long createdTime = System.currentTimeMillis();
-    ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime));
+    ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime));
 
     verify(writer);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 35c60d6..2561eec 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -18,7 +18,7 @@ package org.apache.accumulo.gc;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -26,21 +26,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
@@ -48,29 +49,29 @@ import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
-import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.master.LiveTServerSet;
+import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.thrift.TException;
+import org.apache.hadoop.io.Text;
+import org.apache.htrace.Span;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 import com.google.common.net.HostAndPort;
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -79,8 +80,7 @@ public class GarbageCollectWriteAheadLogs {
 
   private final AccumuloServerContext context;
   private final VolumeManager fs;
-
-  private boolean useTrash;
+  private final boolean useTrash;
 
   /**
    * Creates a new GC WAL object.
@@ -98,54 +98,33 @@ public class GarbageCollectWriteAheadLogs {
     this.useTrash = useTrash;
   }
 
-  /**
-   * Gets the instance used by this object.
-   *
-   * @return instance
-   */
-  Instance getInstance() {
-    return context.getInstance();
-  }
-
-  /**
-   * Gets the volume manager used by this object.
-   *
-   * @return volume manager
-   */
-  VolumeManager getVolumeManager() {
-    return fs;
-  }
-
-  /**
-   * Checks if the volume manager should move files to the trash rather than delete them.
-   *
-   * @return true if trash is used
-   */
-  boolean isUsingTrash() {
-    return useTrash;
-  }
-
   public void collect(GCStatus status) {
 
-    Span span = Trace.start("scanServers");
+    Span span = Trace.start("getCandidates");
     try {
-
-      Map<String,Path> sortedWALogs = getSortedWALogs();
+      LiveTServerSet liveServers = new LiveTServerSet(context, new Listener() {
+        @Override
+        public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
+          log.debug("New tablet server noticed: " + added);
+          log.debug("Tablet server removed: " + deleted);
+        }
+      });
+      Set<TServerInstance> currentServers = liveServers.getCurrentServers();
 
       status.currentLog.started = System.currentTimeMillis();
 
-      Map<Path,String> fileToServerMap = new HashMap<Path,String>();
-      Map<String,Path> nameToFileMap = new HashMap<String,Path>();
-      int count = scanServers(fileToServerMap, nameToFileMap);
+      Map<TServerInstance, Set<String> > candidates = new HashMap<>();
+      long count = getCurrent(candidates, currentServers);
       long fileScanStop = System.currentTimeMillis();
-      log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
+
+      log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(),
           (fileScanStop - status.currentLog.started) / 1000.));
-      status.currentLog.candidates = fileToServerMap.size();
+      status.currentLog.candidates = count;
       span.stop();
 
       span = Trace.start("removeMetadataEntries");
       try {
-        count = removeMetadataEntries(nameToFileMap, sortedWALogs, status);
+        count = removeMetadataEntries(candidates, status, currentServers);
       } catch (Exception ex) {
         log.error("Unable to scan metadata table", ex);
         return;
@@ -158,7 +137,7 @@ public class GarbageCollectWriteAheadLogs {
 
       span = Trace.start("removeReplicationEntries");
       try {
-        count = removeReplicationEntries(nameToFileMap, sortedWALogs, status);
+        count = removeReplicationEntries(candidates, status);
       } catch (Exception ex) {
         log.error("Unable to scan replication table", ex);
         return;
@@ -170,16 +149,23 @@ public class GarbageCollectWriteAheadLogs {
       log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
 
       span = Trace.start("removeFiles");
-      Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap);
 
-      count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status);
+      count = removeFiles(candidates, status);
 
       long removeStop = System.currentTimeMillis();
-      log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
+      log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
+      span.stop();
+
+      span = Trace.start("removeMarkers");
+      count = removeMarkers(candidates);
+      long removeMarkersStop = System.currentTimeMillis();
+      log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
+      span.stop();
+
+
       status.currentLog.finished = removeStop;
       status.lastLog = status.currentLog;
       status.currentLog = new GcCycleStats();
-      span.stop();
 
     } catch (Exception e) {
       log.error("exception occured while garbage collecting write ahead logs", e);
@@ -188,161 +174,82 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
-  boolean holdsLock(HostAndPort addr) {
+  private long removeMarkers(Map<TServerInstance,Set<String>> candidates) {
+    long result = 0;
     try {
-      String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString();
-      List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
-      return !(children == null || children.isEmpty());
-    } catch (KeeperException.NoNodeException ex) {
-      return false;
-    } catch (Exception ex) {
-      log.debug(ex.toString(), ex);
-      return true;
-    }
-  }
-
-  private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
-    for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
-      if (entry.getKey().isEmpty()) {
-        // old-style log entry, just remove it
-        for (Path path : entry.getValue()) {
-          log.debug("Removing old-style WAL " + path);
-          try {
-            if (!useTrash || !fs.moveToTrash(path))
-              fs.deleteRecursively(path);
-            status.currentLog.deleted++;
-          } catch (FileNotFoundException ex) {
-            // ignored
-          } catch (IOException ex) {
-            log.error("Unable to delete wal " + path + ": " + ex);
-          }
-        }
-      } else {
-        HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
-        if (!holdsLock(address)) {
-          for (Path path : entry.getValue()) {
-            log.debug("Removing WAL for offline server " + path);
-            try {
-              if (!useTrash || !fs.moveToTrash(path))
-                fs.deleteRecursively(path);
-              status.currentLog.deleted++;
-            } catch (FileNotFoundException ex) {
-              // ignored
-            } catch (IOException ex) {
-              log.error("Unable to delete wal " + path + ": " + ex);
-            }
-          }
-          continue;
-        } else {
-          Client tserver = null;
-          try {
-            tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
-            tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
-            log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
-            status.currentLog.deleted += entry.getValue().size();
-          } catch (TException e) {
-            log.warn("Error talking to " + address + ": " + e);
-          } finally {
-            if (tserver != null)
-              ThriftUtil.returnClient(tserver);
-          }
+      BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+      BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+      for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
+        Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.toString());
+        for (String wal : entry.getValue()) {
+          m.putDelete(CurrentLogsSection.COLF, new Text(wal));
+          result++;
         }
+        root.addMutation(m);
+        meta.addMutation(m);
       }
+      meta.close();
+      root.close();
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
     }
+    return result;
+  }
 
-    for (Path swalog : sortedWALogs.values()) {
-      log.debug("Removing sorted WAL " + swalog);
-      try {
-        if (!useTrash || !fs.moveToTrash(swalog)) {
-          fs.deleteRecursively(swalog);
-        }
-      } catch (FileNotFoundException ex) {
-        // ignored
-      } catch (IOException ioe) {
+  private long removeFiles(Map<TServerInstance, Set<String> > candidates, final GCStatus status) {
+    for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
+      for (String walog : entry.getValue()) {
+        log.debug("Removing WAL for offline server " + entry.getKey() + " log " + walog);
+        Path path = new Path(walog);
         try {
-          if (fs.exists(swalog)) {
-            log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
-          }
+          if (!useTrash || !fs.moveToTrash(path))
+            fs.deleteRecursively(path);
+          status.currentLog.deleted++;
+        } catch (FileNotFoundException ex) {
+          // ignored
         } catch (IOException ex) {
-          log.error("Unable to check for the existence of " + swalog, ex);
+          log.error("Unable to delete wal " + path + ": " + ex);
         }
       }
     }
-
-    return 0;
+    return status.currentLog.deleted;
   }
 
-  /**
-   * Converts a list of paths to their corresponding strings.
-   *
-   * @param paths
-   *          list of paths
-   * @return string forms of paths
-   */
-  static List<String> paths2strings(List<Path> paths) {
-    List<String> result = new ArrayList<String>(paths.size());
-    for (Path path : paths)
-      result.add(path.toString());
-    return result;
-  }
+  private long removeMetadataEntries(Map<TServerInstance, Set<String> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
+      InterruptedException {
 
-  /**
-   * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
-   * mapping of file names to paths is skipped.
-   *
-   * @param fileToServerMap
-   *          map of file paths to servers
-   * @param nameToFileMap
-   *          map of file names to paths
-   * @return map of servers to lists of file paths
-   */
-  static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) {
-    Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
-    for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
-      if (!nameToFileMap.containsKey(fileServer.getKey().getName()))
-        continue;
-      ArrayList<Path> files = result.get(fileServer.getValue());
-      if (files == null) {
-        files = new ArrayList<Path>();
-        result.put(fileServer.getValue(), files);
+    // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
+
+    Map<String, TServerInstance> walToDeadServer = new HashMap<>();
+    for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
+      for (String file : entry.getValue()) {
+        walToDeadServer.put(file, entry.getKey());
       }
-      files.add(fileServer.getKey());
     }
-    return result;
-  }
-
-  protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
-      InterruptedException {
-    int count = 0;
-    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
-
-    // For each WAL reference in the metadata table
-    while (iterator.hasNext()) {
-      // Each metadata reference has at least one WAL file
-      for (String entry : iterator.next().logSet) {
-        // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
-        // the last "/" will mark a UUID file name.
-        String uuid = entry.substring(entry.lastIndexOf("/") + 1);
-        if (!isUUID(uuid)) {
-          // fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed!
-          throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
-        }
-
-        Path pathFromNN = nameToFileMap.remove(uuid);
-        if (pathFromNN != null) {
-          status.currentLog.inUse++;
-          sortedWALogs.remove(uuid);
+    long count = 0;
+    RootTabletStateStore root = new RootTabletStateStore(context);
+    MetaDataStateStore meta = new MetaDataStateStore(context);
+    Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
+    while (states.hasNext()) {
+      count++;
+      TabletLocationState state = states.next();
+      if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
+        candidates.remove(state.current);
+      }
+      for (Collection<String> wals : state.walogs) {
+        for (String wal : wals) {
+          TServerInstance dead = walToDeadServer.get(wal);
+          if (dead != null) {
+            candidates.get(dead).remove(wal);
+          }
         }
-
-        count++;
       }
     }
-
     return count;
   }
 
-  protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
-      InterruptedException {
+  protected int removeReplicationEntries(Map<TServerInstance, Set<String> > candidates, GCStatus status) throws IOException, KeeperException,
+  InterruptedException {
     Connector conn;
     try {
       conn = context.getConnector();
@@ -353,21 +260,25 @@ public class GarbageCollectWriteAheadLogs {
 
     int count = 0;
 
-    Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
+    Iterator<Entry<TServerInstance,Set<String>>> walIter = candidates.entrySet().iterator();
 
     while (walIter.hasNext()) {
-      Entry<String,Path> wal = walIter.next();
-      String fullPath = wal.getValue().toString();
-      if (neededByReplication(conn, fullPath)) {
-        log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
-        // If we haven't already removed it, check to see if this WAL is
-        // "in use" by replication (needed for replication purposes)
-        status.currentLog.inUse++;
-
+      Entry<TServerInstance,Set<String>> wal = walIter.next();
+      Iterator<String> paths = wal.getValue().iterator();
+      while (paths.hasNext()) {
+        String fullPath = paths.next();
+        if (neededByReplication(conn, fullPath)) {
+          log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
+          // If we haven't already removed it, check to see if this WAL is
+          // "in use" by replication (needed for replication purposes)
+          status.currentLog.inUse++;
+          paths.remove();
+        } else {
+          log.debug("WAL not needed for replication {}", fullPath);
+        }
+      }
+      if (wal.getValue().isEmpty()) {
         walIter.remove();
-        sortedWALogs.remove(wal.getKey());
-      } else {
-        log.debug("WAL not needed for replication {}", fullPath);
       }
       count++;
     }
@@ -375,6 +286,7 @@ public class GarbageCollectWriteAheadLogs {
     return count;
   }
 
+
   /**
    * Determine if the given WAL is needed for replication
    *
@@ -435,107 +347,54 @@ public class GarbageCollectWriteAheadLogs {
     return metaScanner;
   }
 
-  private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
-    return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
-  }
-
-  /**
-   * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
-   *
-   * @param walDirs
-   *          write-ahead log directories
-   * @param fileToServerMap
-   *          map of file paths to servers
-   * @param nameToFileMap
-   *          map of file names to paths
-   * @return number of servers located (including those with no logs present)
-   */
-  int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
-    Set<String> servers = new HashSet<String>();
-    for (String walDir : walDirs) {
-      Path walRoot = new Path(walDir);
-      FileStatus[] listing = null;
-      try {
-        listing = fs.listStatus(walRoot);
-      } catch (FileNotFoundException e) {
-        // ignore dir
-      }
 
-      if (listing == null)
-        continue;
-      for (FileStatus status : listing) {
-        String server = status.getPath().getName();
-        if (status.isDirectory()) {
-          servers.add(server);
-          for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
-            if (isUUID(file.getPath().getName())) {
-              fileToServerMap.put(file.getPath(), server);
-              nameToFileMap.put(file.getPath().getName(), file.getPath());
-            } else {
-              log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
-            }
-          }
-        } else if (isUUID(server)) {
-          // old-style WAL are not under a directory
-          servers.add("");
-          fileToServerMap.put(status.getPath(), "");
-          nameToFileMap.put(server, status.getPath());
-        } else {
-          log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
-        }
-      }
-    }
-    return servers.size();
-  }
 
-  private Map<String,Path> getSortedWALogs() throws IOException {
-    return getSortedWALogs(ServerConstants.getRecoveryDirs());
-  }
 
   /**
-   * Looks for write-ahead logs in recovery directories.
+   * Scans log markers. The map passed in is populated with the logs for dead servers.
    *
-   * @param recoveryDirs
-   *          recovery directories
-   * @return map of log file names to paths
+   * @param logsForDeadServers
+   *          map of dead server to log file entries
+   * @return total number of log files
    */
-  Map<String,Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
-    Map<String,Path> result = new HashMap<String,Path>();
-
-    for (String dir : recoveryDirs) {
-      Path recoveryDir = new Path(dir);
-
-      if (fs.exists(recoveryDir)) {
-        for (FileStatus status : fs.listStatus(recoveryDir)) {
-          String name = status.getPath().getName();
-          if (isUUID(name)) {
-            result.put(name, status.getPath());
-          } else {
-            log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
-          }
+  private long getCurrent(Map<TServerInstance, Set<String> > logsForDeadServers, Set<TServerInstance> currentServers) throws Exception {
+    Set<String> rootWALs = new HashSet<String>();
+    // Collect the filenames of the root tablet's WAL entries stored under its ZooKeeper walogs node
+    String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
+    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    List<String> children = zoo.getChildren(zpath);
+    for (String child : children) {
+      LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
+      rootWALs.add(entry.filename);
+    }
+    long count = 0;
+
+    // Gather WAL markers whose server is not current (or that are marked UNUSED), skipping WALs listed in ZooKeeper; compare as String, a Text never equals one
+    Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
+    rootScanner.setRange(CurrentLogsSection.getRange());
+    Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    metaScanner.setRange(CurrentLogsSection.getRange());
+    Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
+    Text hostAndPort = new Text();
+    Text sessionId = new Text();
+    Text filename = new Text();
+    while (entries.hasNext()) {
+      Entry<Key,Value> entry = entries.next();
+      CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
+      CurrentLogsSection.getPath(entry.getKey(), filename);
+      TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
+      if ((!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED)) && !rootWALs.contains(filename.toString())) {
+        Set<String> logs = logsForDeadServers.get(tsi);
+        if (logs == null) {
+          logsForDeadServers.put(tsi, logs = new HashSet<String>());
+        }
+        if (logs.add(new Path(filename.toString()).toString())) {
+          count++;
         }
       }
     }
-    return result;
-  }
 
-  /**
-   * Checks if a string is a valid UUID.
-   *
-   * @param name
-   *          string to check
-   * @return true if string is a UUID
-   */
-  static boolean isUUID(String name) {
-    if (name == null || name.length() != 36) {
-      return false;
-    }
-    try {
-      UUID.fromString(name);
-      return true;
-    } catch (IllegalArgumentException ex) {
-      return false;
-    }
+    return count;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 35005d8..9328225 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -568,7 +568,6 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
         replSpan.stop();
       }
 
-      // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
         GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 9b60c88..8185f23 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -37,15 +37,13 @@ import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
@@ -186,20 +184,21 @@ public class CloseWriteAheadLogReferences implements Runnable {
     try {
       // TODO Configurable number of threads
       bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
-      bs.setRanges(Collections.singleton(TabletsSection.getRange()));
-      bs.fetchColumnFamily(LogColumnFamily.NAME);
+      bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+      bs.fetchColumnFamily(CurrentLogsSection.COLF);
 
       // For each log key/value in the metadata table
       for (Entry<Key,Value> entry : bs) {
-        // The value may contain multiple WALs
-        LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
-        log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
+        if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
+          continue;
+        }
+        Text tpath = new Text();
+        CurrentLogsSection.getPath(entry.getKey(), tpath);
+        String path = new Path(tpath.toString()).toString();
+        log.debug("Found WAL " + path.toString());
 
         // Normalize each log file (using Path) and add it to the set
-        for (String logFile : logEntry.logSet) {
-          referencedWals.add(normalizedWalPaths.get(logFile));
-        }
+        referencedWals.add(normalizedWalPaths.get(path));
       }
     } catch (TableNotFoundException e) {
       // uhhhh
@@ -248,6 +247,8 @@ public class CloseWriteAheadLogReferences implements Runnable {
         MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
         String replFile = replFileText.toString();
         boolean isReferenced = referencedWals.contains(replFile);
+        log.debug("replFile " + replFile);
+        log.debug("referencedWals " + referencedWals);
 
         // We only want to clean up WALs (which is everything but rfiles) and only when
         // metadata doesn't have a reference to the given WAL

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
deleted file mode 100644
index 5224f28..0000000
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.gc;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.gc.thrift.GCStatus;
-import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class GarbageCollectWriteAheadLogsTest {
-  private static final long BLOCK_SIZE = 64000000L;
-
-  private static final Path DIR_1_PATH = new Path("/dir1");
-  private static final Path DIR_2_PATH = new Path("/dir2");
-  private static final Path DIR_3_PATH = new Path("/dir3");
-  private static final String UUID1 = UUID.randomUUID().toString();
-  private static final String UUID2 = UUID.randomUUID().toString();
-  private static final String UUID3 = UUID.randomUUID().toString();
-
-  private Instance instance;
-  private AccumuloConfiguration systemConfig;
-  private VolumeManager volMgr;
-  private GarbageCollectWriteAheadLogs gcwal;
-  private AccumuloServerContext context;
-  private long modTime;
-
-  @Rule
-  public TestName testName = new TestName();
-
-  @Before
-  public void setUp() throws Exception {
-    SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
-    instance = createMock(Instance.class);
-    expect(instance.getInstanceID()).andReturn("mock").anyTimes();
-    expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
-    expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
-    systemConfig = new ConfigurationCopy(new HashMap<String,String>());
-    volMgr = createMock(VolumeManager.class);
-    ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
-    expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
-    expect(factory.getInstance()).andReturn(instance).anyTimes();
-    expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
-
-    // Just make the SiteConfiguration delegate to our AccumuloConfiguration
-    // Presently, we only need get(Property) and iterator().
-    EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
-      @Override
-      public String answer() {
-        Object[] args = EasyMock.getCurrentArguments();
-        return systemConfig.get((Property) args[0]);
-      }
-    }).anyTimes();
-    EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
-      @Override
-      public Boolean answer() {
-        Object[] args = EasyMock.getCurrentArguments();
-        return systemConfig.getBoolean((Property) args[0]);
-      }
-    }).anyTimes();
-
-    EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
-      @Override
-      public Iterator<Entry<String,String>> answer() {
-        return systemConfig.iterator();
-      }
-    }).anyTimes();
-
-    replay(instance, factory, siteConfig);
-    AccumuloServerContext context = new AccumuloServerContext(factory);
-    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-    modTime = System.currentTimeMillis();
-  }
-
-  @Test
-  public void testGetters() {
-    assertSame(instance, gcwal.getInstance());
-    assertSame(volMgr, gcwal.getVolumeManager());
-    assertFalse(gcwal.isUsingTrash());
-  }
-
-  @Test
-  public void testPathsToStrings() {
-    ArrayList<Path> paths = new ArrayList<Path>();
-    paths.add(new Path(DIR_1_PATH, "file1"));
-    paths.add(DIR_2_PATH);
-    paths.add(new Path(DIR_3_PATH, "file3"));
-    List<String> strings = GarbageCollectWriteAheadLogs.paths2strings(paths);
-    int len = 3;
-    assertEquals(len, strings.size());
-    for (int i = 0; i < len; i++) {
-      assertEquals(paths.get(i).toString(), strings.get(i));
-    }
-  }
-
-  @Test
-  public void testMapServersToFiles() {
-    // @formatter:off
-    /*
-     * Test fileToServerMap:
-     * /dir1/server1/uuid1 -> server1 (new-style)
-     * /dir1/uuid2 -> "" (old-style)
-     * /dir3/server3/uuid3 -> server3 (new-style)
-     */
-    // @formatter:on
-    Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
-    Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1);
-    fileToServerMap.put(path1, "server1"); // new-style
-    Path path2 = new Path(DIR_1_PATH, UUID2);
-    fileToServerMap.put(path2, ""); // old-style
-    Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3);
-    fileToServerMap.put(path3, "server3"); // old-style
-    // @formatter:off
-    /*
-     * Test nameToFileMap:
-     * uuid1 -> /dir1/server1/uuid1
-     * uuid3 -> /dir3/server3/uuid3
-     */
-    // @formatter:on
-    Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
-    nameToFileMap.put(UUID1, path1);
-    nameToFileMap.put(UUID3, path3);
-
-    // @formatter:off
-    /*
-     * Expected map:
-     * server1 -> [ /dir1/server1/uuid1 ]
-     * server3 -> [ /dir3/server3/uuid3 ]
-     */
-    // @formatter:on
-    Map<String,ArrayList<Path>> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap);
-    assertEquals(2, result.size());
-    ArrayList<Path> list1 = result.get("server1");
-    assertEquals(1, list1.size());
-    assertTrue(list1.contains(path1));
-    ArrayList<Path> list3 = result.get("server3");
-    assertEquals(1, list3.size());
-    assertTrue(list3.contains(path3));
-  }
-
-  private FileStatus makeFileStatus(int size, Path path) {
-    boolean isDir = (size == 0);
-    return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path);
-  }
-
-  private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception {
-    expect(volMgr.listStatus(dir)).andReturn(fileStatuses);
-  }
-
-  @Test
-  public void testScanServers_NewStyle() throws Exception {
-    String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"};
-    // @formatter:off
-    /*
-     * Test directory layout:
-     * /dir1/
-     *   server1/
-     *     uuid1
-     *     file2
-     *   subdir2/
-     * /dir2/ missing
-     * /dir3/
-     *   server3/
-     *     uuid3
-     */
-    // @formatter:on
-    Path serverDir1Path = new Path(DIR_1_PATH, "server1");
-    FileStatus serverDir1 = makeFileStatus(0, serverDir1Path);
-    Path subDir2Path = new Path(DIR_1_PATH, "subdir2");
-    FileStatus serverDir2 = makeFileStatus(0, subDir2Path);
-    mockListStatus(DIR_1_PATH, serverDir1, serverDir2);
-    Path path1 = new Path(serverDir1Path, UUID1);
-    FileStatus file1 = makeFileStatus(100, path1);
-    FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2"));
-    mockListStatus(serverDir1Path, file1, file2);
-    mockListStatus(subDir2Path);
-    expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException());
-    Path serverDir3Path = new Path(DIR_3_PATH, "server3");
-    FileStatus serverDir3 = makeFileStatus(0, serverDir3Path);
-    mockListStatus(DIR_3_PATH, serverDir3);
-    Path path3 = new Path(serverDir3Path, UUID3);
-    FileStatus file3 = makeFileStatus(300, path3);
-    mockListStatus(serverDir3Path, file3);
-    replay(volMgr);
-
-    Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
-    Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
-    int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
-    assertEquals(3, count);
-    // @formatter:off
-    /*
-     * Expected fileToServerMap:
-     * /dir1/server1/uuid1 -> server1
-     * /dir3/server3/uuid3 -> server3
-     */
-    // @formatter:on
-    assertEquals(2, fileToServerMap.size());
-    assertEquals("server1", fileToServerMap.get(path1));
-    assertEquals("server3", fileToServerMap.get(path3));
-    // @formatter:off
-    /*
-     * Expected nameToFileMap:
-     * uuid1 -> /dir1/server1/uuid1
-     * uuid3 -> /dir3/server3/uuid3
-     */
-    // @formatter:on
-    assertEquals(2, nameToFileMap.size());
-    assertEquals(path1, nameToFileMap.get(UUID1));
-    assertEquals(path3, nameToFileMap.get(UUID3));
-  }
-
-  @Test
-  public void testScanServers_OldStyle() throws Exception {
-    // @formatter:off
-    /*
-     * Test directory layout:
-     * /dir1/
-     *   uuid1
-     * /dir3/
-     *   uuid3
-     */
-    // @formatter:on
-    String[] walDirs = new String[] {"/dir1", "/dir3"};
-    Path serverFile1Path = new Path(DIR_1_PATH, UUID1);
-    FileStatus serverFile1 = makeFileStatus(100, serverFile1Path);
-    mockListStatus(DIR_1_PATH, serverFile1);
-    Path serverFile3Path = new Path(DIR_3_PATH, UUID3);
-    FileStatus serverFile3 = makeFileStatus(300, serverFile3Path);
-    mockListStatus(DIR_3_PATH, serverFile3);
-    replay(volMgr);
-
-    Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
-    Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
-    int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
-    /*
-     * Expect only a single server, the non-server entry for upgrade WALs
-     */
-    assertEquals(1, count);
-    // @formatter:off
-    /*
-     * Expected fileToServerMap:
-     * /dir1/uuid1 -> ""
-     * /dir3/uuid3 -> ""
-     */
-    // @formatter:on
-    assertEquals(2, fileToServerMap.size());
-    assertEquals("", fileToServerMap.get(serverFile1Path));
-    assertEquals("", fileToServerMap.get(serverFile3Path));
-    // @formatter:off
-    /*
-     * Expected nameToFileMap:
-     * uuid1 -> /dir1/uuid1
-     * uuid3 -> /dir3/uuid3
-     */
-    // @formatter:on
-    assertEquals(2, nameToFileMap.size());
-    assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
-    assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
-  }
-
-  @Test
-  public void testGetSortedWALogs() throws Exception {
-    String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"};
-    // @formatter:off
-    /*
-     * Test directory layout:
-     * /dir1/
-     *   uuid1
-     *   file2
-     * /dir2/ missing
-     * /dir3/
-     *   uuid3
-     */
-    // @formatter:on
-    expect(volMgr.exists(DIR_1_PATH)).andReturn(true);
-    expect(volMgr.exists(DIR_2_PATH)).andReturn(false);
-    expect(volMgr.exists(DIR_3_PATH)).andReturn(true);
-    Path path1 = new Path(DIR_1_PATH, UUID1);
-    FileStatus file1 = makeFileStatus(100, path1);
-    FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2"));
-    mockListStatus(DIR_1_PATH, file1, file2);
-    Path path3 = new Path(DIR_3_PATH, UUID3);
-    FileStatus file3 = makeFileStatus(300, path3);
-    mockListStatus(DIR_3_PATH, file3);
-    replay(volMgr);
-
-    Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs);
-    // @formatter:off
-    /*
-     * Expected map:
-     * uuid1 -> /dir1/uuid1
-     * uuid3 -> /dir3/uuid3
-     */
-    // @formatter:on
-    assertEquals(2, sortedWalogs.size());
-    assertEquals(path1, sortedWalogs.get(UUID1));
-    assertEquals(path3, sortedWalogs.get(UUID3));
-  }
-
-  @Test
-  public void testIsUUID() {
-    assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString()));
-    assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo"));
-    assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
-    assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
-  }
-
-  // It was easier to do this than get the mocking working for me
-  private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
-
-    private List<Entry<Key,Value>> replData;
-
-    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
-      super(context, fs, useTrash);
-      this.replData = replData;
-    }
-
-    @Override
-    protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
-      return this.replData;
-    }
-  }
-
-  @Test
-  public void replicationEntriesAffectGC() throws Exception {
-    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-    Connector conn = createMock(Connector.class);
-
-    // Write a Status record which should prevent file1 from being deleted
-    LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
-    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
-
-    ReplicationGCWAL replGC = new ReplicationGCWAL(context, volMgr, false, replData);
-
-    replay(conn);
-
-    // Open (not-closed) file must be retained
-    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
-    // No replication data, not needed
-    replData.clear();
-    assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
-
-    // The file is closed but not replicated, must be retained
-    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
-    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
-    // File is closed and fully replicated, can be deleted
-    replData.clear();
-    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
-        ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
-    assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
-  }
-
-  @Test
-  public void removeReplicationEntries() throws Exception {
-    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
-    Instance inst = new MockInstance(testName.getMethodName());
-    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
-    long file1CreateTime = System.currentTimeMillis();
-    long file2CreateTime = file1CreateTime + 50;
-    BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
-    Mutation m = new Mutation("/wals/" + file1);
-    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
-    bw.addMutation(m);
-    m = new Mutation("/wals/" + file2);
-    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
-    bw.addMutation(m);
-
-    // These WALs are potential candidates for deletion from fs
-    Map<String,Path> nameToFileMap = new HashMap<>();
-    nameToFileMap.put(file1, new Path("/wals/" + file1));
-    nameToFileMap.put(file2, new Path("/wals/" + file2));
-
-    Map<String,Path> sortedWALogs = Collections.emptyMap();
-
-    // Make the GCStatus and GcCycleStats
-    GCStatus status = new GCStatus();
-    GcCycleStats cycleStats = new GcCycleStats();
-    status.currentLog = cycleStats;
-
-    // We should iterate over two entries
-    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
-    // We should have noted that two files were still in use
-    Assert.assertEquals(2l, cycleStats.inUse);
-
-    // Both should have been deleted
-    Assert.assertEquals(0, nameToFileMap.size());
-  }
-
-  @Test
-  public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
-    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
-    Instance inst = new MockInstance(testName.getMethodName());
-    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
-    Connector conn = context.getConnector();
-
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
-    long file1CreateTime = System.currentTimeMillis();
-    long file2CreateTime = file1CreateTime + 50;
-    // Write some records to the metadata table, we haven't yet written status records to the replication table
-    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
-    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
-    bw.addMutation(m);
-
-    m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
-    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
-    bw.addMutation(m);
-
-    // These WALs are potential candidates for deletion from fs
-    Map<String,Path> nameToFileMap = new HashMap<>();
-    nameToFileMap.put(file1, new Path("/wals/" + file1));
-    nameToFileMap.put(file2, new Path("/wals/" + file2));
-
-    Map<String,Path> sortedWALogs = Collections.emptyMap();
-
-    // Make the GCStatus and GcCycleStats objects
-    GCStatus status = new GCStatus();
-    GcCycleStats cycleStats = new GcCycleStats();
-    status.currentLog = cycleStats;
-
-    // We should iterate over two entries
-    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
-    // We should have noted that two files were still in use
-    Assert.assertEquals(2l, cycleStats.inUse);
-
-    // Both should have been deleted
-    Assert.assertEquals(0, nameToFileMap.size());
-  }
-
-  @Test
-  public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
-    Instance inst = new MockInstance(testName.getMethodName());
-    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-    Connector conn = context.getConnector();
-
-    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
-    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
-    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
-    bw.addMutation(m);
-    bw.close();
-
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
-    Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
-    Entry<Key,Value> entry = Iterables.getOnlyElement(data);
-
-    Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
-  }
-
-  @Test
-  public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
-    Instance inst = new MockInstance(testName.getMethodName());
-    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-    Connector conn = context.getConnector();
-
-    long walCreateTime = System.currentTimeMillis();
-    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
-    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
-    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
-    bw.addMutation(m);
-    bw.close();
-
-    bw = ReplicationTable.getBatchWriter(conn);
-    m = new Mutation(wal);
-    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
-    bw.addMutation(m);
-    bw.close();
-
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
-    Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
-    Map<Key,Value> data = new HashMap<>();
-    for (Entry<Key,Value> e : iter) {
-      data.put(e.getKey(), e.getValue());
-    }
-
-    Assert.assertEquals(2, data.size());
-
-    // Should get one element from each table (metadata and replication)
-    for (Key k : data.keySet()) {
-      String row = k.getRow().toString();
-      if (row.startsWith(ReplicationSection.getRowPrefix())) {
-        Assert.assertTrue(row.endsWith(wal));
-      } else {
-        Assert.assertEquals(wal, row);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index ba68890..f47f14b 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -46,20 +46,17 @@ import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -130,22 +127,16 @@ public class CloseWriteAheadLogReferencesTest {
   public void findOneWalFromMetadata() throws Exception {
     Connector conn = createMock(Connector.class);
     BatchScanner bs = createMock(BatchScanner.class);
-
     // Fake out some data
     final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    LogEntry logEntry = new LogEntry();
-    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
-    logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
-    logEntry.server = "tserver1";
-    logEntry.tabletId = 1;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+    String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+    data.add(entry("tserver1:9997[1234567890]", file));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
     expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
     expectLastCall().once();
-    bs.fetchColumnFamily(LogColumnFamily.NAME);
+    bs.fetchColumnFamily(CurrentLogsSection.COLF);
     expectLastCall().once();
     expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
 
@@ -163,54 +154,12 @@ public class CloseWriteAheadLogReferencesTest {
 
     // Validate
     Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
-
-    verify(conn, bs);
-  }
-
-  @Test
-  public void findManyWalFromSingleMetadata() throws Exception {
-    Connector conn = createMock(Connector.class);
-    BatchScanner bs = createMock(BatchScanner.class);
-
-    // Fake out some data
-    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    LogEntry logEntry = new LogEntry();
-    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
-    logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
-    logEntry.server = "tserver1";
-    logEntry.tabletId = 1;
-    // Multiple DFSLoggers
-    logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID());
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
-    expectLastCall().once();
-    bs.fetchColumnFamily(LogColumnFamily.NAME);
-    expectLastCall().once();
-    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
-
-      @Override
-      public Iterator<Entry<Key,Value>> answer() throws Throwable {
-        return data.iterator();
-      }
-
-    });
-    // Close the bs
-    bs.close();
-    expectLastCall().once();
-
-    replay(conn, bs);
-
-    // Validate
-    Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(logEntry.logSet, wals);
+    Assert.assertEquals(Collections.singleton(file), wals);
 
     verify(conn, bs);
   }
 
+  // This is a silly test now
   @Test
   public void findManyRefsToSingleWalFromMetadata() throws Exception {
     Connector conn = createMock(Connector.class);
@@ -220,31 +169,14 @@ public class CloseWriteAheadLogReferencesTest {
 
     // Fake out some data
     final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    LogEntry logEntry = new LogEntry();
-    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
-    logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid;
-    logEntry.server = "tserver1";
-    logEntry.tabletId = 1;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b"));
-    logEntry.server = "tserver1";
-    logEntry.tabletId = 2;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c"));
-    logEntry.server = "tserver1";
-    logEntry.tabletId = 3;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+    String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
+    data.add(entry("tserver1:9997[0123456789]", filename));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
     expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
     expectLastCall().once();
-    bs.fetchColumnFamily(LogColumnFamily.NAME);
+    bs.fetchColumnFamily(CurrentLogsSection.COLF);
     expectLastCall().once();
     expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
 
@@ -262,7 +194,7 @@ public class CloseWriteAheadLogReferencesTest {
 
     // Validate
     Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
+    Assert.assertEquals(Collections.singleton(filename), wals);
 
     verify(conn, bs);
   }
@@ -272,59 +204,22 @@ public class CloseWriteAheadLogReferencesTest {
     Connector conn = createMock(Connector.class);
     BatchScanner bs = createMock(BatchScanner.class);
 
-    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
-        + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
+    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+    String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
+    String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
 
     // Fake out some data
     final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    LogEntry logEntry = new LogEntry();
-    logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
-    logEntry.filename = file1;
-    logEntry.server = "tserver1";
-    logEntry.tabletId = 1;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("5"), null, null);
-    logEntry.tabletId = 2;
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
-    logEntry.filename = file2;
-    logEntry.server = "tserver2";
-    logEntry.tabletId = 3;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
-    logEntry.tabletId = 4;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
-    logEntry.filename = file3;
-    logEntry.server = "tserver3";
-    logEntry.tabletId = 5;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
-    logEntry.server = "tserver3";
-    logEntry.tabletId = 7;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
-    logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
-    logEntry.server = "tserver3";
-    logEntry.tabletId = 15;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
-    data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+    data.add(entry("tserver1:9997[1234567890]", file1));
+    data.add(entry("tserver2:9997[1234567891]", file2));
+    data.add(entry("tserver3:9997[1234567891]", file3));
 
     // Get a batchscanner, scan the tablets section, fetch only the logs
     expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
     expectLastCall().once();
-    bs.fetchColumnFamily(LogColumnFamily.NAME);
+    bs.fetchColumnFamily(CurrentLogsSection.COLF);
     expectLastCall().once();
     expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
 
@@ -347,6 +242,11 @@ public class CloseWriteAheadLogReferencesTest {
     verify(conn, bs);
   }
 
+  private static Entry<Key,Value> entry(String session, String file) {
+    Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
+    return Maps.immutableEntry(key, new Value());
+  }
+
   @Test
   public void unusedWalsAreClosed() throws Exception {
     Set<String> wals = Collections.emptySet();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 5e6dcfb..2434487 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -421,6 +421,9 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
           perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
         }
         perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
+
+        // add the currlog location for root tablet current logs
+        zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
         haveUpgradedZooKeeper = true;
       } catch (Exception ex) {
         log.fatal("Error performing upgrade", ex);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 3809a29..43939d2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -162,7 +163,8 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
           scanner.setRange(MetadataSchema.TabletsSection.getRange());
         } else {
           scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-          scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
+          Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange();
+          scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
         }
         TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
         TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);


Mime
View raw message