accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [10/19] accumulo git commit: Revert "ACCUMULO-3423 optimize WAL metadata table updates"
Date Sun, 10 May 2015 21:05:56 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 6f2c9a2..df0848d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.tserver;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
@@ -29,7 +30,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -45,7 +45,6 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -148,8 +147,8 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.ServerOpts;
-import org.apache.accumulo.server.TabletLevel;
 import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -1501,7 +1500,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       final AssignmentHandler ah = new AssignmentHandler(extent);
       // final Runnable ah = new LoggingRunnable(log, );
       // Root tablet assignment must take place immediately
-
       if (extent.isRootTablet()) {
         new Daemon("Root Tablet Assignment") {
           @Override
@@ -1694,6 +1692,66 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     }
 
     @Override
+    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
+      String myname = getClientAddressString();
+      myname = myname.replace(':', '+');
+      Set<String> loggers = new HashSet<String>();
+      logger.getLogFiles(loggers);
+      Set<String> loggerUUIDs = new HashSet<String>();
+      for (String logger : loggers)
+        loggerUUIDs.add(new Path(logger).getName());
+
+      nextFile: for (String filename : filenames) {
+        String uuid = new Path(filename).getName();
+        // skip any log we're currently using
+        if (loggerUUIDs.contains(uuid))
+          continue nextFile;
+
+        List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
+        synchronized (onlineTablets) {
+          onlineTabletsCopy.addAll(onlineTablets.values());
+        }
+        for (Tablet tablet : onlineTabletsCopy) {
+          for (String current : tablet.getCurrentLogFiles()) {
+            if (current.contains(uuid)) {
+              log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
+              continue nextFile;
+            }
+          }
+        }
+
+        try {
+          Path source = new Path(filename);
+          if (TabletServer.this.getConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
+            Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives());
+            fs.mkdirs(walogArchive);
+            Path dest = new Path(walogArchive, source.getName());
+            log.info("Archiving walog " + source + " to " + dest);
+            if (!fs.rename(source, dest))
+              log.error("rename is unsuccessful");
+          } else {
+            log.info("Deleting walog " + filename);
+            Path sourcePath = new Path(filename);
+            if (!(!TabletServer.this.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath))
+                && !fs.deleteRecursively(sourcePath))
+              log.warn("Failed to delete walog " + source);
+            for (String recovery : ServerConstants.getRecoveryDirs()) {
+              Path recoveryPath = new Path(recovery, source.getName());
+              try {
+                if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
+                  log.info("Deleted any recovery log " + filename);
+              } catch (FileNotFoundException ex) {
+                // ignore
+              }
+            }
+          }
+        } catch (IOException e) {
+          log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
+        }
+      }
+    }
+
+    @Override
     public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       try {
         checkPermission(credentials, null, "getActiveCompactions");
@@ -1714,23 +1772,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
     @Override
     public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
-      String log = logger.getLogFile();
-      // Might be null if there no active logger
-      if (null == log) {
-        return Collections.emptyList();
-      }
-      return Collections.singletonList(log);
-    }
-
-    @Override
-    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
-      log.warn("Garbage collector is attempting to remove logs through the tablet server");
-      log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" + "Restart your file Garbage Collector.");
+      Set<String> logs = new HashSet<String>();
+      logger.getLogFiles(logs);
+      return new ArrayList<String>(logs);
     }
   }
 
   private class SplitRunner implements Runnable {
-    private final Tablet tablet;
+    private Tablet tablet;
 
     public SplitRunner(Tablet tablet) {
       this.tablet = tablet;
@@ -1984,7 +2033,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
           log.error("Unexpected error ", e);
         }
         log.debug("Unassigning " + tls);
-        TabletStateStore.unassign(TabletServer.this, tls, null);
+        TabletStateStore.unassign(TabletServer.this, tls);
       } catch (DistributedStoreException ex) {
         log.warn("Unable to update storage", ex);
       } catch (KeeperException e) {
@@ -2188,6 +2237,29 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     }
   }
 
+  public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) {
+    if (!this.onlineTablets.containsKey(extent)) {
+      log.info("Not adding " + logs.size() + " logs for extent " + extent + " as alias " + id + " tablet is offline");
+      // minor compaction due to recovery... don't make updates... if it finishes, there will be no WALs,
+      // if it doesn't, we'll need to do the same recovery with the old files.
+      return;
+    }
+
+    log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id);
+    long now = RelativeTime.currentTimeMillis();
+    List<String> logSet = new ArrayList<String>();
+    for (DfsLogger log : logs)
+      logSet.add(log.getFileName());
+    LogEntry entry = new LogEntry();
+    entry.extent = extent;
+    entry.tabletId = id;
+    entry.timestamp = now;
+    entry.server = logs.get(0).getLogger();
+    entry.filename = logs.get(0).getFileName();
+    entry.logSet = logSet;
+    MetadataTableUtil.addLogEntry(this, entry, getLock());
+  }
+
   private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName)
       throws UnknownHostException {
     Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -2906,7 +2978,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions.incrementAndGet();
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
-    markUnusedWALs();
   }
 
   public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException {
@@ -2925,11 +2996,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     });
     for (LogEntry entry : sorted) {
       Path recovery = null;
-      Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename));
-      finished = SortedLogState.getFinishedMarkerPath(finished);
-      TabletServer.log.info("Looking for " + finished);
-      if (fs.exists(finished)) {
-        recovery = finished.getParent();
+      for (String log : entry.logSet) {
+        Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log));
+        finished = SortedLogState.getFinishedMarkerPath(finished);
+        TabletServer.log.info("Looking for " + finished);
+        if (fs.exists(finished)) {
+          recovery = finished.getParent();
+          break;
+        }
       }
       if (recovery == null)
         throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry);
@@ -2966,9 +3040,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   }
 
   public Collection<Tablet> getOnlineTablets() {
-    synchronized (onlineTablets) {
-      return new ArrayList<Tablet>(onlineTablets.values());
-    }
+    return Collections.unmodifiableCollection(onlineTablets.values());
   }
 
   public VolumeManager getFileSystem() {
@@ -2994,61 +3066,4 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   public SecurityOperation getSecurityOperation() {
     return security;
   }
-
-  // avoid unnecessary redundant markings to meta
-  final ConcurrentHashMap<DfsLogger,EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
-  final Object levelLocks[] = new Object[TabletLevel.values().length];
-  {
-    for (int i = 0; i < levelLocks.length; i++) {
-      levelLocks[i] = new Object();
-    }
-  }
-
-  // remove any meta entries after a rolled log is no longer referenced
-  Set<DfsLogger> closedLogs = new HashSet<>();
-
-  private void markUnusedWALs() {
-    Set<DfsLogger> candidates;
-    synchronized (closedLogs) {
-      candidates = new HashSet<>(closedLogs);
-    }
-    for (Tablet tablet : getOnlineTablets()) {
-      candidates.removeAll(tablet.getCurrentLogFiles());
-    }
-    try {
-      Set<Path> filenames = new HashSet<>();
-      for (DfsLogger candidate : candidates) {
-        filenames.add(candidate.getPath());
-      }
-      MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames);
-      synchronized (closedLogs) {
-        closedLogs.removeAll(candidates);
-      }
-    } catch (AccumuloException ex) {
-      log.info(ex.toString(), ex);
-    }
-  }
-
-  public void addLoggersToMetadata(DfsLogger copy, TabletLevel level) {
-    // serialize the updates to the metadata per level: avoids updating the level more than once
-    // updating one level, may cause updates to other levels, so we need to release the lock on metadataTableLogs
-    synchronized (levelLocks[level.ordinal()]) {
-      EnumSet<TabletLevel> set = null;
-      set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
-      if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
-        log.info("Writing log marker for level " + level + " " + copy.getFileName());
-        MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), level);
-      }
-      set = metadataTableLogs.get(copy);
-      set.add(level);
-    }
-  }
-
-  public void walogClosed(DfsLogger currentLog) {
-    metadataTableLogs.remove(currentLog);
-    synchronized (closedLogs) {
-      closedLogs.add(currentLog);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cd7ce08..8512690 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -72,7 +72,7 @@ import com.google.common.base.Optional;
  * Wrap a connection to a logger.
  *
  */
-public class DfsLogger implements Comparable<DfsLogger> {
+public class DfsLogger {
   public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
   public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 
@@ -371,7 +371,6 @@ public class DfsLogger implements Comparable<DfsLogger> {
 
   public synchronized void open(String address) throws IOException {
     String filename = UUID.randomUUID().toString();
-    log.debug("Address is " + address);
     String logger = Joiner.on("+").join(address.split(":"));
 
     log.debug("DfsLogger.open() begin");
@@ -464,11 +463,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
   }
 
   public String getFileName() {
-    return logPath;
-  }
-
-  public Path getPath() {
-    return new Path(logPath);
+    return logPath.toString();
   }
 
   public void close() throws IOException {
@@ -614,9 +609,4 @@ public class DfsLogger implements Comparable<DfsLogger> {
     return Joiner.on(":").join(parts[parts.length - 2].split("[+]"));
   }
 
-  @Override
-  public int compareTo(DfsLogger o) {
-    return getFileName().compareTo(o.getFileName());
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index ab3dea2..37882cd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -180,7 +180,7 @@ public class SortedLogRecovery {
     // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id
     // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to
     while (reader.next(key, value)) {
-      // log.debug("Event " + key.event + " tablet " + key.tablet);
+      // LogReader.printEntry(entry);
       if (key.event != DEFINE_TABLET)
         break;
       if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
@@ -209,7 +209,7 @@ public class SortedLogRecovery {
         if (lastStartToFinish.compactionStatus == Status.INITIAL)
           lastStartToFinish.compactionStatus = Status.COMPLETE;
         if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
+          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
         lastStartToFinish.update(fileno, key.seq);
 
         // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
@@ -218,7 +218,7 @@ public class SortedLogRecovery {
           lastStartToFinish.update(-1);
       } else if (key.event == COMPACTION_FINISH) {
         if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart);
+          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
         if (lastStartToFinish.compactionStatus == Status.INITIAL)
           lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
         else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
@@ -249,6 +249,8 @@ public class SortedLogRecovery {
         break;
       if (key.tid != tid)
         break;
+      // log.info("Replaying " + key);
+      // log.info(value);
       if (key.event == MUTATION) {
         mr.receive(value.mutations.get(0));
       } else if (key.event == MANY_MUTATIONS) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 3fb3c86..1d385d9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -21,16 +21,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -39,9 +37,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
-import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.TabletLevel;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
@@ -76,22 +72,20 @@ public class TabletServerLogger {
 
   private final TabletServer tserver;
 
-  // The current logger
-  private DfsLogger currentLog = null;
-  private final SynchronousQueue<Object> nextLog = new SynchronousQueue<>();
-  private ThreadPoolExecutor nextLogMaker;
+  // The current log set: always updated to a new set with every change of loggers
+  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
 
-  // The current generation of logs.
-  // Because multiple threads can be using a log at one time, a log
+  // The current generation of logSet.
+  // Because multiple threads can be using a log set at one time, a log
   // failure is likely to affect multiple threads, who will all attempt to
-  // create a new log. This will cause many unnecessary updates to the
+  // create a new logSet. This will cause many unnecessary updates to the
   // metadata table.
   // We'll use this generational counter to determine if another thread has
-  // already fetched a new log.
-  private final AtomicInteger logId = new AtomicInteger();
+  // already fetched a new logSet.
+  private AtomicInteger logSetId = new AtomicInteger();
 
   // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
-  private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
 
   private final AtomicInteger seqGen = new AtomicInteger();
 
@@ -152,74 +146,62 @@ public class TabletServerLogger {
     this.flushCounter = flushCounter;
   }
 
-  private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
-    final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>();
-    testLockAndRun(logIdLock, new TestCallWithWriteLock() {
+  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
+    final int[] result = {-1};
+    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
       @Override
       boolean test() {
-        result.set(currentLog);
-        if (currentLog != null)
-          logIdOut.set(logId.get());
-        return currentLog == null;
+        copy.clear();
+        copy.addAll(loggers);
+        if (!loggers.isEmpty())
+          result[0] = logSetId.get();
+        return loggers.isEmpty();
       }
 
       @Override
       void withWriteLock() throws IOException {
         try {
-          createLogger();
-          result.set(currentLog);
-          if (currentLog != null)
-            logIdOut.set(logId.get());
+          createLoggers();
+          copy.clear();
+          copy.addAll(loggers);
+          if (copy.size() > 0)
+            result[0] = logSetId.get();
           else
-            logIdOut.set(-1);
+            result[0] = -1;
         } catch (IOException e) {
           log.error("Unable to create loggers", e);
         }
       }
     });
-    return result.get();
+    return result[0];
   }
 
-  /**
-   * Get the current WAL file
-   *
-   * @return The name of the current log, or null if there is no current log.
-   */
-  public String getLogFile() {
-    logIdLock.readLock().lock();
+  public void getLogFiles(Set<String> loggersOut) {
+    logSetLock.readLock().lock();
     try {
-      if (null == currentLog) {
-        return null;
+      for (DfsLogger logger : loggers) {
+        loggersOut.add(logger.getFileName());
       }
-      return currentLog.getFileName();
     } finally {
-      logIdLock.readLock().unlock();
+      logSetLock.readLock().unlock();
     }
   }
 
-  synchronized private void createLogger() throws IOException {
-    if (!logIdLock.isWriteLockedByCurrentThread()) {
+  synchronized private void createLoggers() throws IOException {
+    if (!logSetLock.isWriteLockedByCurrentThread()) {
       throw new IllegalStateException("createLoggers should be called with write lock held!");
     }
 
-    if (currentLog != null) {
-      throw new IllegalStateException("createLoggers should not be called when current log is set");
+    if (loggers.size() != 0) {
+      throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
     }
 
     try {
-      startLogMaker();
-      Object next = nextLog.take();
-      if (next instanceof Exception) {
-        throw (Exception) next;
-      }
-      if (next instanceof DfsLogger) {
-        currentLog = (DfsLogger) next;
-        logId.incrementAndGet();
-        log.info("Using next log " + currentLog.getFileName());
-        return;
-      } else {
-        throw new RuntimeException("Error: unexpected type seen: " + next);
-      }
+      DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+      alog.open(tserver.getClientAddressString());
+      loggers.add(alog);
+      logSetId.incrementAndGet();
+      return;
     } catch (Exception t) {
       walErrors.put(System.currentTimeMillis(), "");
       if (walErrors.size() >= HALT_AFTER_ERROR_COUNT) {
@@ -229,63 +211,22 @@ public class TabletServerLogger {
     }
   }
 
-  private synchronized void startLogMaker() {
-    if (nextLogMaker != null) {
-      return;
-    }
-    nextLogMaker = new SimpleThreadPool(1, "WALog creator");
-    nextLogMaker.submit(new Runnable() {
-      @Override
-      public void run() {
-        while (!nextLogMaker.isShutdown()) {
-          try {
-            log.debug("Creating next WAL");
-            DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
-            alog.open(tserver.getClientAddressString());
-            log.debug("Created next WAL " + alog.getFileName());
-            while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
-              log.info("Our WAL was not used for 12 hours: " + alog.getFileName());
-            }
-          } catch (Exception t) {
-            log.error("{}", t.getMessage(), t);
-            try {
-              nextLog.offer(t, 12, TimeUnit.HOURS);
-            } catch (InterruptedException ex) {
-              // ignore
-            }
-          }
-        }
-      }
-    });
-  }
-
-  public void resetLoggers() throws IOException {
-    logIdLock.writeLock().lock();
-    try {
-      close();
-    } finally {
-      logIdLock.writeLock().unlock();
-    }
-  }
-
   synchronized private void close() throws IOException {
-    if (!logIdLock.isWriteLockedByCurrentThread()) {
+    if (!logSetLock.isWriteLockedByCurrentThread()) {
       throw new IllegalStateException("close should be called with write lock held!");
     }
     try {
-      if (null != currentLog) {
+      for (DfsLogger logger : loggers) {
         try {
-          currentLog.close();
+          logger.close();
         } catch (DfsLogger.LogClosedException ex) {
           // ignore
         } catch (Throwable ex) {
-          log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
-        } finally {
-          this.tserver.walogClosed(currentLog);
+          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex);
         }
-        currentLog = null;
-        logSizeEstimate.set(0);
       }
+      loggers.clear();
+      logSizeEstimate.set(0);
     } catch (Throwable t) {
       throw new IOException(t);
     }
@@ -302,7 +243,7 @@ public class TabletServerLogger {
 
   private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
     // Work very hard not to lock this during calls to the outside world
-    int currentLogId = logId.get();
+    int currentLogSet = logSetId.get();
 
     int seq = -1;
     int attempt = 1;
@@ -310,22 +251,20 @@ public class TabletServerLogger {
     while (!success) {
       try {
         // get a reference to the loggers that no other thread can touch
-        DfsLogger copy = null;
-        AtomicInteger currentId = new AtomicInteger(-1);
-        copy = initializeLoggers(currentId);
-        currentLogId = currentId.get();
+        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
+        currentLogSet = initializeLoggers(copy);
 
         // add the logger to the log set for the memory in the tablet,
         // update the metadata table if we've never used this tablet
 
-        if (currentLogId == logId.get()) {
+        if (currentLogSet == logSetId.get()) {
           for (CommitSession commitSession : sessions) {
             if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
                 defineTablet(commitSession);
-                if (currentLogId == logId.get())
-                  tserver.addLoggersToMetadata(copy, TabletLevel.getLevel(commitSession.getExtent()));
+                if (currentLogSet == logSetId.get())
+                  tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
               } finally {
                 commitSession.finishUpdatingLogsUsed();
               }
@@ -333,29 +272,39 @@ public class TabletServerLogger {
               // Need to release
               KeyExtent extent = commitSession.getExtent();
               if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) {
-                Status status = StatusUtil.openWithUnknownLength(System.currentTimeMillis());
-                log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName());
+                Set<String> logs = new HashSet<String>();
+                for (DfsLogger logger : copy) {
+                  logs.add(logger.getFileName());
+                }
+                Status status = StatusUtil.fileCreated(System.currentTimeMillis());
+                log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs);
                 // Got some new WALs, note this in the metadata table
-                ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status);
+                ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), logs, status);
               }
             }
           }
         }
 
         // Make sure that the logs haven't changed out from underneath our copy
-        if (currentLogId == logId.get()) {
+        if (currentLogSet == logSetId.get()) {
 
           // write the mutation to the logs
           seq = seqGen.incrementAndGet();
           if (seq < 0)
             throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
-          LoggerOperation lop = writer.write(copy, seq);
-          if (lop != null) {
+          ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
+          for (DfsLogger wal : copy) {
+            LoggerOperation lop = writer.write(wal, seq);
+            if (lop != null)
+              queuedOperations.add(lop);
+          }
+
+          for (LoggerOperation lop : queuedOperations) {
             lop.await();
           }
 
           // double-check: did the log set change?
-          success = (currentLogId == logId.get());
+          success = (currentLogSet == logSetId.get());
         }
       } catch (DfsLogger.LogClosedException ex) {
         log.debug("Logs closed while writing, retrying " + attempt);
@@ -370,13 +319,13 @@ public class TabletServerLogger {
       // Some sort of write failure occurred. Grab the write lock and reset the logs.
       // But since multiple threads will attempt it, only attempt the reset when
       // the logs haven't changed.
-      final int finalCurrent = currentLogId;
+      final int finalCurrent = currentLogSet;
       if (!success) {
-        testLockAndRun(logIdLock, new TestCallWithWriteLock() {
+        testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 
           @Override
           boolean test() {
-            return finalCurrent == logId.get();
+            return finalCurrent == logSetId.get();
           }
 
           @Override
@@ -389,7 +338,7 @@ public class TabletServerLogger {
     }
     // if the log gets too big, reset it .. grab the write lock first
     logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
-    testLockAndRun(logIdLock, new TestCallWithWriteLock() {
+    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
       @Override
       boolean test() {
         return logSizeEstimate.get() > maxSize;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
index dee705c..d908f1d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver.tablet;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.accumulo.core.data.Mutation;
@@ -85,7 +86,7 @@ public class CommitSession {
     return committer;
   }
 
-  public boolean beginUpdatingLogsUsed(DfsLogger copy, boolean mincFinish) {
+  public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
     return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index ab15ccc..db1b418 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -424,9 +424,7 @@ class DatafileManager {
         if (log.isDebugEnabled()) {
           log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly);
         }
-        for (String logFile : logFileOnly) {
-          ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFile, StatusUtil.openWithUnknownLength());
-        }
+        ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength());
       }
     } finally {
       tablet.finishClearingUnusedLogs();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 17864be..1f4625b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -37,7 +37,6 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -201,7 +200,7 @@ public class Tablet implements TabletCommitter {
   }
 
   // stores info about user initiated major compaction that is waiting on a minor compaction to finish
-  private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
+  private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
 
   static enum CompactionState {
     WAITING_TO_START, IN_PROGRESS
@@ -628,8 +627,8 @@ public class Tablet implements TabletCommitter {
           // the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress.
           Status status = StatusUtil.openWithUnknownLength();
           for (LogEntry logEntry : logEntries) {
-            log.debug("Writing updated status to metadata table for " + logEntry.filename + " " + ProtobufUtil.toString(status));
-            ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status);
+            log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
+            ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.logSet, status);
           }
         }
 
@@ -641,9 +640,11 @@ public class Tablet implements TabletCommitter {
         }
       }
       // make some closed references that represent the recovered logs
-      currentLogs = new ConcurrentSkipListSet<DfsLogger>();
+      currentLogs = new HashSet<DfsLogger>();
       for (LogEntry logEntry : logEntries) {
-        currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename, logEntry.getColumnQualifier().toString()));
+        for (String log : logEntry.logSet) {
+          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString()));
+        }
       }
 
       log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + getTabletMemory().getNumEntries()
@@ -934,9 +935,7 @@ public class Tablet implements TabletCommitter {
 
     long count = 0;
 
-    String oldName = Thread.currentThread().getName();
     try {
-      Thread.currentThread().setName("Minor compacting " + this.extent);
       Span span = Trace.start("write");
       CompactionStats stats;
       try {
@@ -967,7 +966,6 @@ public class Tablet implements TabletCommitter {
       failed = true;
       throw new RuntimeException(e);
     } finally {
-      Thread.currentThread().setName(oldName);
       try {
         getTabletMemory().finalizeMinC();
       } catch (Throwable t) {
@@ -992,7 +990,7 @@ public class Tablet implements TabletCommitter {
   private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
     CommitSession oldCommitSession = getTabletMemory().prepareForMinC();
     otherLogs = currentLogs;
-    currentLogs = new ConcurrentSkipListSet<DfsLogger>();
+    currentLogs = new HashSet<DfsLogger>();
 
     FileRef mergeFile = null;
     if (mincReason != MinorCompactionReason.RECOVERY) {
@@ -2376,11 +2374,14 @@ public class Tablet implements TabletCommitter {
     }
   }
 
-  private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>();
+  private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>();
 
-  // currentLogs may be updated while a tablet is otherwise locked
-  public Set<DfsLogger> getCurrentLogFiles() {
-    return new HashSet<DfsLogger>(currentLogs);
+  public synchronized Set<String> getCurrentLogFiles() {
+    Set<String> result = new HashSet<String>();
+    for (DfsLogger log : currentLogs) {
+      result.add(log.getFileName());
+    }
+    return result;
   }
 
   Set<String> beginClearingUnusedLogs() {
@@ -2439,13 +2440,13 @@ public class Tablet implements TabletCommitter {
   // this lock is basically used to synchronize writing of log info to metadata
   private final ReentrantLock logLock = new ReentrantLock();
 
-  public int getLogCount() {
+  public synchronized int getLogCount() {
     return currentLogs.size();
   }
 
   // don't release the lock if this method returns true for success; instead, the caller should clean up by calling finishUpdatingLogsUsed()
   @Override
-  public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) {
+  public boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) {
 
     boolean releaseLock = true;
 
@@ -2482,26 +2483,28 @@ public class Tablet implements TabletCommitter {
 
         int numAdded = 0;
         int numContained = 0;
-        if (addToOther) {
-          if (otherLogs.add(more))
-            numAdded++;
+        for (DfsLogger logger : more) {
+          if (addToOther) {
+            if (otherLogs.add(logger))
+              numAdded++;
 
-          if (currentLogs.contains(more))
-            numContained++;
-        } else {
-          if (currentLogs.add(more))
-            numAdded++;
+            if (currentLogs.contains(logger))
+              numContained++;
+          } else {
+            if (currentLogs.add(logger))
+              numAdded++;
 
-          if (otherLogs.contains(more))
-            numContained++;
+            if (otherLogs.contains(logger))
+              numContained++;
+          }
         }
 
-        if (numAdded > 0 && numAdded != 1) {
+        if (numAdded > 0 && numAdded != more.size()) {
           // expect to add all or none
           throw new IllegalArgumentException("Added subset of logs " + extent + " " + more + " " + currentLogs);
         }
 
-        if (numContained > 0 && numContained != 1) {
+        if (numContained > 0 && numContained != more.size()) {
           // expect to contain all or none
           throw new IllegalArgumentException("Other logs contained subset of logs " + extent + " " + more + " " + otherLogs);
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
index 934ce20..c7e3a66 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver.tablet;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.accumulo.core.client.Durability;
@@ -37,7 +38,7 @@ public interface TabletCommitter {
   /**
    * If this method returns true, the caller must call {@link #finishUpdatingLogsUsed()} to clean up
    */
-  boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger copy, boolean mincFinish);
+  boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish);
 
   void finishUpdatingLogsUsed();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
deleted file mode 100644
index 44058d3..0000000
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.log;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-public class LogEntryTest {
-
-  @Test
-  public void test() throws Exception {
-    KeyExtent extent = new KeyExtent(new Text("1"), null, new Text(""));
-    long ts = 12345678L;
-    String server = "localhost:1234";
-    String filename = "default/foo";
-    LogEntry entry = new LogEntry(extent, ts, server, filename);
-    assertEquals(extent, entry.extent);
-    assertEquals(server, entry.server);
-    assertEquals(filename, entry.filename);
-    assertEquals(ts, entry.timestamp);
-    assertEquals("1<; default/foo", entry.toString());
-    assertEquals(new Text("log"), entry.getColumnFamily());
-    assertEquals(new Text("localhost:1234/default/foo"), entry.getColumnQualifier());
-    LogEntry copy = LogEntry.fromBytes(entry.toBytes());
-    assertEquals(entry.toString(), copy.toString());
-    Key key = new Key(new Text("1<"), new Text("log"), new Text("localhost:1234/default/foo"));
-    key.setTimestamp(ts);
-    LogEntry copy2 = LogEntry.fromKeyValue(key, entry.getValue());
-    assertEquals(entry.toString(), copy2.toString());
-    assertEquals(entry.timestamp, copy2.timestamp);
-    assertEquals("foo", entry.getUniqueID());
-    assertEquals("localhost:1234/default/foo", entry.getName());
-    assertEquals(new Value("default/foo".getBytes()), entry.getValue());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 1186c68..d0de29f 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -202,6 +202,9 @@ public class NullTserver {
     }
 
     @Override
+    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {}
+
+    @Override
     public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveCompaction>();
     }
@@ -228,9 +231,6 @@ public class NullTserver {
     public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
       return null;
     }
-
-    @Override
-    public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { }
   }
 
   static class Opts extends Help {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
index 53653da..6338e00 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
@@ -60,11 +60,6 @@ import com.google.common.net.HostAndPort;
 public class ProxyDurabilityIT extends ConfigurableMacIT {
 
   @Override
-  protected int defaultTimeoutSeconds() {
-    return 60;
-  }
-
-  @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
@@ -116,7 +111,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
     assertEquals(0, count(tableName));
 
     ConditionalWriterOptions cfg = new ConditionalWriterOptions();
-    cfg.setDurability(Durability.SYNC);
+    cfg.setDurability(Durability.LOG);
     String cwriter = client.createConditionalWriter(login, tableName, cfg);
     ConditionalUpdates updates = new ConditionalUpdates();
     updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes(""))));
@@ -125,7 +120,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT {
     assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row")));
     assertEquals(1, count(tableName));
     restartTServer();
-    assertEquals(1, count(tableName));
+    assertEquals(0, count(tableName));
 
     proxyServer.stop();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
index 0dcdf42..25337b2 100644
--- a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java
@@ -54,7 +54,7 @@ public class BadDeleteMarkersCreatedIT extends AccumuloClusterIT {
 
   @Override
   public int defaultTimeoutSeconds() {
-    return 120;
+    return 60;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
index 8703f18..f793925 100644
--- a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
@@ -20,33 +20,25 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class BalanceIT extends AccumuloClusterIT {
-  private static final Logger log = LoggerFactory.getLogger(BalanceIT.class);
+public class BalanceIT extends ConfigurableMacIT {
 
-  @Override
-  public int defaultTimeoutSeconds() {
-    return 60;
-  }
-
-  @Test
+  @Test(timeout = 60 * 1000)
   public void testBalance() throws Exception {
     String tableName = getUniqueNames(1)[0];
     Connector c = getConnector();
-    log.info("Creating table");
+    System.out.println("Creating table");
     c.tableOperations().create(tableName);
     SortedSet<Text> splits = new TreeSet<Text>();
     for (int i = 0; i < 10; i++) {
       splits.add(new Text("" + i));
     }
-    log.info("Adding splits");
+    System.out.println("Adding splits");
     c.tableOperations().addSplits(tableName, splits);
-    log.info("Waiting for balance");
+    System.out.println("Waiting for balance");
     c.instanceOperations().waitForBalance();
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index 3f9e1cc..d754a14 100644
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@ -129,7 +129,6 @@ public class CleanWalIT extends AccumuloClusterIT {
   private int countLogs(String tableName, Connector conn) throws TableNotFoundException {
     Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
-    scanner.setRange(MetadataSchema.TabletsSection.getRange());
     int count = 0;
     for (Entry<Key,Value> entry : scanner) {
       log.debug("Saw " + entry.getKey() + "=" + entry.getValue());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
index 65be396..b7637a6 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -1294,7 +1294,6 @@ public class ConditionalWriterIT extends AccumuloClusterIT {
     conn.tableOperations().create(tableName);
 
     DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig());
-    UtilWaitThread.sleep(1000);
     Span root = Trace.on("traceTest");
     ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
deleted file mode 100644
index 96ae579..0000000
--- a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.junit.Test;
-
-import com.google.common.collect.Iterators;
-
-public class GarbageCollectWALIT extends ConfigurableMacIT {
-
-  @Override
-  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(Property.INSTANCE_ZK_HOST, "5s");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
-    cfg.setNumTservers(1);
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  @Test(timeout = 2 * 60 * 1000)
-  public void test() throws Exception {
-    // not yet, please
-    String tableName = getUniqueNames(1)[0];
-    cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
-    Connector c = getConnector();
-    c.tableOperations().create(tableName);
-    // count the number of WALs in the filesystem
-    assertEquals(2, countWALsInFS(cluster));
-    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
-    cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
-    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
-    Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
-    // let GC run
-    UtilWaitThread.sleep(3 * 5 * 1000);
-    assertEquals(2, countWALsInFS(cluster));
-  }
-
-  private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception {
-    FileSystem fs = cluster.getFileSystem();
-    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true);
-    int result = 0;
-    while (iterator.hasNext()) {
-      LocatedFileStatus next = iterator.next();
-      if (!next.isDirectory()) {
-        result++;
-      }
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
index 27f1f69..b78a311 100644
--- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.test;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -26,7 +27,6 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -127,7 +127,11 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
     String tableId = conn.tableOperations().tableIdMap().get(tableName);
     Assert.assertNotNull("Table ID was null", tableId);
 
-    LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString());
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = emptyWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
 
     log.info("Taking {} offline", tableName);
     conn.tableOperations().offline(tableName, true);
@@ -182,7 +186,11 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT {
     String tableId = conn.tableOperations().tableIdMap().get(tableName);
     Assert.assertNotNull("Table ID was null", tableId);
 
-    LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString());
+    LogEntry logEntry = new LogEntry();
+    logEntry.server = "127.0.0.1:12345";
+    logEntry.filename = partialHeaderWalog.toURI().toString();
+    logEntry.tabletId = 10;
+    logEntry.logSet = Collections.singleton(logEntry.filename);
 
     log.info("Taking {} offline", tableName);
     conn.tableOperations().offline(tableName, true);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
new file mode 100644
index 0000000..6a9975c
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.cluster.ClusterControl;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Verify that a recovery of a log without any mutations removes the log reference
+public class NoMutationRecoveryIT extends AccumuloClusterIT {
+  private static final Logger log = LoggerFactory.getLogger(NoMutationRecoveryIT.class);
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Before
+  public void takeTraceTableOffline() throws Exception {
+    Connector conn = getConnector();
+    if (conn.tableOperations().exists("trace")) {
+      conn.tableOperations().offline("trace", true);
+    }
+  }
+
+  @After
+  public void takeTraceTableOnline() throws Exception {
+    Connector conn = getConnector();
+    if (conn.tableOperations().exists("trace")) {
+      conn.tableOperations().online("trace", true);
+    }
+  }
+
+  public boolean equals(Entry<Key,Value> a, Entry<Key,Value> b) {
+    // comparison, without timestamp
+    Key akey = a.getKey();
+    Key bkey = b.getKey();
+    log.info("Comparing {} to {}", akey.toStringNoTruncate(), bkey.toStringNoTruncate());
+    return akey.compareTo(bkey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS) == 0 && a.getValue().equals(b.getValue());
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector conn = getConnector();
+    final String table = getUniqueNames(1)[0];
+    conn.tableOperations().create(table);
+    String tableId = conn.tableOperations().tableIdMap().get(table);
+
+    log.info("Created {} with id {}", table, tableId);
+
+    // Add a record to the table
+    update(conn, table, new Text("row"), new Text("cf"), new Text("cq"), new Value("value".getBytes()));
+
+    // Get the WAL reference used by the table we just added the update to
+    Entry<Key,Value> logRef = getLogRef(conn, MetadataTable.NAME);
+
+    log.info("Log reference in metadata table {} {}", logRef.getKey().toStringNoTruncate(), logRef.getValue());
+
+    // Flush the record to disk
+    conn.tableOperations().flush(table, null, null, true);
+
+    Range range = Range.prefix(tableId);
+    log.info("Fetching WAL references over " + table);
+    assertEquals("should not have any refs", 0, FunctionalTestUtils.count(getLogRefs(conn, MetadataTable.NAME, range)));
+
+    // Grant permission to the admin user to write to the Metadata table
+    conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
+
+    // Add the wal record back to the metadata table
+    update(conn, MetadataTable.NAME, logRef);
+
+    // Assert that we can get the bogus update back out again
+    assertTrue(equals(logRef, getLogRef(conn, MetadataTable.NAME)));
+
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+    conn.tableOperations().flush(RootTable.NAME, null, null, true);
+
+    ClusterControl control = cluster.getClusterControl();
+    control.stopAllServers(ServerType.TABLET_SERVER);
+    control.startAllServers(ServerType.TABLET_SERVER);
+
+    // Verify that we can read the original record we wrote
+    Scanner s = conn.createScanner(table, Authorizations.EMPTY);
+    int count = 0;
+    for (Entry<Key,Value> e : s) {
+      assertEquals(e.getKey().getRow().toString(), "row");
+      assertEquals(e.getKey().getColumnFamily().toString(), "cf");
+      assertEquals(e.getKey().getColumnQualifier().toString(), "cq");
+      assertEquals(e.getValue().toString(), "value");
+      count++;
+    }
+    assertEquals(1, count);
+
+    // Verify that the bogus log reference we wrote is gone
+    for (Entry<Key,Value> ref : getLogRefs(conn, MetadataTable.NAME)) {
+      assertFalse("Unexpected found reference to bogus log entry: " + ref.getKey().toStringNoTruncate() + " " + ref.getValue(), equals(ref, logRef));
+    }
+  }
+
+  private void update(Connector conn, String name, Entry<Key,Value> logRef) throws Exception {
+    Key k = logRef.getKey();
+    update(conn, name, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), logRef.getValue());
+  }
+
+  private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table) throws Exception {
+    return getLogRefs(conn, table, new Range());
+  }
+
+  private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table, Range r) throws Exception {
+    Scanner s = conn.createScanner(table, Authorizations.EMPTY);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+    s.setRange(r);
+    return s;
+  }
+
+  private Entry<Key,Value> getLogRef(Connector conn, String table) throws Exception {
+    return getLogRefs(conn, table).iterator().next();
+  }
+
+  private void update(Connector conn, String table, Text row, Text cf, Text cq, Value value) throws Exception {
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    Mutation m = new Mutation(row);
+    m.put(cf, cq, value);
+    bw.addMutation(m);
+    bw.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 6618a65..f5c211c 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -350,7 +350,7 @@ public class ShellServerIT extends SharedMiniClusterIT {
     ts.exec("config -t " + table2 + " -np", true, "345M", true);
     ts.exec("getsplits -t " + table2, true, "row5", true);
     ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true);
-    ts.exec("online " + table, true);
+    ts.exec("onlinetable " + table, true);
     ts.exec("deletetable -f " + table, true);
     ts.exec("deletetable -f " + table2, true);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
deleted file mode 100644
index 03d783c..0000000
--- a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Test;
-
-import com.google.common.collect.Iterators;
-
-// When reviewing the changes for ACCUMULO-3423, kturner suggested
-// "tablets will now have log references that contain no data,
-// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data.
-// It would be useful to have an IT that will test this situation.
-public class UnusedWALIT extends ConfigurableMacIT {
-
-  @Override
-  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    final long logSize = 1024 * 1024 * 10;
-    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize));
-    cfg.setNumTservers(1);
-    // use raw local file system so walogs sync and flush will work
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-    hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize));
-  }
-
-  @Test(timeout = 2 * 60 * 1000)
-  public void test() throws Exception {
-    // don't want this bad boy cleaning up walog entries
-    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
-
-    // make two tables
-    String[] tableNames = getUniqueNames(2);
-    String bigTable = tableNames[0];
-    String lilTable = tableNames[1];
-    Connector c = getConnector();
-    c.tableOperations().create(bigTable);
-    c.tableOperations().create(lilTable);
-
-    // put some data in a log that should be replayed for both tables
-    writeSomeData(c, bigTable, 0, 10, 0, 10);
-    scanSomeData(c, bigTable, 0, 10, 0, 10);
-    writeSomeData(c, lilTable, 0, 1, 0, 1);
-    scanSomeData(c, lilTable, 0, 1, 0, 1);
-    assertEquals(1, getWALCount(c));
-
-    // roll the logs by pushing data into bigTable
-    writeSomeData(c, bigTable, 0, 3000, 0, 1000);
-    assertEquals(2, getWALCount(c));
-
-    // put some data in the latest log
-    writeSomeData(c, lilTable, 1, 10, 0, 10);
-    scanSomeData(c, lilTable, 1, 10, 0, 10);
-
-    // bounce the tserver
-    getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
-    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
-
-    // wait for the metadata table to be online
-    Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
-
-    // check our two sets of data in different logs
-    scanSomeData(c, lilTable, 0, 1, 0, 1);
-    scanSomeData(c, lilTable, 1, 10, 0, 10);
-  }
-
-  private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
-    Scanner s = c.createScanner(table, Authorizations.EMPTY);
-    s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount)));
-    int row = startRow;
-    int col = startCol;
-    for (Entry<Key,Value> entry : s) {
-      assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16));
-      assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(), 16));
-      if (col == startCol + colCount) {
-        col = startCol;
-        row++;
-        if (row == startRow + rowCount) {
-          break;
-        }
-      }
-    }
-    assertEquals(row, startRow + rowCount);
-  }
-
-  private int getWALCount(Connector c) throws Exception {
-    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(CurrentLogsSection.getRange());
-    try {
-      return Iterators.size(s.iterator());
-    } finally {
-      s.close();
-    }
-  }
-
-  private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
-    BatchWriterConfig config = new BatchWriterConfig();
-    config.setMaxMemory(10 * 1024 * 1024);
-    BatchWriter bw = conn.createBatchWriter(table, config);
-    for (int r = startRow; r < startRow + rowCount; r++) {
-      Mutation m = new Mutation(Integer.toHexString(r));
-      for (int c = startCol; c < startCol + colCount; c++) {
-        m.put("", Integer.toHexString(c), "");
-      }
-      bw.addMutation(m);
-    }
-    bw.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 2b24219..d9b9429 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -103,7 +103,6 @@ public class VolumeIT extends ConfigurableMacIT {
     cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
     cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
     cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
-    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
 
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
@@ -426,21 +425,6 @@ public class VolumeIT extends ConfigurableMacIT {
       Assert.fail("Unexpected volume " + path);
     }
 
-    Text path = new Text();
-    for (String table : new String[] {RootTable.NAME, MetadataTable.NAME}) {
-      Scanner meta = conn.createScanner(table, Authorizations.EMPTY);
-      meta.setRange(MetadataSchema.CurrentLogsSection.getRange());
-      outer: for (Entry<Key,Value> entry : meta) {
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        for (int i = 0; i < paths.length; i++) {
-          if (path.toString().startsWith(paths[i].toString())) {
-            continue outer;
-          }
-        }
-        Assert.fail("Unexpected volume " + path);
-      }
-    }
-
     // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
     // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
 
@@ -451,7 +435,6 @@ public class VolumeIT extends ConfigurableMacIT {
     }
 
     Assert.assertEquals(200, sum);
-
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 75fd4e1..5c3694a 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -57,7 +57,6 @@ import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -73,11 +72,9 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.harness.AccumuloClusterIT;
 import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.TestIngest;
 import org.apache.accumulo.test.TestMultiTableIngest;
 import org.apache.accumulo.test.VerifyIngest;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
@@ -88,11 +85,6 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Iterators;
 
 public class ReadWriteIT extends AccumuloClusterIT {
-  @Override
-  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
-  }
-
   private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class);
 
   static final int ROWS = 200000;


Mime
View raw message