accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [02/34] accumulo git commit: ACCUMULO-3625 use log markers against tservers, not tablets
Date Fri, 24 Apr 2015 23:20:48 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 755e322..edea93f 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -161,6 +161,7 @@ class TabletGroupWatcher extends Daemon {
         List<Assignment> assigned = new ArrayList<Assignment>();
         List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
         Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+        Map<TServerInstance, List<String>> logsForDeadServers = new TreeMap<>();
 
         MasterState masterState = master.getMasterState();
         int[] counts = new int[TabletState.values().length];
@@ -173,6 +174,7 @@ class TabletGroupWatcher extends Daemon {
           if (tls == null) {
             continue;
           }
+          Master.log.debug(store.name() + " location State: " + tls);
           // ignore entries for tables that do not exist in zookeeper
           if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
             continue;
@@ -182,7 +184,7 @@ class TabletGroupWatcher extends Daemon {
 
           // Don't overwhelm the tablet servers with work
           if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
-            flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+            flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
             assignments.clear();
             assigned.clear();
             assignedToDeadServers.clear();
@@ -237,7 +239,7 @@ class TabletGroupWatcher extends Daemon {
                 assignedToDeadServers.add(tls);
                 if (server.equals(this.master.migrations.get(tls.extent)))
                   this.master.migrations.remove(tls.extent);
-                // log.info("Current servers " + currentTServers.keySet());
+                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
                 break;
               case UNASSIGNED:
                 // maybe it's a finishing migration
@@ -266,7 +268,7 @@ class TabletGroupWatcher extends Daemon {
                 break;
               case ASSIGNED_TO_DEAD_SERVER:
                 assignedToDeadServers.add(tls);
-                // log.info("Current servers " + currentTServers.keySet());
+                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
                 break;
               case HOSTED:
                 TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -285,7 +287,8 @@ class TabletGroupWatcher extends Daemon {
           counts[state.ordinal()]++;
         }
 
-        flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+        flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
+        store.markLogsAsUnused(master, logsForDeadServers);
 
         // provide stats after flushing changes to avoid race conditions w/ delete table
         stats.end(masterState);
@@ -723,12 +726,19 @@ class TabletGroupWatcher extends Daemon {
     }
   }
 
-  private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
-      List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+  private void flushChanges(
+      SortedMap<TServerInstance,TabletServerStatus> currentTServers,
+      List<Assignment> assignments,
+      List<Assignment> assigned,
+      List<TabletLocationState> assignedToDeadServers,
+      Map<TServerInstance, List<String>> logsForDeadServers,
+      Map<KeyExtent,TServerInstance> unassigned)
+          throws DistributedStoreException, TException {
     if (!assignedToDeadServers.isEmpty()) {
       int maxServersToShow = min(assignedToDeadServers.size(), 100);
       Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
-      store.unassign(assignedToDeadServers);
+      Master.log.debug("logs for dead servers: " + logsForDeadServers);
+      store.unassign(assignedToDeadServers, logsForDeadServers);
       this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index b8e0b40..bc4c64f 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -107,6 +107,7 @@ public class WorkMaker {
         // Don't create the record if we have nothing to do.
         // TODO put this into a filter on serverside
         if (!shouldCreateWork(status)) {
+          log.info("Not creating work: " + status.toString());
           continue;
         }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index 8cdaf9f..895717a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.cli.ClientOpts;
@@ -186,7 +187,7 @@ public class MergeStats {
     Text tableId = extent.getTableId();
     Text first = KeyExtent.getMetadataEntry(tableId, start);
     Range range = new Range(first, false, null, true);
-    scanner.setRange(range);
+    scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
     KeyExtent prevExtent = null;
 
     log.debug("Scanning range " + range);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index b0240f1..b39dcb8 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -186,7 +186,7 @@ public class TestMergeState {
     // take it offline
     m = tablet.getPrevRowUpdateMutation();
     Collection<Collection<String>> walogs = Collections.emptyList();
-    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));
+    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
 
     // now we can split
     stats = scan(state, metaDataStateStore);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index abceae4..db16bcb 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -181,7 +181,7 @@ public class RootTabletStateStoreTest {
     } catch (BadLocationStateException e) {
       fail("Unexpected error " + e);
     }
-    tstore.unassign(Collections.singletonList(assigned));
+    tstore.unassign(Collections.singletonList(assigned), null);
     count = 0;
     for (TabletLocationState location : tstore) {
       assertEquals(location.extent, root);
@@ -209,7 +209,7 @@ public class RootTabletStateStoreTest {
       fail("Unexpected error " + e);
     }
     try {
-      tstore.unassign(Collections.singletonList(broken));
+      tstore.unassign(Collections.singletonList(broken), null);
       Assert.fail("should not get here");
     } catch (IllegalArgumentException ex) {}
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
index 7d09fe3..b05032e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
@@ -34,7 +34,8 @@ public class GarbageCollectionLogger {
   private long gcTimeIncreasedCount = 0;
   private static long lastMemoryCheckTime = 0;
 
-  public GarbageCollectionLogger() {}
+  public GarbageCollectionLogger() {
+  }
 
   public synchronized void logGCInfo(AccumuloConfiguration conf) {
     final long now = System.currentTimeMillis();
@@ -96,7 +97,7 @@ public class GarbageCollectionLogger {
     final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
     if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
       final long diff = now - lastMemoryCheckTime;
-      if (diff > keepAliveTimeout) {
+      if (diff > keepAliveTimeout + 1000) {
         log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check",
             keepAliveTimeout / 1000., diff / 1000.));
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java
new file mode 100644
index 0000000..1e82393
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+public enum TabletLevel {
+  ROOT,
+  META,
+  NORMAL;
+
+  public static TabletLevel getLevel(KeyExtent extent) {
+    if (!extent.isMeta())
+      return NORMAL;
+    if (extent.isRootTablet())
+      return ROOT;
+    return META;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 662ee31..b12fccc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.tserver;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
@@ -29,6 +28,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,6 +44,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -146,7 +147,6 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.GarbageCollectionLogger;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -1438,6 +1438,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       }
     }
 
+
     @Override
     public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) {
 
@@ -1498,6 +1499,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       final AssignmentHandler ah = new AssignmentHandler(extent);
       // final Runnable ah = new LoggingRunnable(log, );
       // Root tablet assignment must take place immediately
+
       if (extent.isRootTablet()) {
         new Daemon("Root Tablet Assignment") {
           @Override
@@ -1690,66 +1692,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     }
 
     @Override
-    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
-      String myname = getClientAddressString();
-      myname = myname.replace(':', '+');
-      Set<String> loggers = new HashSet<String>();
-      logger.getLogFiles(loggers);
-      Set<String> loggerUUIDs = new HashSet<String>();
-      for (String logger : loggers)
-        loggerUUIDs.add(new Path(logger).getName());
-
-      nextFile: for (String filename : filenames) {
-        String uuid = new Path(filename).getName();
-        // skip any log we're currently using
-        if (loggerUUIDs.contains(uuid))
-          continue nextFile;
-
-        List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
-        synchronized (onlineTablets) {
-          onlineTabletsCopy.addAll(onlineTablets.values());
-        }
-        for (Tablet tablet : onlineTabletsCopy) {
-          for (String current : tablet.getCurrentLogFiles()) {
-            if (current.contains(uuid)) {
-              log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
-              continue nextFile;
-            }
-          }
-        }
-
-        try {
-          Path source = new Path(filename);
-          if (TabletServer.this.getConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
-            Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives());
-            fs.mkdirs(walogArchive);
-            Path dest = new Path(walogArchive, source.getName());
-            log.info("Archiving walog " + source + " to " + dest);
-            if (!fs.rename(source, dest))
-              log.error("rename is unsuccessful");
-          } else {
-            log.info("Deleting walog " + filename);
-            Path sourcePath = new Path(filename);
-            if (!(!TabletServer.this.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath))
-                && !fs.deleteRecursively(sourcePath))
-              log.warn("Failed to delete walog " + source);
-            for (String recovery : ServerConstants.getRecoveryDirs()) {
-              Path recoveryPath = new Path(recovery, source.getName());
-              try {
-                if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
-                  log.info("Deleted any recovery log " + filename);
-              } catch (FileNotFoundException ex) {
-                // ignore
-              }
-            }
-          }
-        } catch (IOException e) {
-          log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
-        }
-      }
-    }
-
-    @Override
     public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       try {
         checkPermission(credentials, null, "getActiveCompactions");
@@ -1770,14 +1712,13 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
     @Override
     public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
-      Set<String> logs = new HashSet<String>();
-      logger.getLogFiles(logs);
-      return new ArrayList<String>(logs);
+      String log = logger.getLogFile();
+      return Collections.singletonList(log);
     }
   }
 
   private class SplitRunner implements Runnable {
-    private Tablet tablet;
+    private final Tablet tablet;
 
     public SplitRunner(Tablet tablet) {
       this.tablet = tablet;
@@ -2031,7 +1972,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
           log.error("Unexpected error ", e);
         }
         log.debug("Unassigning " + tls);
-        TabletStateStore.unassign(TabletServer.this, tls);
+        TabletStateStore.unassign(TabletServer.this, tls, null);
       } catch (DistributedStoreException ex) {
         log.warn("Unable to update storage", ex);
       } catch (KeeperException e) {
@@ -2238,29 +2179,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     }
   }
 
-  public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
-    if (!this.onlineTablets.containsKey(extent)) {
-      log.info("Not adding " + logs.size() + " logs for extent " + extent + " as alias " + id + " tablet is offline");
-      // minor compaction due to recovery... don't make updates... if it finishes, there will be no WALs,
-      // if it doesn't, we'll need to do the same recovery with the old files.
-      return;
-    }
-
-    log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
-    long now = RelativeTime.currentTimeMillis();
-    List<String> logSet = new ArrayList<String>();
-    for (DfsLogger log : logs)
-      logSet.add(log.getFileName());
-    LogEntry entry = new LogEntry();
-    entry.extent = extent;
-    entry.tabletId = id;
-    entry.timestamp = now;
-    entry.server = logs.get(0).getLogger();
-    entry.filename = logs.get(0).getFileName();
-    entry.logSet = logSet;
-    MetadataTableUtil.addLogEntry(this, entry, getLock());
-  }
-
   private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName)
       throws UnknownHostException {
     Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -2968,6 +2886,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions.incrementAndGet();
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
+    removeUnusedWALs();
   }
 
   public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException {
@@ -2986,14 +2905,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     });
     for (LogEntry entry : sorted) {
       Path recovery = null;
-      for (String log : entry.logSet) {
-        Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
-        finished = SortedLogState.getFinishedMarkerPath(finished);
-        TabletServer.log.info("Looking for " + finished);
-        if (fs.exists(finished)) {
-          recovery = finished.getParent();
-          break;
-        }
+      Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename));
+      finished = SortedLogState.getFinishedMarkerPath(finished);
+      TabletServer.log.info("Looking for " + finished);
+      if (fs.exists(finished)) {
+        recovery = finished.getParent();
       }
       if (recovery == null)
         throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry);
@@ -3035,7 +2951,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   }
 
   public Collection<Tablet> getOnlineTablets() {
-    return Collections.unmodifiableCollection(onlineTablets.values());
+    synchronized (onlineTablets) {
+      return new ArrayList<Tablet>(onlineTablets.values());
+    }
   }
 
   public VolumeManager getFileSystem() {
@@ -3061,4 +2979,54 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   public SecurityOperation getSecurityOperation() {
     return security;
   }
+
+  // avoid writing redundant log markers to meta
+  ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
+
+  // remove any meta entries after a rolled log is no longer referenced
+  Set<DfsLogger> closedLogs = new HashSet<>();
+
+  private void removeUnusedWALs() {
+    Set<DfsLogger> candidates;
+    synchronized (closedLogs) {
+      candidates = new HashSet<>(closedLogs);
+    }
+    for (Tablet tablet : getOnlineTablets()) {
+      candidates.removeAll(tablet.getCurrentLogFiles());
+    }
+    try {
+      Set<String> filenames = new HashSet<>();
+      for (DfsLogger candidate : candidates) {
+        filenames.add(candidate.getFileName());
+      }
+      MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames);
+      synchronized (closedLogs) {
+        closedLogs.removeAll(candidates);
+      }
+    } catch (AccumuloException ex) {
+      log.info(ex.toString(), ex);
+    }
+  }
+
+  public void addLoggersToMetadata(DfsLogger copy, KeyExtent extent) {
+    TabletLevel level = TabletLevel.getLevel(extent);
+    synchronized (level) {
+      EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
+      if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
+        log.info("Writing log marker for level " + level + " " + copy.getFileName());
+        MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getFileName(), extent);
+        if (set != null) {
+          set.add(level);
+        }
+      }
+    }
+  }
+
+  public void walogClosed(DfsLogger currentLog) {
+    metadataTableLogs.remove(currentLog);
+    synchronized (closedLogs) {
+      closedLogs.add(currentLog);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 5acf5eb..f8bcfbc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -74,7 +74,7 @@ import com.google.common.base.Optional;
  * Wrap a connection to a logger.
  *
  */
-public class DfsLogger {
+public class DfsLogger implements Comparable<DfsLogger> {
   public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
   public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 
@@ -621,4 +621,9 @@ public class DfsLogger {
     return Joiner.on(":").join(parts[parts.length - 2].split("[+]"));
   }
 
+  @Override
+  public int compareTo(DfsLogger o) {
+    return getFileName().compareTo(o.getFileName());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 405ec70..efdbbf4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -179,7 +179,7 @@ public class SortedLogRecovery {
     // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id
     // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to
     while (reader.next(key, value)) {
-      // LogReader.printEntry(entry);
+      // log.debug("Event " + key.event + " tablet " + key.tablet);
       if (key.event != DEFINE_TABLET)
         break;
       if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
@@ -208,7 +208,7 @@ public class SortedLogRecovery {
         if (lastStartToFinish.compactionStatus == Status.INITIAL)
           lastStartToFinish.compactionStatus = Status.COMPLETE;
         if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+          throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
         lastStartToFinish.update(fileno, key.seq);
 
         // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
@@ -217,7 +217,7 @@ public class SortedLogRecovery {
           lastStartToFinish.update(-1);
       } else if (key.event == COMPACTION_FINISH) {
         if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+          throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
         if (lastStartToFinish.compactionStatus == Status.INITIAL)
           lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
         else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
@@ -248,8 +248,6 @@ public class SortedLogRecovery {
         break;
       if (key.tid != tid)
         break;
-      // log.info("Replaying " + key);
-      // log.info(value);
       if (key.event == MUTATION) {
         mr.receive(value.mutations.get(0));
       } else if (key.event == MANY_MUTATIONS) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 5c3fc2d..6455726 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -29,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -37,7 +37,6 @@ import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.TableConfiguration;
@@ -49,6 +48,7 @@ import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
 import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -71,20 +71,22 @@ public class TabletServerLogger {
 
   private final TabletServer tserver;
 
-  // The current log set: always updated to a new set with every change of loggers
-  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
+  // The current logger
+  private DfsLogger currentLog = null;
+  private DfsLogger nextLog = null;
+  private Thread nextLogThread = null;
 
-  // The current generation of logSet.
-  // Because multiple threads can be using a log set at one time, a log
+  // The current generation of logs.
+  // Because multiple threads can be using a log at one time, a log
   // failure is likely to affect multiple threads, who will all attempt to
-  // create a new logSet. This will cause many unnecessary updates to the
+  // create a new log. This will cause many unnecessary updates to the
   // metadata table.
   // We'll use this generational counter to determine if another thread has
-  // already fetched a new logSet.
-  private AtomicInteger logSetId = new AtomicInteger();
+  // already fetched a new log.
+  private final AtomicInteger logId = new AtomicInteger();
 
   // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
-  private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock();
 
   private final AtomicInteger seqGen = new AtomicInteger();
 
@@ -145,61 +147,66 @@ public class TabletServerLogger {
     this.flushCounter = flushCounter;
   }
 
-  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
-    final int[] result = {-1};
-    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+  private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
+    final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>();
+    testLockAndRun(logIdLock, new TestCallWithWriteLock() {
       @Override
       boolean test() {
-        copy.clear();
-        copy.addAll(loggers);
-        if (!loggers.isEmpty())
-          result[0] = logSetId.get();
-        return loggers.isEmpty();
+        result.set(currentLog);
+        if (currentLog != null)
+          logIdOut.set(logId.get());
+        return currentLog == null;
       }
 
       @Override
       void withWriteLock() throws IOException {
         try {
-          createLoggers();
-          copy.clear();
-          copy.addAll(loggers);
-          if (copy.size() > 0)
-            result[0] = logSetId.get();
+          createLogger();
+          result.set(currentLog);
+          if (currentLog != null)
+            logIdOut.set(logId.get());
           else
-            result[0] = -1;
+            logIdOut.set(-1);
         } catch (IOException e) {
           log.error("Unable to create loggers", e);
         }
       }
     });
-    return result[0];
+    return result.get();
   }
 
-  public void getLogFiles(Set<String> loggersOut) {
-    logSetLock.readLock().lock();
+  public String getLogFile() {
+    logIdLock.readLock().lock();
     try {
-      for (DfsLogger logger : loggers) {
-        loggersOut.add(logger.getFileName());
-      }
+      return currentLog.getFileName();
     } finally {
-      logSetLock.readLock().unlock();
+      logIdLock.readLock().unlock();
     }
   }
 
-  synchronized private void createLoggers() throws IOException {
-    if (!logSetLock.isWriteLockedByCurrentThread()) {
+  synchronized private void createLogger() throws IOException {
+    if (!logIdLock.isWriteLockedByCurrentThread()) {
       throw new IllegalStateException("createLoggers should be called with write lock held!");
     }
 
-    if (loggers.size() != 0) {
-      throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
+    if (currentLog != null) {
+      throw new IllegalStateException("createLoggers should not be called when current log is set");
     }
 
     try {
-      DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
-      alog.open(tserver.getClientAddressString());
-      loggers.add(alog);
-      logSetId.incrementAndGet();
+      if (nextLog != null) {
+        log.info("Using next log " + nextLog.getFileName());
+        currentLog = nextLog;
+        nextLog = null;
+      } else {
+        DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+        alog.open(tserver.getClientAddressString());
+        currentLog = alog;
+      }
+      if (nextLog == null) {
+        createNextLog();
+      }
+      logId.incrementAndGet();
       return;
     } catch (Exception t) {
       walErrors.put(System.currentTimeMillis(), "");
@@ -210,30 +217,58 @@ public class TabletServerLogger {
     }
   }
 
+  private void createNextLog() {
+    if (nextLogThread == null) {
+      nextLogThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            log.info("Creating next WAL");
+            this.setName("Creating next WAL");
+            DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+            alog.open(tserver.getClientAddressString());
+            for (Tablet tablet : tserver.getOnlineTablets()) {
+              // TODO
+              tserver.addLoggersToMetadata(alog, tablet.getExtent());
+            }
+            nextLog = alog;
+
+            log.info("Created next WAL " + alog.getFileName());
+          } catch (Exception t) {
+            log.error(t, t);
+          } finally {
+            nextLogThread = null;
+          }
+        }
+      };
+      nextLogThread.start();
+    }
+  }
+
   public void resetLoggers() throws IOException {
-    logSetLock.writeLock().lock();
+    logIdLock.writeLock().lock();
     try {
       close();
     } finally {
-      logSetLock.writeLock().unlock();
+      logIdLock.writeLock().unlock();
     }
   }
 
   synchronized private void close() throws IOException {
-    if (!logSetLock.isWriteLockedByCurrentThread()) {
+    if (!logIdLock.isWriteLockedByCurrentThread()) {
       throw new IllegalStateException("close should be called with write lock held!");
     }
     try {
-      for (DfsLogger logger : loggers) {
-        try {
-          logger.close();
-        } catch (DfsLogger.LogClosedException ex) {
-          // ignore
-        } catch (Throwable ex) {
-          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex);
-        }
+      try {
+        currentLog.close();
+      } catch (DfsLogger.LogClosedException ex) {
+        // ignore
+      } catch (Throwable ex) {
+        log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
+      } finally {
+        this.tserver.walogClosed(currentLog);
       }
-      loggers.clear();
+      currentLog = null;
       logSizeEstimate.set(0);
     } catch (Throwable t) {
       throw new IOException(t);
@@ -251,7 +286,7 @@ public class TabletServerLogger {
 
   private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
     // Work very hard not to lock this during calls to the outside world
-    int currentLogSet = logSetId.get();
+    int currentLogId = logId.get();
 
     int seq = -1;
     int attempt = 1;
@@ -259,20 +294,22 @@ public class TabletServerLogger {
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
-        currentLogSet = initializeLoggers(copy);
+        DfsLogger copy = null;
+        AtomicInteger currentId = new AtomicInteger(-1);
+        copy = initializeLoggers(currentId);
+        currentLogId = currentId.get();
 
         // add the logger to the log set for the memory in the tablet,
         // update the metadata table if we've never used this tablet
 
-        if (currentLogSet == logSetId.get()) {
+        if (currentLogId == logId.get()) {
           for (CommitSession commitSession : sessions) {
             if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
                 defineTablet(commitSession);
-                if (currentLogSet == logSetId.get())
-                  tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
+                if (currentLogId == logId.get())
+                  tserver.addLoggersToMetadata(copy, commitSession.getExtent());
               } finally {
                 commitSession.finishUpdatingLogsUsed();
               }
@@ -280,39 +317,29 @@ public class TabletServerLogger {
               // Need to release
               KeyExtent extent = commitSession.getExtent();
               if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) {
-                Set<String> logs = new HashSet<String>();
-                for (DfsLogger logger : copy) {
-                  logs.add(logger.getFileName());
-                }
-                Status status = StatusUtil.fileCreated(System.currentTimeMillis());
-                log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs);
+                Status status = Status.newBuilder().setInfiniteEnd(true).setCreatedTime(System.currentTimeMillis()).build();
+                log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName());
                 // Got some new WALs, note this in the metadata table
-                ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), logs, status);
+                ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status);
               }
             }
           }
         }
 
         // Make sure that the logs haven't changed out from underneath our copy
-        if (currentLogSet == logSetId.get()) {
+        if (currentLogId == logId.get()) {
 
           // write the mutation to the logs
           seq = seqGen.incrementAndGet();
           if (seq < 0)
             throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
-          ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
-          for (DfsLogger wal : copy) {
-            LoggerOperation lop = writer.write(wal, seq);
-            if (lop != null)
-              queuedOperations.add(lop);
-          }
-
-          for (LoggerOperation lop : queuedOperations) {
+          LoggerOperation lop = writer.write(copy, seq);
+          if (lop != null) {
             lop.await();
           }
 
           // double-check: did the log set change?
-          success = (currentLogSet == logSetId.get());
+          success = (currentLogId == logId.get());
         }
       } catch (DfsLogger.LogClosedException ex) {
         log.debug("Logs closed while writing, retrying " + attempt);
@@ -327,13 +354,13 @@ public class TabletServerLogger {
       // Some sort of write failure occurred. Grab the write lock and reset the logs.
       // But since multiple threads will attempt it, only attempt the reset when
       // the logs haven't changed.
-      final int finalCurrent = currentLogSet;
+      final int finalCurrent = currentLogId;
       if (!success) {
-        testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+        testLockAndRun(logIdLock, new TestCallWithWriteLock() {
 
           @Override
           boolean test() {
-            return finalCurrent == logSetId.get();
+            return finalCurrent == logId.get();
           }
 
           @Override
@@ -346,7 +373,7 @@ public class TabletServerLogger {
     }
     // if the log gets too big, reset it .. grab the write lock first
     logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
-    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
+    testLockAndRun(logIdLock, new TestCallWithWriteLock() {
       @Override
       boolean test() {
         return logSizeEstimate.get() > maxSize;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
index 17290c0..70b1922 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.tserver.tablet;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.accumulo.core.data.KeyExtent;
@@ -85,7 +84,7 @@ public class CommitSession {
     return committer;
   }
 
-  public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
+  public boolean beginUpdatingLogsUsed(DfsLogger copy, boolean mincFinish) {
     return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 8ba8128..b05f0c6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -424,7 +424,9 @@ class DatafileManager {
         if (log.isDebugEnabled()) {
           log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly);
         }
-        ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
+        for (String logFile : logFileOnly) {
+          ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFile, StatusUtil.openWithUnknownLength());
+        }
       }
     } finally {
       tablet.finishClearingUnusedLogs();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index f2d5375..fa1ae86 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -37,6 +37,7 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -198,7 +199,7 @@ public class Tablet implements TabletCommitter {
   }
 
   // stores info about user initiated major compaction that is waiting on a minor compaction to finish
-  private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
+  private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
 
   static enum CompactionState {
     WAITING_TO_START, IN_PROGRESS
@@ -628,8 +629,8 @@ public class Tablet implements TabletCommitter {
           // the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress.
           Status status = StatusUtil.openWithUnknownLength();
           for (LogEntry logEntry : logEntries) {
-            log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
-            ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.logSet, status);
+            log.debug("Writing updated status to metadata table for " + logEntry.filename + " " + ProtobufUtil.toString(status));
+            ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status);
           }
         }
 
@@ -641,11 +642,9 @@ public class Tablet implements TabletCommitter {
         }
       }
       // make some closed references that represent the recovered logs
-      currentLogs = new HashSet<DfsLogger>();
+      currentLogs = new ConcurrentSkipListSet<DfsLogger>();
       for (LogEntry logEntry : logEntries) {
-        for (String log : logEntry.logSet) {
-          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString()));
-        }
+        currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename, logEntry.getColumnQualifier().toString()));
       }
 
       log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + getTabletMemory().getNumEntries()
@@ -937,6 +936,8 @@ public class Tablet implements TabletCommitter {
     long count = 0;
 
     try {
+      String oldName = Thread.currentThread().getName();
+      Thread.currentThread().setName("Minor compacting " + this.extent);
       Span span = Trace.start("write");
       CompactionStats stats;
       try {
@@ -957,6 +958,7 @@ public class Tablet implements TabletCommitter {
             commitSession, flushId);
       } finally {
         span.stop();
+        Thread.currentThread().setName(oldName);
       }
       return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
     } catch (Exception e) {
@@ -991,7 +993,7 @@ public class Tablet implements TabletCommitter {
   private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
     CommitSession oldCommitSession = getTabletMemory().prepareForMinC();
     otherLogs = currentLogs;
-    currentLogs = new HashSet<DfsLogger>();
+    currentLogs = new ConcurrentSkipListSet<DfsLogger>();
 
     FileRef mergeFile = null;
     if (mincReason != MinorCompactionReason.RECOVERY) {
@@ -2373,14 +2375,10 @@ public class Tablet implements TabletCommitter {
     }
   }
 
-  private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
+  private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>();
 
-  public synchronized Set<String> getCurrentLogFiles() {
-    Set<String> result = new HashSet<String>();
-    for (DfsLogger log : currentLogs) {
-      result.add(log.getFileName());
-    }
-    return result;
+  public Set<DfsLogger> getCurrentLogFiles() {
+    return new HashSet<DfsLogger>(currentLogs);
   }
 
   Set<String> beginClearingUnusedLogs() {
@@ -2439,12 +2437,12 @@ public class Tablet implements TabletCommitter {
   // this lock is basically used to synchronize writing of log info to metadata
   private final ReentrantLock logLock = new ReentrantLock();
 
-  public synchronized int getLogCount() {
+  public int getLogCount() {
     return currentLogs.size();
   }
 
   @Override
-  public boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) {
+  public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) {
 
     boolean releaseLock = true;
 
@@ -2481,28 +2479,26 @@ public class Tablet implements TabletCommitter {
 
         int numAdded = 0;
         int numContained = 0;
-        for (DfsLogger logger : more) {
-          if (addToOther) {
-            if (otherLogs.add(logger))
-              numAdded++;
+        if (addToOther) {
+          if (otherLogs.add(more))
+            numAdded++;
 
-            if (currentLogs.contains(logger))
-              numContained++;
-          } else {
-            if (currentLogs.add(logger))
-              numAdded++;
+          if (currentLogs.contains(more))
+            numContained++;
+        } else {
+          if (currentLogs.add(more))
+            numAdded++;
 
-            if (otherLogs.contains(logger))
-              numContained++;
-          }
+          if (otherLogs.contains(more))
+            numContained++;
         }
 
-        if (numAdded > 0 && numAdded != more.size()) {
+        if (numAdded > 0 && numAdded != 1) {
           // expect to add all or none
           throw new IllegalArgumentException("Added subset of logs " + extent + " " + more + " " + currentLogs);
         }
 
-        if (numContained > 0 && numContained != more.size()) {
+        if (numContained > 0 && numContained != 1) {
           // expect to contain all or none
           throw new IllegalArgumentException("Other logs contained subset of logs " + extent + " " + more + " " + otherLogs);
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
index b56d0af..39bde5c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.tserver.tablet;
 
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.accumulo.core.client.Durability;
@@ -35,7 +34,7 @@ public interface TabletCommitter {
 
   void commit(CommitSession commitSession, List<Mutation> mutations);
 
-  boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish);
+  boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger copy, boolean mincFinish);
 
   void finishUpdatingLogsUsed();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index b429607..b8a60c1 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -202,9 +202,6 @@ public class NullTserver {
     }
 
     @Override
-    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {}
-
-    @Override
     public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveCompaction>();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
index 45799c4..de8ebc8 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
@@ -58,6 +58,11 @@ import com.google.common.net.HostAndPort;
 public class ProxyDurabilityIT extends ConfigurableMacIT {
 
   @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
index 9cae889..93ca138 100644
--- a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
@@ -53,7 +53,7 @@ public class BadDeleteMarkersCreatedIT extends AccumuloClusterIT {
 
   @Override
   public int defaultTimeoutSeconds() {
-    return 60;
+    return 120;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
index f793925..8703f18 100644
--- a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
@@ -20,25 +20,33 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.harness.AccumuloClusterIT;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class BalanceIT extends ConfigurableMacIT {
+public class BalanceIT extends AccumuloClusterIT {
+  private static final Logger log = LoggerFactory.getLogger(BalanceIT.class);
 
-  @Test(timeout = 60 * 1000)
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
   public void testBalance() throws Exception {
     String tableName = getUniqueNames(1)[0];
     Connector c = getConnector();
-    System.out.println("Creating table");
+    log.info("Creating table");
     c.tableOperations().create(tableName);
     SortedSet<Text> splits = new TreeSet<Text>();
     for (int i = 0; i < 10; i++) {
       splits.add(new Text("" + i));
     }
-    System.out.println("Adding splits");
+    log.info("Adding splits");
     c.tableOperations().addSplits(tableName, splits);
-    System.out.println("Waiting for balance");
+    log.info("Waiting for balance");
     c.instanceOperations().waitForBalance();
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index bdfbd13..c113a08 100644
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@ -125,6 +125,7 @@ public class CleanWalIT extends AccumuloClusterIT {
   private int countLogs(String tableName, Connector conn) throws TableNotFoundException {
     Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+    scanner.setRange(MetadataSchema.TabletsSection.getRange());
     int count = 0;
     for (Entry<Key,Value> entry : scanner) {
       log.debug("Saw " + entry.getKey() + "=" + entry.getValue());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
index b68870d..7bd1842 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -1251,6 +1251,7 @@ public class ConditionalWriterIT extends AccumuloClusterIT {
     conn.tableOperations().create(tableName);
 
     DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig());
+    UtilWaitThread.sleep(1000);
     Span root = Trace.on("traceTest");
     ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
index 7f2f6f9..685d71a 100644
--- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -17,7 +17,6 @@
 package org.apache.accumulo.test;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -25,6 +24,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -125,11 +125,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
     String tableId = conn.tableOperations().tableIdMap().get(tableName);
     Assert.assertNotNull("Table ID was null", tableId);
 
-    LogEntry logEntry = new LogEntry();
-    logEntry.server = "127.0.0.1:12345";
-    logEntry.filename = emptyWalog.toURI().toString();
-    logEntry.tabletId = 10;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
+    LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString());
 
     log.info("Taking {} offline", tableName);
     conn.tableOperations().offline(tableName, true);
@@ -184,11 +180,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
     String tableId = conn.tableOperations().tableIdMap().get(tableName);
     Assert.assertNotNull("Table ID was null", tableId);
 
-    LogEntry logEntry = new LogEntry();
-    logEntry.server = "127.0.0.1:12345";
-    logEntry.filename = partialHeaderWalog.toURI().toString();
-    logEntry.tabletId = 10;
-    logEntry.logSet = Collections.singleton(logEntry.filename);
+    LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString());
 
     log.info("Taking {} offline", tableName);
     conn.tableOperations().offline(tableName, true);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
deleted file mode 100644
index 10b8810..0000000
--- a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Map.Entry;
-
-import org.apache.accumulo.cluster.ClusterControl;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.harness.AccumuloClusterIT;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.FunctionalTestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-// Verify that a recovery of a log without any mutations removes the log reference
-public class NoMutationRecoveryIT extends AccumuloClusterIT {
-
-  @Override
-  public int defaultTimeoutSeconds() {
-    return 10 * 60;
-  }
-
-  @Override
-  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.useMiniDFS(true);
-    cfg.setNumTservers(1);
-  }
-
-  public boolean equals(Entry<Key,Value> a, Entry<Key,Value> b) {
-    // comparison, without timestamp
-    Key akey = a.getKey();
-    Key bkey = b.getKey();
-    return akey.compareTo(bkey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS) == 0 && a.getValue().equals(b.getValue());
-  }
-
-  @Test
-  public void test() throws Exception {
-    Connector conn = getConnector();
-    final String table = getUniqueNames(1)[0];
-    conn.tableOperations().create(table);
-    String tableId = conn.tableOperations().tableIdMap().get(table);
-    update(conn, table, new Text("row"), new Text("cf"), new Text("cq"), new Value("value".getBytes()));
-    Entry<Key,Value> logRef = getLogRef(conn, MetadataTable.NAME);
-    conn.tableOperations().flush(table, null, null, true);
-    assertEquals("should not have any refs", 0, FunctionalTestUtils.count(getLogRefs(conn, MetadataTable.NAME, Range.prefix(tableId))));
-    conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
-    update(conn, MetadataTable.NAME, logRef);
-    assertTrue(equals(logRef, getLogRef(conn, MetadataTable.NAME)));
-    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
-    conn.tableOperations().flush(RootTable.NAME, null, null, true);
-
-    ClusterControl control = cluster.getClusterControl();
-    control.stopAllServers(ServerType.TABLET_SERVER);
-    control.startAllServers(ServerType.TABLET_SERVER);
-
-    Scanner s = conn.createScanner(table, Authorizations.EMPTY);
-    int count = 0;
-    for (Entry<Key,Value> e : s) {
-      assertEquals(e.getKey().getRow().toString(), "row");
-      assertEquals(e.getKey().getColumnFamily().toString(), "cf");
-      assertEquals(e.getKey().getColumnQualifier().toString(), "cq");
-      assertEquals(e.getValue().toString(), "value");
-      count++;
-    }
-    assertEquals(1, count);
-    for (Entry<Key,Value> ref : getLogRefs(conn, MetadataTable.NAME)) {
-      assertFalse(equals(ref, logRef));
-    }
-  }
-
-  private void update(Connector conn, String name, Entry<Key,Value> logRef) throws Exception {
-    Key k = logRef.getKey();
-    update(conn, name, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), logRef.getValue());
-  }
-
-  private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table) throws Exception {
-    return getLogRefs(conn, table, new Range());
-  }
-
-  private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table, Range r) throws Exception {
-    Scanner s = conn.createScanner(table, Authorizations.EMPTY);
-    s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
-    s.setRange(r);
-    return s;
-  }
-
-  private Entry<Key,Value> getLogRef(Connector conn, String table) throws Exception {
-    return getLogRefs(conn, table).iterator().next();
-  }
-
-  private void update(Connector conn, String table, Text row, Text cf, Text cq, Value value) throws Exception {
-    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
-    Mutation m = new Mutation(row);
-    m.put(cf, cq, value);
-    bw.addMutation(m);
-    bw.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 72f098f..21d1115 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -299,7 +299,7 @@ public class ShellServerIT extends SharedMiniClusterIT {
     ts.exec("config -t " + table2 + " -np", true, "345M", true);
     ts.exec("getsplits -t " + table2, true, "row5", true);
     ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true);
-    ts.exec("onlinetable " + table, true);
+    ts.exec("online " + table, true);
     ts.exec("deletetable -f " + table, true);
     ts.exec("deletetable -f " + table2, true);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
index 6c81369..d83f038 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java
@@ -201,10 +201,10 @@ public class ExamplesIT extends AccumuloClusterIT {
     Entry<Integer,String> entry = getClusterControl().execWithStdout(
         Ingest.class,
         new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--dirTable", dirTable, "--indexTable", indexTable, "--dataTable", dataTable,
-            "--vis", visibility, "--chunkSize", Integer.toString(10000), getUsableDir()});
+            "--vis", visibility, "--chunkSize", Integer.toString(10000), System.getProperty("user.dir") + "/src/test"});
     assertEquals("Got non-zero return code. Stdout=" + entry.getValue(), 0, entry.getKey().intValue());
     entry = getClusterControl().execWithStdout(QueryUtil.class,
-        new String[] {"-i", instance, "-z", keepers, "-p", passwd, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path", "accumulo-site.xml"});
+        new String[] {"-i", instance, "-z", keepers, "-p", passwd, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path", "log4j.properties"});
     if (ClusterType.MINI == getClusterType()) {
       MiniAccumuloClusterImpl impl = (MiniAccumuloClusterImpl) cluster;
       for (LogWriter writer : impl.getLogWriters()) {
@@ -214,7 +214,7 @@ public class ExamplesIT extends AccumuloClusterIT {
 
     log.info("result " + entry.getValue());
     assertEquals(0, entry.getKey().intValue());
-    assertTrue(entry.getValue().contains("accumulo-site.xml"));
+    assertTrue(entry.getValue().contains("log4j.properties"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
index bd0555b..4c86172 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
@@ -51,8 +51,8 @@ public class WatchTheWatchCountIT extends ConfigurableMacIT {
       int n = socket.getInputStream().read(buffer);
       String response = new String(buffer, 0, n);
       long total = Long.parseLong(response.split(":")[1].trim());
-      assertTrue("Total watches was not greater than 500, but was " + total, total > 500);
-      assertTrue("Total watches was not less than 600, but was " + total, total < 600);
+      assertTrue("Total watches was not greater than 600, but was " + total, total > 600);
+      assertTrue("Total watches was not less than 675, but was " + total, total < 675);
     } finally {
       socket.close();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
new file mode 100644
index 0000000..fcd1fd7
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.test.continuous.ContinuousIngest;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class RollWALPerformanceIT extends ConfigurableMacIT {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_WAL_REPLICATION, "1");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "10M");
+    cfg.setProperty(Property.TABLE_MINC_LOGS_MAX, "100");
+    cfg.setProperty(Property.GC_FILE_ARCHIVE, "false");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.useMiniDFS(true);
+  }
+
+  private long ingest() throws Exception {
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    log.info("Creating the table");
+    c.tableOperations().create(tableName);
+
+    log.info("Splitting the table");
+    final long SPLIT_COUNT = 100;
+    final long distance = Long.MAX_VALUE / SPLIT_COUNT;
+    final SortedSet<Text> splits = new TreeSet<Text>();
+    for (int i = 1; i < SPLIT_COUNT; i++) {
+      splits.add(new Text(String.format("%016x", i * distance)));
+    }
+    c.tableOperations().addSplits(tableName, splits);
+
+    log.info("Waiting for balance");
+    c.instanceOperations().waitForBalance();
+
+    final Instance inst = c.getInstance();
+
+    log.info("Starting ingest");
+    final long start = System.currentTimeMillis();
+    final String args[] = {
+        "-i", inst.getInstanceName(),
+        "-z", inst.getZooKeepers(),
+        "-u", "root",
+        "-p", ROOT_PASSWORD,
+        "--batchThreads", "2",
+        "--table", tableName,
+        "--num", Long.toString(1000*1000),  // 1M 100 byte entries
+    };
+
+    ContinuousIngest.main(args);
+    final long result = System.currentTimeMillis() - start;
+    log.debug(String.format("Finished in %,d ms", result));
+    log.debug("Dropping table");
+    c.tableOperations().delete(tableName);
+    return result;
+  }
+
+  private long getAverage() throws Exception {
+    final int REPEAT = 3;
+    long totalTime = 0;
+    for (int i = 0; i < REPEAT; i++) {
+      totalTime += ingest();
+    }
+    return totalTime / REPEAT;
+  }
+
+  private void testWalPerformanceOnce() throws Exception {
+    // get time with a small WAL, which will cause many WAL roll-overs
+    long avg1 = getAverage();
+    // use a bigger WAL max size to eliminate WAL roll-overs
+    Connector c = getConnector();
+    c.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1G");
+    c.tableOperations().flush(MetadataTable.NAME, null, null, true);
+    c.tableOperations().flush(RootTable.NAME, null, null, true);
+    for (ProcessReference  tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
+      getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
+    }
+    getCluster().start();
+    long avg2 = getAverage();
+    log.info(String.format("Average run time with small WAL %,d with large WAL %,d", avg1, avg2));
+    assertTrue(avg1 > avg2);
+    double percent = (100. * avg1) / avg2;
+    log.info(String.format("Percent of large log: %.2f%%", percent));
+    assertTrue(percent < 125.);
+  }
+
+  @Test(timeout= 20 * 60 * 1000)
+  public void testWalPerformance() throws Exception {
+    testWalPerformanceOnce();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 1ef47e5..9af5445 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
@@ -78,6 +79,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
     cfg.setNumTservers(1);
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
     cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
     // Wait longer to try to let the replication table come online before a cycle runs
     cfg.setProperty(Property.GC_CYCLE_START, "10s");
@@ -102,18 +104,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
     Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 
     Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    Range r = MetadataSchema.TabletsSection.getRange(tableId);
-    s.setRange(r);
-    s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+    s.setRange(CurrentLogsSection.getRange());
+    s.fetchColumnFamily(CurrentLogsSection.COLF);
 
     Set<String> wals = new HashSet<String>();
     for (Entry<Key,Value> entry : s) {
       log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
       // hostname:port/uri://path/to/wal
-      String cq = entry.getKey().getColumnQualifier().toString();
-      int index = cq.indexOf('/');
-      // Normalize the path
-      String path = new Path(cq.substring(index + 1)).toString();
+      String path = new Path(entry.getKey().getColumnQualifier().toString()).toString();
       log.debug("Extracted file: " + path);
       wals.add(path);
     }
@@ -228,11 +226,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
 
-    log.info("Checking to see that log entries are removed from tablet section after MinC");
-    // After compaction, the log column should be gone from the tablet
-    Set<String> walsAfterMinc = getWalsForTable(table);
-    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
-
     Set<String> filesForTable = getFilesForTable(table);
     Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
     log.info("Files for table before MajC: {}", filesForTable);
@@ -258,14 +251,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
       fileExists = fs.exists(fileToBeDeleted);
     }
 
-    // At this point in time, we *know* that the GarbageCollector has run which means that the Status
-    // for our WAL should not be altered.
-
-    log.info("Re-checking that WALs are still not referenced for our table");
-
-    Set<String> walsAfterMajc = getWalsForTable(table);
-    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
-
     Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
     Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
 
@@ -326,11 +311,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
 
-    log.info("Checking to see that log entries are removed from tablet section after MinC");
-    // After compaction, the log column should be gone from the tablet
-    Set<String> walsAfterMinc = getWalsForTable(table);
-    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
-
     Set<String> filesForTable = getFilesForTable(table);
     Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
     log.info("Files for table before MajC: {}", filesForTable);
@@ -359,11 +339,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
     // At this point in time, we *know* that the GarbageCollector has run which means that the Status
     // for our WAL should not be altered.
 
-    log.info("Re-checking that WALs are still not referenced for our table");
-
-    Set<String> walsAfterMajc = getWalsForTable(table);
-    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
-
     Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
     Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index 125286f..14812c4 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@ -146,7 +146,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
     }
   }
 
-  @Test
+  @Test(timeout = 10 * 60 * 1000)
   public void dataWasReplicatedToThePeer() throws Exception {
     MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
         ROOT_PASSWORD);


Mime
View raw message