accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [12/19] accumulo git commit: Revert "ACCUMULO-3423 optimize WAL metadata table updates"
Date Sun, 10 May 2015 21:05:58 GMT
Revert "ACCUMULO-3423 optimize WAL metadata table updates"

This reverts commit 3fdd29f5222f9d1d32ca28b5ecf1d740a8d20f87.

Conflicts:
	core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
	server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
	server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
	server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
	server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
	server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
	server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
	server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
	test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
	test/src/test/java/org/apache/accumulo/test/VolumeIT.java
	test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
	test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
	test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/36ca2575
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/36ca2575
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/36ca2575

Branch: refs/heads/1.7
Commit: 36ca257547edd92ae7d39ec06f64a3cf527e038e
Parents: 59913ea
Author: Josh Elser <elserj@apache.org>
Authored: Sat May 9 14:48:33 2015 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Sat May 9 14:48:33 2015 -0400

----------------------------------------------------------------------
 .../client/impl/ReplicationOperationsImpl.java  |   4 +-
 .../org/apache/accumulo/core/conf/Property.java |   4 +-
 .../accumulo/core/metadata/RootTable.java       |   1 -
 .../core/metadata/schema/MetadataSchema.java    |  48 --
 .../core/tabletserver/log/LogEntry.java         |  78 +--
 .../core/metadata/MetadataTableSchemaTest.java  |  47 --
 .../org/apache/accumulo/server/TabletLevel.java |  32 --
 .../apache/accumulo/server/fs/VolumeUtil.java   |  22 +-
 .../apache/accumulo/server/init/Initialize.java |   1 -
 .../server/master/state/MetaDataStateStore.java |  47 +-
 .../master/state/MetaDataTableScanner.java      |   6 +-
 .../master/state/TabletLocationState.java       |   7 -
 .../server/master/state/TabletStateStore.java   |  17 +-
 .../master/state/ZooTabletStateStore.java       |  36 +-
 .../accumulo/server/replication/StatusUtil.java |  13 -
 .../accumulo/server/util/ListVolumesUsed.java   |  18 +-
 .../server/util/MasterMetadataUtil.java         |  16 +-
 .../accumulo/server/util/MetadataTableUtil.java | 239 +++-----
 .../server/util/ReplicationTableUtil.java       |  13 +-
 .../server/util/ReplicationTableUtilTest.java   |   2 +-
 .../gc/GarbageCollectWriteAheadLogs.java        | 496 +++++++++-------
 .../accumulo/gc/SimpleGarbageCollector.java     |   1 +
 .../CloseWriteAheadLogReferences.java           |  23 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 567 +++++++++++++++++++
 .../CloseWriteAheadLogReferencesTest.java       | 151 ++++-
 .../java/org/apache/accumulo/master/Master.java |   3 -
 .../master/MasterClientServiceHandler.java      |   3 +-
 .../accumulo/master/TabletGroupWatcher.java     |  30 +-
 .../accumulo/master/replication/WorkMaker.java  |   1 -
 .../accumulo/master/state/MergeStats.java       |   3 +-
 .../master/ReplicationOperationsImplTest.java   |   9 +-
 .../apache/accumulo/master/TestMergeState.java  |   2 +-
 .../master/state/RootTabletStateStoreTest.java  |   4 +-
 .../src/main/findbugs/exclude-filter.xml        |   2 +-
 .../apache/accumulo/tserver/TabletServer.java   | 183 +++---
 .../apache/accumulo/tserver/log/DfsLogger.java  |  14 +-
 .../accumulo/tserver/log/SortedLogRecovery.java |   8 +-
 .../tserver/log/TabletServerLogger.java         | 193 +++----
 .../accumulo/tserver/tablet/CommitSession.java  |   3 +-
 .../tserver/tablet/DatafileManager.java         |   4 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  59 +-
 .../tserver/tablet/TabletCommitter.java         |   3 +-
 .../accumulo/tserver/log/LogEntryTest.java      |  56 --
 .../test/performance/thrift/NullTserver.java    |   6 +-
 .../accumulo/proxy/ProxyDurabilityIT.java       |   9 +-
 .../test/BadDeleteMarkersCreatedIT.java         |   2 +-
 .../org/apache/accumulo/test/BalanceIT.java     |  20 +-
 .../org/apache/accumulo/test/CleanWalIT.java    |   1 -
 .../accumulo/test/ConditionalWriterIT.java      |   1 -
 .../accumulo/test/GarbageCollectWALIT.java      |  81 ---
 .../MissingWalHeaderCompletesRecoveryIT.java    |  14 +-
 .../accumulo/test/NoMutationRecoveryIT.java     | 178 ++++++
 .../org/apache/accumulo/test/ShellServerIT.java |   2 +-
 .../org/apache/accumulo/test/UnusedWALIT.java   | 144 -----
 .../java/org/apache/accumulo/test/VolumeIT.java |  17 -
 .../accumulo/test/functional/ReadWriteIT.java   |   8 -
 .../accumulo/test/functional/WALSunnyDayIT.java | 250 --------
 .../test/functional/WatchTheWatchCountIT.java   |   2 +-
 .../test/performance/RollWALPerformanceIT.java  | 120 ----
 ...bageCollectorCommunicatesWithTServersIT.java |  35 +-
 .../replication/MultiInstanceReplicationIT.java |   2 +-
 .../test/replication/ReplicationIT.java         | 370 ++++++++----
 62 files changed, 1886 insertions(+), 1845 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 925877d..6a5c74a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -153,7 +153,9 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
     try {
       for (Entry<Key,Value> entry : metaBs) {
         LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-        wals.add(new Path(logEntry.filename).toString());
+        for (String log : logEntry.logSet) {
+          wals.add(new Path(log).toString());
+        }
       }
     } finally {
       metaBs.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 09d462d..b0ade7a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -342,8 +342,8 @@ public enum Property {
       + "no longer in use are removed from the filesystem."),
   GC_PORT("gc.port.client", "50091", PropertyType.PORT, "The listening port for the garbage collector's monitor service"),
   GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
-  GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured."),
-  GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting."),
+  GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
+  GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
   GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"),
 
   // properties that are specific to the monitor server behavior

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
index 97d73d1..292ba3b 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
@@ -41,7 +41,6 @@ public class RootTable {
   public static final String ZROOT_TABLET_FUTURE_LOCATION = ZROOT_TABLET + "/future_location";
   public static final String ZROOT_TABLET_LAST_LOCATION = ZROOT_TABLET + "/lastlocation";
   public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
-  public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + "/current_logs";
   public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
 
   public static final KeyExtent EXTENT = new KeyExtent(new Text(ID), null, null);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index fe75f9e..6baae17 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -16,14 +16,11 @@
  */
 package org.apache.accumulo.core.metadata.schema;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.schema.Section;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.hadoop.io.Text;
@@ -281,49 +278,4 @@ public class MetadataSchema {
       buff.set(buff.getBytes(), section.getRowPrefix().length(), buff.getLength() - section.getRowPrefix().length());
     }
   }
-
-  /**
-   * Holds references to the WALs in use in a live Tablet Server.
-   * <p>
-   * <code>~wal+tserver:port[sessionId] log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL  [] -></code>
-   */
-  public static class CurrentLogsSection {
-    private static final Section section = new Section(RESERVED_PREFIX + "wal+", true, RESERVED_PREFIX + "wal,", false);
-    private static byte LEFT_BRACKET = (byte) '[';
-    public static final Text COLF = new Text("log");
-    public static final Value UNUSED = new Value("unused".getBytes(UTF_8));
-
-    public static Range getRange() {
-      return section.getRange();
-    }
-
-    public static String getRowPrefix() {
-      return section.getRowPrefix();
-    }
-
-    public static void getTabletServer(Key k, Text hostPort, Text session) {
-      Preconditions.checkNotNull(k);
-      Preconditions.checkNotNull(hostPort);
-      Preconditions.checkNotNull(session);
-
-      Text row = new Text();
-      k.getRow(row);
-      if (!row.toString().startsWith(section.getRowPrefix())) {
-        throw new IllegalArgumentException("Bad key " + k.toString());
-      }
-      for (int sessionStart = section.getRowPrefix().length(); sessionStart < row.getLength() - 1; sessionStart++) {
-        if (row.charAt(sessionStart) == LEFT_BRACKET) {
-          hostPort.set(row.getBytes(), section.getRowPrefix().length(), sessionStart - section.getRowPrefix().length());
-          session.set(row.getBytes(), sessionStart + 1, row.getLength() - sessionStart - 2);
-          return;
-        }
-      }
-      throw new IllegalArgumentException("Bad key " + k.toString());
-    }
-
-    public static void getPath(Key k, Text path) {
-      k.getColumnQualifier(path);
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index ab70bb0..7fe61d1 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -16,10 +16,10 @@
  */
 package org.apache.accumulo.core.tabletserver.log;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -29,29 +29,30 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.base.Joiner;
+
 public class LogEntry {
-  public final KeyExtent extent;
-  public final long timestamp;
-  public final String server;
-  public final String filename;
+  public KeyExtent extent;
+  public long timestamp;
+  public String server;
+  public String filename;
+  public int tabletId;
+  public Collection<String> logSet;
+
+  public LogEntry() {}
 
   public LogEntry(LogEntry le) {
     this.extent = le.extent;
     this.timestamp = le.timestamp;
     this.server = le.server;
     this.filename = le.filename;
-  }
-
-  public LogEntry(KeyExtent extent, long timestamp, String server, String filename) {
-    this.extent = extent;
-    this.timestamp = timestamp;
-    this.server = server;
-    this.filename = filename;
+    this.tabletId = le.tabletId;
+    this.logSet = new ArrayList<String>(le.logSet);
   }
 
   @Override
   public String toString() {
-    return extent.toString() + " " + filename;
+    return extent.toString() + " " + filename + " (" + tabletId + ")";
   }
 
   public String getName() {
@@ -64,35 +65,43 @@ public class LogEntry {
     out.writeLong(timestamp);
     out.writeUTF(server);
     out.writeUTF(filename);
+    out.write(tabletId);
+    out.write(logSet.size());
+    for (String s : logSet) {
+      out.writeUTF(s);
+    }
     return Arrays.copyOf(out.getData(), out.getLength());
   }
 
-  static public LogEntry fromBytes(byte bytes[]) throws IOException {
+  public void fromBytes(byte bytes[]) throws IOException {
     DataInputBuffer inp = new DataInputBuffer();
     inp.reset(bytes, bytes.length);
-    KeyExtent extent = new KeyExtent();
+    extent = new KeyExtent();
     extent.readFields(inp);
-    long timestamp = inp.readLong();
-    String server = inp.readUTF();
-    String filename = inp.readUTF();
-    return new LogEntry(extent, timestamp, server, filename);
+    timestamp = inp.readLong();
+    server = inp.readUTF();
+    filename = inp.readUTF();
+    tabletId = inp.read();
+    int count = inp.read();
+    ArrayList<String> logSet = new ArrayList<String>(count);
+    for (int i = 0; i < count; i++)
+      logSet.add(inp.readUTF());
+    this.logSet = logSet;
   }
 
   static private final Text EMPTY_TEXT = new Text();
 
   public static LogEntry fromKeyValue(Key key, Value value) {
-    String qualifier = key.getColumnQualifier().toString();
-    if (qualifier.indexOf('/') < 1) {
-      throw new IllegalArgumentException("Bad key for log entry: " + key);
-    }
-    KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
+    LogEntry result = new LogEntry();
+    result.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
     String[] parts = key.getColumnQualifier().toString().split("/", 2);
-    String server = parts[0];
-    // handle old-style log entries that specify log sets
-    parts = value.toString().split("\\|")[0].split(";");
-    String filename = parts[parts.length - 1];
-    long timestamp = key.getTimestamp();
-    return new LogEntry(extent, timestamp, server, filename);
+    result.server = parts[0];
+    result.filename = parts[1];
+    parts = value.toString().split("\\|");
+    result.tabletId = Integer.parseInt(parts[1]);
+    result.logSet = Arrays.asList(parts[0].split(";"));
+    result.timestamp = key.getTimestamp();
+    return result;
   }
 
   public Text getRow() {
@@ -103,16 +112,11 @@ public class LogEntry {
     return MetadataSchema.TabletsSection.LogColumnFamily.NAME;
   }
 
-  public String getUniqueID() {
-    String parts[] = filename.split("/");
-    return parts[parts.length - 1];
-  }
-
   public Text getColumnQualifier() {
     return new Text(server + "/" + filename);
   }
 
   public Value getValue() {
-    return new Value(filename.getBytes(UTF_8));
+    return new Value((Joiner.on(";").join(logSet) + "|" + tabletId).getBytes());
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
deleted file mode 100644
index cfe59f2..0000000
--- a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.core.metadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-public class MetadataTableSchemaTest {
-
-  @Test
-  public void testGetTabletServer() throws Exception {
-    Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
-    Text hostPort = new Text();
-    Text session = new Text();
-    CurrentLogsSection.getTabletServer(key, hostPort, session);
-    assertEquals("host:43861", hostPort.toString());
-    assertEquals("14a7df0e6420003", session.toString());
-    try {
-      Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
-      CurrentLogsSection.getTabletServer(bogus, hostPort, session);
-      fail("bad argument not thrown");
-    } catch (IllegalArgumentException ex) {
-
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
deleted file mode 100644
index 50886ef..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server;
-
-import org.apache.accumulo.core.data.KeyExtent;
-
-public enum TabletLevel {
-  ROOT, META, NORMAL;
-
-  public static TabletLevel getLevel(KeyExtent extent) {
-    if (!extent.isMeta())
-      return NORMAL;
-    if (extent.isRootTablet())
-      return ROOT;
-    return META;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 4722e60..c3595cd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -128,12 +128,15 @@ public class VolumeUtil {
       switchedPath = le.filename;
 
     ArrayList<String> switchedLogs = new ArrayList<String>();
-    String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
-    if (switchedLog != null) {
-      switchedLogs.add(switchedLog);
-      numSwitched++;
-    } else {
-      switchedLogs.add(le.filename);
+    for (String log : le.logSet) {
+      String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
+      if (switchedLog != null) {
+        switchedLogs.add(switchedLog);
+        numSwitched++;
+      } else {
+        switchedLogs.add(log);
+      }
+
     }
 
     if (numSwitched == 0) {
@@ -141,7 +144,9 @@ public class VolumeUtil {
       return null;
     }
 
-    LogEntry newLogEntry = new LogEntry(le.extent, le.timestamp, le.server, switchedPath);
+    LogEntry newLogEntry = new LogEntry(le);
+    newLogEntry.filename = switchedPath;
+    newLogEntry.logSet = switchedLogs;
 
     log.trace("Switched " + le + " to " + newLogEntry);
 
@@ -239,7 +244,7 @@ public class VolumeUtil {
         log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status));
         // Before deleting these logs, we need to mark them for replication
         for (LogEntry logEntry : logsToRemove) {
-          ReplicationTableUtil.updateFiles(context, extent, logEntry.filename, status);
+          ReplicationTableUtil.updateFiles(context, extent, logEntry.logSet, status);
         }
       }
     }
@@ -248,6 +253,7 @@ public class VolumeUtil {
 
     // method this should return the exact strings that are in the metadata table
     return ret;
+
   }
 
   private static String decommisionedTabletDir(AccumuloServerContext context, ZooLock zooLock, VolumeManager vm, KeyExtent extent, String metaDir)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 9afb93f..c6f1dd8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -533,7 +533,6 @@ public class Initialize implements KeywordExecutable {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_CURRENT_LOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 600349b..7ee6f0c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -17,9 +17,6 @@
 package org.apache.accumulo.server.master.state;
 
 import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -30,14 +27,9 @@ import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
 
 public class MetaDataStateStore extends TabletStateStore {
-  private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
 
   private static final int THREADS = 4;
   private static final int LATENCY = 1000;
@@ -67,7 +59,7 @@ public class MetaDataStateStore extends TabletStateStore {
 
   @Override
   public ClosableIterator<TabletLocationState> iterator() {
-    return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state, targetTableName);
+    return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state);
   }
 
   @Override
@@ -124,7 +116,7 @@ public class MetaDataStateStore extends TabletStateStore {
   }
 
   @Override
-  public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException {
+  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
 
     BatchWriter writer = createBatchWriter();
     try {
@@ -132,15 +124,6 @@ public class MetaDataStateStore extends TabletStateStore {
         Mutation m = new Mutation(tls.extent.getMetadataEntry());
         if (tls.current != null) {
           tls.current.clearLocation(m);
-          if (logsForDeadServers != null) {
-            List<Path> logs = logsForDeadServers.get(tls.current);
-            if (logs != null) {
-              for (Path log : logs) {
-                LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
-                m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
-              }
-            }
-          }
         }
         if (tls.future != null) {
           tls.future.clearFutureLocation(m);
@@ -162,30 +145,4 @@ public class MetaDataStateStore extends TabletStateStore {
   public String name() {
     return "Normal Tablets";
   }
-
-  @Override
-  public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
-    BatchWriter writer = createBatchWriter();
-    try {
-      for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
-        if (entry.getValue().isEmpty()) {
-          continue;
-        }
-        Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
-        for (Path log : entry.getValue()) {
-          m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
-        }
-        writer.addMutation(m);
-      }
-    } catch (Exception ex) {
-      log.error("Error marking logs as unused: " + logs);
-      throw new DistributedStoreException(ex);
-    } finally {
-      try {
-        writer.close();
-      } catch (MutationsRejectedException e) {
-        throw new DistributedStoreException(e);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index bec2dc4..d64c108 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -141,7 +141,6 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
     boolean chopped = false;
 
     for (Entry<Key,Value> entry : decodedRow.entrySet()) {
-
       Key key = entry.getKey();
       Text row = key.getRow();
       Text cf = key.getColumnFamily();
@@ -174,9 +173,8 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
       }
     }
     if (extent == null) {
-      String msg = "No prev-row for key extent " + decodedRow;
-      log.error(msg);
-      throw new BadLocationStateException(msg, k.getRow());
+      log.warn("No prev-row for key extent: " + decodedRow);
+      return null;
     }
     return new TabletLocationState(extent, future, current, last, walogs, chopped);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index 8116ecf..fb30440 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -68,13 +68,6 @@ public class TabletLocationState {
   final public Collection<Collection<String>> walogs;
   final public boolean chopped;
 
-  public TServerInstance futureOrCurrent() {
-    if (current != null) {
-      return current;
-    }
-    return future;
-  }
-
   @Override
   public String toString() {
     return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : "");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 147e071..5413e31 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -18,11 +18,8 @@ package org.apache.accumulo.server.master.state;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.hadoop.fs.Path;
 
 /**
  * Interface for storing information about tablet assignments. There are three implementations:
@@ -59,13 +56,10 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
    *
    * @param tablets
    *          the tablets' current information
-   * @param logsForDeadServers
-   *          a cache of logs in use by servers when they died
    */
-  abstract public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException;
+  abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
 
-  public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map<TServerInstance,List<Path>> logsForDeadServers)
-      throws DistributedStoreException {
+  public static void unassign(AccumuloServerContext context, TabletLocationState tls) throws DistributedStoreException {
     TabletStateStore store;
     if (tls.extent.isRootTablet()) {
       store = new ZooTabletStateStore();
@@ -74,7 +68,7 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
     } else {
       store = new MetaDataStateStore(context);
     }
-    store.unassign(Collections.singletonList(tls), logsForDeadServers);
+    store.unassign(Collections.singletonList(tls));
   }
 
   public static void setLocation(AccumuloServerContext context, Assignment assignment) throws DistributedStoreException {
@@ -89,9 +83,4 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
     store.setLocations(Collections.singletonList(assignment));
   }
 
-  /**
-   * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets.
-   */
-  abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException;
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index 03627e3..ab99396 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -21,17 +21,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,9 +85,10 @@ public class ZooTabletStateStore extends TabletStateStore {
           for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
             byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
             if (logInfo != null) {
-              LogEntry logEntry = LogEntry.fromBytes(logInfo);
-              logs.add(Collections.singleton(logEntry.filename));
-              log.debug("root tablet log " + logEntry.filename);
+              LogEntry logEntry = new LogEntry();
+              logEntry.fromBytes(logInfo);
+              logs.add(logEntry.logSet);
+              log.debug("root tablet logSet " + logEntry.logSet);
             }
           }
           TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
@@ -165,29 +161,12 @@ public class ZooTabletStateStore extends TabletStateStore {
   }
 
   @Override
-  public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException {
+  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
     if (tablets.size() != 1)
       throw new IllegalArgumentException("There is only one root tablet");
     TabletLocationState tls = tablets.iterator().next();
     if (tls.extent.compareTo(RootTable.EXTENT) != 0)
       throw new IllegalArgumentException("You can only store the root tablet location");
-    if (logsForDeadServers != null) {
-      List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
-      if (logs != null) {
-        for (Path entry : logs) {
-          LogEntry logEntry = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), entry.toString());
-          byte[] value;
-          try {
-            value = logEntry.toBytes();
-          } catch (IOException ex) {
-            throw new DistributedStoreException(ex);
-          }
-          store.put(RootTable.ZROOT_TABLET_WALOGS + "/" + logEntry.getUniqueID(), value);
-          store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" + MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString()
-              + logEntry.getUniqueID());
-        }
-      }
-    }
     store.remove(RootTable.ZROOT_TABLET_LOCATION);
     store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
     log.debug("unassign root tablet location");
@@ -198,9 +177,4 @@ public class ZooTabletStateStore extends TabletStateStore {
     return "Root Table";
   }
 
-  @Override
-  public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) {
-    // the root table is not replicated, so unassigning the root tablet has removed the current log marker
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index ad892b8..e6e3cfd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -153,19 +153,6 @@ public class StatusUtil {
   /**
    * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
    */
-  public static Status openWithUnknownLength(long timeCreated) {
-    Builder builder = Status.newBuilder();
-    builder.setBegin(0);
-    builder.setEnd(0);
-    builder.setInfiniteEnd(true);
-    builder.setClosed(false);
-    builder.setCreatedTime(timeCreated);
-    return builder.build();
-  }
-
-  /**
-   * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
-   */
   public static Status openWithUnknownLength() {
     return INF_END_REPLICATION_STATUS;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 9e3fc7d..e90d1dd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -35,7 +35,6 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 
 /**
  *
@@ -62,6 +61,9 @@ public class ListVolumesUsed {
 
   private static void getLogURIs(TreeSet<String> volumes, LogEntry logEntry) {
     volumes.add(getLogURI(logEntry.filename));
+    for (String logSet : logEntry.logSet) {
+      volumes.add(getLogURI(logSet));
+    }
   }
 
   private static void listZookeeper() throws Exception {
@@ -121,20 +123,6 @@ public class ListVolumesUsed {
 
     for (String volume : volumes)
       System.out.println("\tVolume : " + volume);
-
-    volumes.clear();
-    scanner.clearColumns();
-    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
-    Text path = new Text();
-    for (Entry<Key,Value> entry : scanner) {
-      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-      volumes.add(getLogURI(path.toString()));
-    }
-
-    System.out.println("Listing volumes referenced in " + name + " current logs section");
-
-    for (String volume : volumes)
-      System.out.println("\tVolume : " + volume);
   }
 
   public static void listVolumes(ClientContext context) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 29745d5..14eba68 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -248,27 +248,35 @@ public class MasterMetadataUtil {
       if (unusedWalLogs != null) {
         updateRootTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
       }
+
       return;
     }
+
     Mutation m = getUpdateForTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
+
     MetadataTableUtil.update(context, zooLock, m, extent);
+
   }
 
   /**
    * Update the data file for the root tablet
    */
-  private static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+  protected static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
       Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
     IZooReaderWriter zk = ZooReaderWriter.getInstance();
+    // unusedWalLogs will contain the location/name of each log in a log set
+    // the log set is stored under one of the log names, but not both
+    // find the entry under one of the names and delete it.
     String root = MetadataTableUtil.getZookeeperLogLocation();
+    boolean foundEntry = false;
     for (String entry : unusedWalLogs) {
       String[] parts = entry.split("/");
       String zpath = root + "/" + parts[parts.length - 1];
       while (true) {
         try {
           if (zk.exists(zpath)) {
-            log.debug("Removing WAL reference for root table " + zpath);
             zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
+            foundEntry = true;
           }
           break;
         } catch (KeeperException e) {
@@ -279,6 +287,8 @@ public class MasterMetadataUtil {
         UtilWaitThread.sleep(1000);
       }
     }
+    if (unusedWalLogs.size() > 0 && !foundEntry)
+      log.warn("WALog entry for root tablet did not exist " + unusedWalLogs);
   }
 
   /**
@@ -286,7 +296,7 @@ public class MasterMetadataUtil {
    *
    * @return A Mutation to update a tablet from the given information
    */
-  private static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+  protected static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
       Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
     Mutation m = new Mutation(extent.getMetadataEntry());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index d517989..5e74aac 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,6 +23,8 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -59,7 +61,6 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -81,12 +82,10 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.TabletLevel;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -122,7 +121,7 @@ public class MetadataTableUtil {
     return metadataTable;
   }
 
-  public synchronized static Writer getRootTable(ClientContext context) {
+  private synchronized static Writer getRootTable(ClientContext context) {
     Credentials credentials = context.getCredentials();
     Writer rootTable = root_tables.get(credentials);
     if (rootTable == null) {
@@ -224,7 +223,7 @@ public class MetadataTableUtil {
 
       // add before removing in case of process death
       for (LogEntry logEntry : logsToAdd)
-        addRootLogEntry(context, zooLock, logEntry);
+        addLogEntry(context, logEntry, zooLock);
 
       removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
     } else {
@@ -249,35 +248,6 @@ public class MetadataTableUtil {
     }
   }
 
-  private static interface ZooOperation {
-    void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException;
-  }
-
-  private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) {
-    while (true) {
-      try {
-        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-        if (zoo.isLockHeld(zooLock.getLockID())) {
-          op.run(zoo);
-        }
-        break;
-      } catch (Exception e) {
-        log.error("Unexpected exception {}", e.getMessage(), e);
-      }
-      UtilWaitThread.sleep(1000);
-    }
-  }
-
-  private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) {
-    retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
-      @Override
-      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-        String root = getZookeeperLogLocation();
-        rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE);
-      }
-    });
-  }
-
   public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
     TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
 
@@ -477,6 +447,34 @@ public class MetadataTableUtil {
     return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
   }
 
+  public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) {
+    if (entry.extent.isRootTablet()) {
+      String root = getZookeeperLogLocation();
+      while (true) {
+        try {
+          IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+          if (zoo.isLockHeld(zooLock.getLockID())) {
+            String[] parts = entry.filename.split("/");
+            String uniqueId = parts[parts.length - 1];
+            zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+          }
+          break;
+        } catch (KeeperException e) {
+          log.error("{}", e.getMessage(), e);
+        } catch (InterruptedException e) {
+          log.error("{}", e.getMessage(), e);
+        } catch (IOException e) {
+          log.error("{}", e.getMessage(), e);
+        }
+        UtilWaitThread.sleep(1000);
+      }
+    } else {
+      Mutation m = new Mutation(entry.getRow());
+      m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+      update(context, zooLock, m, entry.extent);
+    }
+  }
+
   public static void setRootTabletDir(String dir) throws IOException {
     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
@@ -567,11 +565,22 @@ public class MetadataTableUtil {
       }
     }
 
+    Collections.sort(result, new Comparator<LogEntry>() {
+      @Override
+      public int compare(LogEntry o1, LogEntry o2) {
+        long diff = o1.timestamp - o2.timestamp;
+        if (diff < 0)
+          return -1;
+        if (diff > 0)
+          return 1;
+        return 0;
+      }
+    });
     log.info("Returning logs " + result + " for extent " + extent);
     return result;
   }
 
-  static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
+  static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     String root = getZookeeperLogLocation();
     // there's a little race between getting the children and fetching
@@ -579,10 +588,11 @@ public class MetadataTableUtil {
     while (true) {
       result.clear();
       for (String child : zoo.getChildren(root)) {
+        LogEntry e = new LogEntry();
         try {
-          LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null));
+          e.fromBytes(zoo.getData(root + "/" + child, null));
           // upgrade from !0;!0<< -> +r<<
-          e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
+          e.extent = RootTable.EXTENT;
           result.add(e);
         } catch (KeeperException.NoNodeException ex) {
           continue;
@@ -652,23 +662,28 @@ public class MetadataTableUtil {
     return new LogEntryIterator(context);
   }
 
-  public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
+  public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
     if (extent.isRootTablet()) {
-      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
-        @Override
-        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-          String root = getZookeeperLogLocation();
-          for (LogEntry entry : entries) {
-            String path = root + "/" + entry.getUniqueID();
-            log.debug("Removing " + path + " from zookeeper");
-            rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
+      for (LogEntry entry : logEntries) {
+        String root = getZookeeperLogLocation();
+        while (true) {
+          try {
+            IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+            if (zoo.isLockHeld(zooLock.getLockID())) {
+              String parts[] = entry.filename.split("/");
+              zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP);
+            }
+            break;
+          } catch (Exception e) {
+            log.error("{}", e.getMessage(), e);
           }
+          UtilWaitThread.sleep(1000);
         }
-      });
+      }
     } else {
       Mutation m = new Mutation(extent.getMetadataEntry());
-      for (LogEntry entry : entries) {
-        m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
+      for (LogEntry entry : logEntries) {
+        m.putDelete(LogColumnFamily.NAME, new Text(entry.getName()));
       }
       update(context, zooLock, m, extent);
     }
@@ -1053,130 +1068,4 @@ public class MetadataTableUtil {
     return tabletEntries;
   }
 
-  public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, TabletLevel level) {
-    log.debug("Adding log entry " + filename);
-    if (level == TabletLevel.ROOT) {
-      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
-        @Override
-        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-          String uniqueId = filename.getName();
-          StringBuilder path = new StringBuilder(root);
-          path.append("/");
-          path.append(CurrentLogsSection.getRowPrefix());
-          path.append(tabletSession.toString());
-          path.append(uniqueId);
-          rw.putPersistentData(path.toString(), filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
-        }
-      });
-    } else {
-      Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-      m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES));
-      String tableName = MetadataTable.NAME;
-      if (level == TabletLevel.META) {
-        tableName = RootTable.NAME;
-      }
-      BatchWriter bw = null;
-      try {
-        bw = context.getConnector().createBatchWriter(tableName, null);
-        bw.addMutation(m);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      } finally {
-        if (bw != null) {
-          try {
-            bw.close();
-          } catch (Exception e2) {
-            throw new RuntimeException(e2);
-          }
-        }
-      }
-    }
-  }
-
-  private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
-    retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
-      @Override
-      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-        String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-        String uniqueId = filename.getName();
-        String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
-        log.debug("Removing entry " + path + " from zookeeper");
-        rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
-      }
-    });
-  }
-
-  public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
-    // There could be a marker at the meta and/or root level, mark them both as unused
-    try {
-      BatchWriter root = null;
-      BatchWriter meta = null;
-      try {
-        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
-        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-        for (Path fname : all) {
-          Text tname = new Text(fname.toString());
-          Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-          m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
-          root.addMutation(m);
-          log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
-          m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-          m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
-          meta.addMutation(m);
-          removeCurrentRootLogMarker(context, lock, tabletSession, fname);
-        }
-      } finally {
-        if (root != null) {
-          root.close();
-        }
-        if (meta != null) {
-          meta.close();
-        }
-      }
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
-    }
-  }
-
-  public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server,
-      Map<TServerInstance,List<Path>> logsForDeadServers) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    // already cached
-    if (logsForDeadServers.containsKey(server)) {
-      return;
-    }
-    if (extent.isRootTablet()) {
-      final List<Path> logs = new ArrayList<>();
-      retryZooKeeperUpdate(context, lock, new ZooOperation() {
-        @Override
-        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-          logs.clear();
-          for (String child : rw.getChildren(root)) {
-            logs.add(new Path(new String(rw.getData(root + "/" + child, null), UTF_8)));
-          }
-        }
-      });
-      logsForDeadServers.put(server, logs);
-    } else {
-      // use the correct meta table
-      String table = MetadataTable.NAME;
-      if (extent.isMeta()) {
-        table = RootTable.NAME;
-      }
-      // fetch the current logs in use, and put them in the cache
-      Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
-      scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
-      List<Path> logs = new ArrayList<>();
-      Text path = new Text();
-      for (Entry<Key,Value> entry : scanner) {
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
-          logs.add(new Path(path.toString()));
-        }
-      }
-      logsForDeadServers.put(server, logs);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index c6d5ce4..8e755a3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.util;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -175,14 +176,20 @@ public class ReplicationTableUtil {
   /**
    * Write replication ingest entries for each provided file with the given {@link Status}.
    */
-  public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) {
+  public static void updateFiles(ClientContext context, KeyExtent extent, Collection<String> files, Status stat) {
     if (log.isDebugEnabled()) {
-      log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat));
+      log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
     }
     // TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294
+    if (files.isEmpty()) {
+      return;
+    }
 
     Value v = ProtobufUtil.toValue(stat);
-    update(context, createUpdateMutation(new Path(file), v, extent), extent);
+    for (String file : files) {
+      // TODO Can preclude this addition if the extent is for a table we don't need to replicate
+      update(context, createUpdateMutation(new Path(file), v, extent), extent);
+    }
   }
 
   static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 04a83d3..3983bde 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -91,7 +91,7 @@ public class ReplicationTableUtilTest {
     String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
 
     long createdTime = System.currentTimeMillis();
-    ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime));
+    ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime));
 
     verify(writer);
 


Mime
View raw message