accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [4/5] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table updates
Date Fri, 24 Apr 2015 23:20:30 GMT
ACCUMULO-3423 optimize WAL metadata table updates


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3fdd29f5
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3fdd29f5
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3fdd29f5

Branch: refs/heads/1.7
Commit: 3fdd29f5222f9d1d32ca28b5ecf1d740a8d20f87
Parents: ea25e98
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Fri Apr 24 18:15:05 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Fri Apr 24 18:18:56 2015 -0400

----------------------------------------------------------------------
 .../client/impl/ReplicationOperationsImpl.java  |   4 +-
 .../org/apache/accumulo/core/conf/Property.java |   4 +-
 .../accumulo/core/metadata/RootTable.java       |   1 +
 .../core/metadata/schema/MetadataSchema.java    |  48 ++
 .../core/tabletserver/log/LogEntry.java         |  78 ++-
 .../core/metadata/MetadataTableSchemaTest.java  |  47 ++
 .../org/apache/accumulo/server/TabletLevel.java |  34 ++
 .../apache/accumulo/server/fs/VolumeUtil.java   |  22 +-
 .../apache/accumulo/server/init/Initialize.java |   1 +
 .../server/master/state/MetaDataStateStore.java |  47 +-
 .../master/state/MetaDataTableScanner.java      |   6 +-
 .../master/state/TabletLocationState.java       |   7 +
 .../server/master/state/TabletStateStore.java   |  16 +-
 .../master/state/ZooTabletStateStore.java       |  35 +-
 .../accumulo/server/replication/StatusUtil.java |  13 +
 .../accumulo/server/util/ListVolumesUsed.java   |  18 +-
 .../server/util/MasterMetadataUtil.java         |  18 +-
 .../accumulo/server/util/MetadataTableUtil.java | 239 +++++---
 .../server/util/ReplicationTableUtil.java       |  13 +-
 .../server/util/ReplicationTableUtilTest.java   |   2 +-
 .../gc/GarbageCollectWriteAheadLogs.java        | 499 +++++++---------
 .../accumulo/gc/SimpleGarbageCollector.java     |   1 -
 .../CloseWriteAheadLogReferences.java           |  23 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 567 -------------------
 .../CloseWriteAheadLogReferencesTest.java       | 151 +----
 .../java/org/apache/accumulo/master/Master.java |   3 +
 .../master/MasterClientServiceHandler.java      |   3 +-
 .../accumulo/master/TabletGroupWatcher.java     |  37 +-
 .../accumulo/master/replication/WorkMaker.java  |   1 +
 .../accumulo/master/state/MergeStats.java       |   3 +-
 .../master/ReplicationOperationsImplTest.java   |   9 +-
 .../apache/accumulo/master/TestMergeState.java  |   2 +-
 .../master/state/RootTabletStateStoreTest.java  |   4 +-
 .../src/main/findbugs/exclude-filter.xml        |   2 +-
 .../server/GarbageCollectionLogger.java         |   3 +-
 .../apache/accumulo/tserver/TabletServer.java   | 182 +++---
 .../apache/accumulo/tserver/log/DfsLogger.java  |  14 +-
 .../accumulo/tserver/log/SortedLogRecovery.java |   8 +-
 .../tserver/log/TabletServerLogger.java         | 187 +++---
 .../accumulo/tserver/tablet/CommitSession.java  |   3 +-
 .../tserver/tablet/DatafileManager.java         |   4 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  59 +-
 .../tserver/tablet/TabletCommitter.java         |   3 +-
 .../accumulo/tserver/log/LogEntryTest.java      |  56 ++
 .../test/performance/thrift/NullTserver.java    |   6 +-
 .../accumulo/proxy/ProxyDurabilityIT.java       |   9 +-
 .../test/BadDeleteMarkersCreatedIT.java         |   2 +-
 .../org/apache/accumulo/test/BalanceIT.java     |  20 +-
 .../org/apache/accumulo/test/CleanWalIT.java    |   1 +
 .../accumulo/test/ConditionalWriterIT.java      |   1 +
 .../accumulo/test/GarbageCollectWALIT.java      |  81 +++
 .../MissingWalHeaderCompletesRecoveryIT.java    |  14 +-
 .../accumulo/test/NoMutationRecoveryIT.java     | 178 ------
 .../org/apache/accumulo/test/ShellServerIT.java |   2 +-
 .../org/apache/accumulo/test/UnusedWALIT.java   | 144 +++++
 .../java/org/apache/accumulo/test/VolumeIT.java |  17 +
 .../accumulo/test/functional/ReadWriteIT.java   |   8 +
 .../accumulo/test/functional/WALSunnyDayIT.java | 250 ++++++++
 .../test/functional/WatchTheWatchCountIT.java   |   2 +-
 .../test/performance/RollWALPerformanceIT.java  | 126 +++++
 ...bageCollectorCommunicatesWithTServersIT.java |  35 +-
 .../replication/MultiInstanceReplicationIT.java |   2 +-
 .../test/replication/ReplicationIT.java         | 370 ++++--------
 63 files changed, 1857 insertions(+), 1888 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 6a5c74a..925877d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -153,9 +153,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
     try {
       for (Entry<Key,Value> entry : metaBs) {
         LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-        for (String log : logEntry.logSet) {
-          wals.add(new Path(log).toString());
-        }
+        wals.add(new Path(logEntry.filename).toString());
       }
     } finally {
       metaBs.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 429abad..a5bef0a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -345,8 +345,8 @@ public enum Property {
       + "no longer in use are removed from the filesystem."),
   GC_PORT("gc.port.client", "50091", PropertyType.PORT, "The listening port for the garbage collector's monitor service"),
   GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
-  GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
-  GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
+  GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured."),
+  GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting."),
   GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"),
 
   // properties that are specific to the monitor server behavior

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
index 292ba3b..97d73d1 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java
@@ -41,6 +41,7 @@ public class RootTable {
   public static final String ZROOT_TABLET_FUTURE_LOCATION = ZROOT_TABLET + "/future_location";
   public static final String ZROOT_TABLET_LAST_LOCATION = ZROOT_TABLET + "/lastlocation";
   public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
+  public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + "/current_logs";
   public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
 
   public static final KeyExtent EXTENT = new KeyExtent(new Text(ID), null, null);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 6baae17..c787d6d 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -16,11 +16,14 @@
  */
 package org.apache.accumulo.core.metadata.schema;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.schema.Section;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.hadoop.io.Text;
@@ -278,4 +281,49 @@ public class MetadataSchema {
       buff.set(buff.getBytes(), section.getRowPrefix().length(), buff.getLength() - section.getRowPrefix().length());
     }
   }
+
+  /**
+   * Holds references to the WALs in use in a live Tablet Server.
+   * <p>
+   * <code>~wal+tserver:port[sessionId] log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL  [] -></code>
+   */
+  public static class CurrentLogsSection {
+    private static final Section section = new Section(RESERVED_PREFIX + "wal+", true, RESERVED_PREFIX + "wal,", false);
+    private static byte LEFT_BRACKET = (byte)'[';
+    public static final Text COLF = new Text("log");
+    public static final Value UNUSED = new Value("unused".getBytes(UTF_8));
+
+    public static Range getRange() {
+      return section.getRange();
+    }
+
+    public static String getRowPrefix() {
+      return section.getRowPrefix();
+    }
+
+    public static void getTabletServer(Key k, Text hostPort, Text session) {
+      Preconditions.checkNotNull(k);
+      Preconditions.checkNotNull(hostPort);
+      Preconditions.checkNotNull(session);
+
+      Text row = new Text();
+      k.getRow(row);
+      if (!row.toString().startsWith(section.getRowPrefix())) {
+        throw new IllegalArgumentException("Bad key " + k.toString());
+      }
+      for (int sessionStart = section.getRowPrefix().length(); sessionStart < row.getLength() - 1; sessionStart++) {
+        if (row.charAt(sessionStart) == LEFT_BRACKET) {
+          hostPort.set(row.getBytes(), section.getRowPrefix().length(), sessionStart - section.getRowPrefix().length());
+          session.set(row.getBytes(), sessionStart + 1, row.getLength() - sessionStart - 2);
+          return;
+        }
+      }
+      throw new IllegalArgumentException("Bad key " + k.toString());
+    }
+
+    public static void getPath(Key k, Text path) {
+      k.getColumnQualifier(path);
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
index 7fe61d1..ab70bb0 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java
@@ -16,10 +16,10 @@
  */
 package org.apache.accumulo.core.tabletserver.log;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -29,30 +29,29 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 
-import com.google.common.base.Joiner;
-
 public class LogEntry {
-  public KeyExtent extent;
-  public long timestamp;
-  public String server;
-  public String filename;
-  public int tabletId;
-  public Collection<String> logSet;
-
-  public LogEntry() {}
+  public final KeyExtent extent;
+  public final long timestamp;
+  public final String server;
+  public final String filename;
 
   public LogEntry(LogEntry le) {
     this.extent = le.extent;
     this.timestamp = le.timestamp;
     this.server = le.server;
     this.filename = le.filename;
-    this.tabletId = le.tabletId;
-    this.logSet = new ArrayList<String>(le.logSet);
+  }
+
+  public LogEntry(KeyExtent extent, long timestamp, String server, String filename) {
+    this.extent = extent;
+    this.timestamp = timestamp;
+    this.server = server;
+    this.filename = filename;
   }
 
   @Override
   public String toString() {
-    return extent.toString() + " " + filename + " (" + tabletId + ")";
+    return extent.toString() + " " + filename;
   }
 
   public String getName() {
@@ -65,43 +64,35 @@ public class LogEntry {
     out.writeLong(timestamp);
     out.writeUTF(server);
     out.writeUTF(filename);
-    out.write(tabletId);
-    out.write(logSet.size());
-    for (String s : logSet) {
-      out.writeUTF(s);
-    }
     return Arrays.copyOf(out.getData(), out.getLength());
   }
 
-  public void fromBytes(byte bytes[]) throws IOException {
+  static public LogEntry fromBytes(byte bytes[]) throws IOException {
     DataInputBuffer inp = new DataInputBuffer();
     inp.reset(bytes, bytes.length);
-    extent = new KeyExtent();
+    KeyExtent extent = new KeyExtent();
     extent.readFields(inp);
-    timestamp = inp.readLong();
-    server = inp.readUTF();
-    filename = inp.readUTF();
-    tabletId = inp.read();
-    int count = inp.read();
-    ArrayList<String> logSet = new ArrayList<String>(count);
-    for (int i = 0; i < count; i++)
-      logSet.add(inp.readUTF());
-    this.logSet = logSet;
+    long timestamp = inp.readLong();
+    String server = inp.readUTF();
+    String filename = inp.readUTF();
+    return new LogEntry(extent, timestamp, server, filename);
   }
 
   static private final Text EMPTY_TEXT = new Text();
 
   public static LogEntry fromKeyValue(Key key, Value value) {
-    LogEntry result = new LogEntry();
-    result.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
+    String qualifier = key.getColumnQualifier().toString();
+    if (qualifier.indexOf('/') < 1) {
+      throw new IllegalArgumentException("Bad key for log entry: " + key);
+    }
+    KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
     String[] parts = key.getColumnQualifier().toString().split("/", 2);
-    result.server = parts[0];
-    result.filename = parts[1];
-    parts = value.toString().split("\\|");
-    result.tabletId = Integer.parseInt(parts[1]);
-    result.logSet = Arrays.asList(parts[0].split(";"));
-    result.timestamp = key.getTimestamp();
-    return result;
+    String server = parts[0];
+    // handle old-style log entries that specify log sets
+    parts = value.toString().split("\\|")[0].split(";");
+    String filename = parts[parts.length - 1];
+    long timestamp = key.getTimestamp();
+    return new LogEntry(extent, timestamp, server, filename);
   }
 
   public Text getRow() {
@@ -112,11 +103,16 @@ public class LogEntry {
     return MetadataSchema.TabletsSection.LogColumnFamily.NAME;
   }
 
+  public String getUniqueID() {
+    String parts[] = filename.split("/");
+    return parts[parts.length - 1];
+  }
+
   public Text getColumnQualifier() {
     return new Text(server + "/" + filename);
   }
 
   public Value getValue() {
-    return new Value((Joiner.on(";").join(logSet) + "|" + tabletId).getBytes());
+    return new Value(filename.getBytes(UTF_8));
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
new file mode 100644
index 0000000..cfe59f2
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.metadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class MetadataTableSchemaTest {
+
+  @Test
+  public void testGetTabletServer() throws Exception {
+    Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+    Text hostPort = new Text();
+    Text session = new Text();
+    CurrentLogsSection.getTabletServer(key, hostPort, session);
+    assertEquals("host:43861", hostPort.toString());
+    assertEquals("14a7df0e6420003", session.toString());
+    try {
+      Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+      CurrentLogsSection.getTabletServer(bogus, hostPort, session);
+      fail("bad argument not thrown");
+    } catch (IllegalArgumentException ex) {
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
new file mode 100644
index 0000000..91e5ee9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+public enum TabletLevel {
+  ROOT,
+  META,
+  NORMAL;
+
+  public static TabletLevel getLevel(KeyExtent extent) {
+    if (!extent.isMeta())
+      return NORMAL;
+    if (extent.isRootTablet())
+      return ROOT;
+    return META;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index c3595cd..4722e60 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -128,15 +128,12 @@ public class VolumeUtil {
       switchedPath = le.filename;
 
     ArrayList<String> switchedLogs = new ArrayList<String>();
-    for (String log : le.logSet) {
-      String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
-      if (switchedLog != null) {
-        switchedLogs.add(switchedLog);
-        numSwitched++;
-      } else {
-        switchedLogs.add(log);
-      }
-
+    String switchedLog = switchVolume(le.filename, FileType.WAL, replacements);
+    if (switchedLog != null) {
+      switchedLogs.add(switchedLog);
+      numSwitched++;
+    } else {
+      switchedLogs.add(le.filename);
     }
 
     if (numSwitched == 0) {
@@ -144,9 +141,7 @@ public class VolumeUtil {
       return null;
     }
 
-    LogEntry newLogEntry = new LogEntry(le);
-    newLogEntry.filename = switchedPath;
-    newLogEntry.logSet = switchedLogs;
+    LogEntry newLogEntry = new LogEntry(le.extent, le.timestamp, le.server, switchedPath);
 
     log.trace("Switched " + le + " to " + newLogEntry);
 
@@ -244,7 +239,7 @@ public class VolumeUtil {
         log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status));
         // Before deleting these logs, we need to mark them for replication
         for (LogEntry logEntry : logsToRemove) {
-          ReplicationTableUtil.updateFiles(context, extent, logEntry.logSet, status);
+          ReplicationTableUtil.updateFiles(context, extent, logEntry.filename, status);
         }
       }
     }
@@ -253,7 +248,6 @@ public class VolumeUtil {
 
     // method this should return the exact strings that are in the metadata table
     return ret;
-
   }
 
   private static String decommisionedTabletDir(AccumuloServerContext context, ZooLock zooLock, VolumeManager vm, KeyExtent extent, String metaDir)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index c6f1dd8..9afb93f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -533,6 +533,7 @@ public class Initialize implements KeywordExecutable {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_CURRENT_LOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 7ee6f0c..c154bd0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -17,6 +17,9 @@
 package org.apache.accumulo.server.master.state;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -27,9 +30,14 @@ import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 public class MetaDataStateStore extends TabletStateStore {
+  private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
 
   private static final int THREADS = 4;
   private static final int LATENCY = 1000;
@@ -59,7 +67,7 @@ public class MetaDataStateStore extends TabletStateStore {
 
   @Override
   public ClosableIterator<TabletLocationState> iterator() {
-    return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state);
+    return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state, targetTableName);
   }
 
   @Override
@@ -116,7 +124,7 @@ public class MetaDataStateStore extends TabletStateStore {
   }
 
   @Override
-  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+  public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
 
     BatchWriter writer = createBatchWriter();
     try {
@@ -124,6 +132,15 @@ public class MetaDataStateStore extends TabletStateStore {
         Mutation m = new Mutation(tls.extent.getMetadataEntry());
         if (tls.current != null) {
           tls.current.clearLocation(m);
+          if (logsForDeadServers != null) {
+            List<Path> logs = logsForDeadServers.get(tls.current);
+            if (logs != null) {
+              for (Path log : logs) {
+                LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
+                m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+              }
+            }
+          }
         }
         if (tls.future != null) {
           tls.future.clearFutureLocation(m);
@@ -145,4 +162,30 @@ public class MetaDataStateStore extends TabletStateStore {
   public String name() {
     return "Normal Tablets";
   }
+
+  @Override
+  public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
+    BatchWriter writer = createBatchWriter();
+    try {
+      for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
+        if (entry.getValue().isEmpty()) {
+          continue;
+        }
+        Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
+        for (Path log : entry.getValue()) {
+          m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
+        }
+        writer.addMutation(m);
+      }
+    } catch (Exception ex) {
+      log.error("Error marking logs as unused: " + logs);
+      throw new DistributedStoreException(ex);
+    } finally {
+      try {
+        writer.close();
+      } catch (MutationsRejectedException e) {
+        throw new DistributedStoreException(e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index d64c108..bec2dc4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -141,6 +141,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
     boolean chopped = false;
 
     for (Entry<Key,Value> entry : decodedRow.entrySet()) {
+
       Key key = entry.getKey();
       Text row = key.getRow();
       Text cf = key.getColumnFamily();
@@ -173,8 +174,9 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
       }
     }
     if (extent == null) {
-      log.warn("No prev-row for key extent: " + decodedRow);
-      return null;
+      String msg = "No prev-row for key extent " + decodedRow;
+      log.error(msg);
+      throw new BadLocationStateException(msg, k.getRow());
     }
     return new TabletLocationState(extent, future, current, last, walogs, chopped);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index fb30440..8116ecf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -68,6 +68,13 @@ public class TabletLocationState {
   final public Collection<Collection<String>> walogs;
   final public boolean chopped;
 
+  public TServerInstance futureOrCurrent() {
+    if (current != null) {
+      return current;
+    }
+    return future;
+  }
+
   @Override
   public String toString() {
     return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : "");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 5413e31..acc10d8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -18,8 +18,11 @@ package org.apache.accumulo.server.master.state;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Interface for storing information about tablet assignments. There are three implementations:
@@ -56,10 +59,12 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
    *
    * @param tablets
    *          the tablets' current information
+   * @param logsForDeadServers
+   *          a cache of logs in use by servers when they died
    */
-  abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+  abstract public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException;
 
-  public static void unassign(AccumuloServerContext context, TabletLocationState tls) throws DistributedStoreException {
+  public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
     TabletStateStore store;
     if (tls.extent.isRootTablet()) {
       store = new ZooTabletStateStore();
@@ -68,7 +73,7 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
     } else {
       store = new MetaDataStateStore(context);
     }
-    store.unassign(Collections.singletonList(tls));
+    store.unassign(Collections.singletonList(tls), logsForDeadServers);
   }
 
   public static void setLocation(AccumuloServerContext context, Assignment assignment) throws DistributedStoreException {
@@ -83,4 +88,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
     store.setLocations(Collections.singletonList(assignment));
   }
 
+  /**
+   * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets.
+   */
+  abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<Path>> logs) throws DistributedStoreException;
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index ab99396..bce20fd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -21,12 +21,17 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,10 +90,9 @@ public class ZooTabletStateStore extends TabletStateStore {
           for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
             byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
             if (logInfo != null) {
-              LogEntry logEntry = new LogEntry();
-              logEntry.fromBytes(logInfo);
-              logs.add(logEntry.logSet);
-              log.debug("root tablet logSet " + logEntry.logSet);
+              LogEntry logEntry = LogEntry.fromBytes(logInfo);
+              logs.add(Collections.singleton(logEntry.filename));
+              log.debug("root tablet log " + logEntry.filename);
             }
           }
           TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
@@ -161,12 +165,28 @@ public class ZooTabletStateStore extends TabletStateStore {
   }
 
   @Override
-  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+  public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException {
     if (tablets.size() != 1)
       throw new IllegalArgumentException("There is only one root tablet");
     TabletLocationState tls = tablets.iterator().next();
     if (tls.extent.compareTo(RootTable.EXTENT) != 0)
       throw new IllegalArgumentException("You can only store the root tablet location");
+    if (logsForDeadServers != null) {
+      List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
+      if (logs != null) {
+        for (Path entry : logs) {
+          LogEntry logEntry = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), entry.toString());
+          byte[] value;
+          try {
+            value = logEntry.toBytes();
+          } catch (IOException ex) {
+            throw new DistributedStoreException(ex);
+          }
+          store.put(RootTable.ZROOT_TABLET_WALOGS + "/" + logEntry.getUniqueID(), value);
+          store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" + MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString() + logEntry.getUniqueID());
+        }
+      }
+    }
     store.remove(RootTable.ZROOT_TABLET_LOCATION);
     store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
     log.debug("unassign root tablet location");
@@ -177,4 +197,9 @@ public class ZooTabletStateStore extends TabletStateStore {
     return "Root Table";
   }
 
+  @Override
+  public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) {
+    // the root table is not replicated, so unassigning the root tablet has removed the current log marker
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
index 898e3d4..d72eea2 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java
@@ -153,6 +153,19 @@ public class StatusUtil {
   /**
    * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
    */
+  public static Status openWithUnknownLength(long timeCreated) {
+    Builder builder = Status.newBuilder();
+    builder.setBegin(0);
+    builder.setEnd(0);
+    builder.setInfiniteEnd(true);
+    builder.setClosed(false);
+    builder.setCreatedTime(timeCreated);
+    return builder.build();
+  }
+
+  /**
+   * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
+   */
   public static Status openWithUnknownLength() {
     return INF_END_REPLICATION_STATUS;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index e90d1dd..9e3fc7d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 
 /**
  *
@@ -61,9 +62,6 @@ public class ListVolumesUsed {
 
   private static void getLogURIs(TreeSet<String> volumes, LogEntry logEntry) {
     volumes.add(getLogURI(logEntry.filename));
-    for (String logSet : logEntry.logSet) {
-      volumes.add(getLogURI(logSet));
-    }
   }
 
   private static void listZookeeper() throws Exception {
@@ -123,6 +121,20 @@ public class ListVolumesUsed {
 
     for (String volume : volumes)
       System.out.println("\tVolume : " + volume);
+
+    volumes.clear();
+    scanner.clearColumns();
+    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
+    Text path = new Text();
+    for (Entry<Key,Value> entry : scanner) {
+      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+      volumes.add(getLogURI(path.toString()));
+    }
+
+    System.out.println("Listing volumes referenced in " + name + " current logs section");
+
+    for (String volume : volumes)
+      System.out.println("\tVolume : " + volume);
   }
 
   public static void listVolumes(ClientContext context) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 14eba68..4a5650e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -248,35 +248,27 @@ public class MasterMetadataUtil {
       if (unusedWalLogs != null) {
         updateRootTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
       }
-
       return;
     }
-
     Mutation m = getUpdateForTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId);
-
     MetadataTableUtil.update(context, zooLock, m, extent);
-
   }
 
   /**
    * Update the data file for the root tablet
    */
-  protected static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+  private static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
       Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
     IZooReaderWriter zk = ZooReaderWriter.getInstance();
-    // unusedWalLogs will contain the location/name of each log in a log set
-    // the log set is stored under one of the log names, but not both
-    // find the entry under one of the names and delete it.
     String root = MetadataTableUtil.getZookeeperLogLocation();
-    boolean foundEntry = false;
     for (String entry : unusedWalLogs) {
       String[] parts = entry.split("/");
       String zpath = root + "/" + parts[parts.length - 1];
       while (true) {
         try {
           if (zk.exists(zpath)) {
+            log.debug("Removing WAL reference for root table " + zpath);
             zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
-            foundEntry = true;
           }
           break;
         } catch (KeeperException e) {
@@ -287,16 +279,15 @@ public class MasterMetadataUtil {
         UtilWaitThread.sleep(1000);
       }
     }
-    if (unusedWalLogs.size() > 0 && !foundEntry)
-      log.warn("WALog entry for root tablet did not exist " + unusedWalLogs);
   }
 
+
   /**
    * Create an update that updates a tablet
    *
    * @return A Mutation to update a tablet from the given information
    */
-  protected static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
+  private static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time,
       Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
     Mutation m = new Mutation(extent.getMetadataEntry());
 
@@ -324,6 +315,7 @@ public class MasterMetadataUtil {
 
     TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value(Long.toString(flushId).getBytes(UTF_8)));
 
+
     return m;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5e74aac..4470c55 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,8 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -61,6 +59,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -82,10 +81,12 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.TabletLevel;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -121,7 +122,7 @@ public class MetadataTableUtil {
     return metadataTable;
   }
 
-  private synchronized static Writer getRootTable(ClientContext context) {
+  public synchronized static Writer getRootTable(ClientContext context) {
     Credentials credentials = context.getCredentials();
     Writer rootTable = root_tables.get(credentials);
     if (rootTable == null) {
@@ -223,7 +224,7 @@ public class MetadataTableUtil {
 
       // add before removing in case of process death
       for (LogEntry logEntry : logsToAdd)
-        addLogEntry(context, logEntry, zooLock);
+        addRootLogEntry(context, zooLock, logEntry);
 
       removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
     } else {
@@ -248,6 +249,35 @@ public class MetadataTableUtil {
     }
   }
 
+  private static interface ZooOperation {
+    void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException;
+  }
+
+  private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) {
+    while (true) {
+      try {
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+        if (zoo.isLockHeld(zooLock.getLockID())) {
+          op.run(zoo);
+        }
+        break;
+      } catch (Exception e) {
+        log.error("Unexpected exception {}", e.getMessage(), e);
+      }
+      UtilWaitThread.sleep(1000);
+    }
+  }
+
+  private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) {
+    retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+      @Override
+      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+        String root = getZookeeperLogLocation();
+        rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+      }
+    });
+  }
+
   public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
     TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
 
@@ -447,34 +477,6 @@ public class MetadataTableUtil {
     return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
   }
 
-  public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) {
-    if (entry.extent.isRootTablet()) {
-      String root = getZookeeperLogLocation();
-      while (true) {
-        try {
-          IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-          if (zoo.isLockHeld(zooLock.getLockID())) {
-            String[] parts = entry.filename.split("/");
-            String uniqueId = parts[parts.length - 1];
-            zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
-          }
-          break;
-        } catch (KeeperException e) {
-          log.error("{}", e.getMessage(), e);
-        } catch (InterruptedException e) {
-          log.error("{}", e.getMessage(), e);
-        } catch (IOException e) {
-          log.error("{}", e.getMessage(), e);
-        }
-        UtilWaitThread.sleep(1000);
-      }
-    } else {
-      Mutation m = new Mutation(entry.getRow());
-      m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
-      update(context, zooLock, m, entry.extent);
-    }
-  }
-
   public static void setRootTabletDir(String dir) throws IOException {
     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
@@ -565,22 +567,11 @@ public class MetadataTableUtil {
       }
     }
 
-    Collections.sort(result, new Comparator<LogEntry>() {
-      @Override
-      public int compare(LogEntry o1, LogEntry o2) {
-        long diff = o1.timestamp - o2.timestamp;
-        if (diff < 0)
-          return -1;
-        if (diff > 0)
-          return 1;
-        return 0;
-      }
-    });
     log.info("Returning logs " + result + " for extent " + extent);
     return result;
   }
 
-  static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
+  static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
     IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     String root = getZookeeperLogLocation();
     // there's a little race between getting the children and fetching
@@ -588,11 +579,10 @@ public class MetadataTableUtil {
     while (true) {
       result.clear();
       for (String child : zoo.getChildren(root)) {
-        LogEntry e = new LogEntry();
         try {
-          e.fromBytes(zoo.getData(root + "/" + child, null));
+          LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null));
           // upgrade from !0;!0<< -> +r<<
-          e.extent = RootTable.EXTENT;
+          e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
           result.add(e);
         } catch (KeeperException.NoNodeException ex) {
           continue;
@@ -662,28 +652,23 @@ public class MetadataTableUtil {
     return new LogEntryIterator(context);
   }
 
-  public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
+  public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
     if (extent.isRootTablet()) {
-      for (LogEntry entry : logEntries) {
-        String root = getZookeeperLogLocation();
-        while (true) {
-          try {
-            IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-            if (zoo.isLockHeld(zooLock.getLockID())) {
-              String parts[] = entry.filename.split("/");
-              zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP);
-            }
-            break;
-          } catch (Exception e) {
-            log.error("{}", e.getMessage(), e);
+      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = getZookeeperLogLocation();
+          for (LogEntry entry : entries) {
+            String path = root + "/" + entry.getUniqueID();
+            log.debug("Removing " + path + " from zookeeper");
+            rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
           }
-          UtilWaitThread.sleep(1000);
         }
-      }
+      });
     } else {
       Mutation m = new Mutation(extent.getMetadataEntry());
-      for (LogEntry entry : logEntries) {
-        m.putDelete(LogColumnFamily.NAME, new Text(entry.getName()));
+      for (LogEntry entry : entries) {
+        m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
       }
       update(context, zooLock, m, extent);
     }
@@ -1068,4 +1053,130 @@ public class MetadataTableUtil {
     return tabletEntries;
   }
 
+  public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, TabletLevel level) {
+    log.debug("Adding log entry " + filename);
+    if (level == TabletLevel.ROOT) {
+      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+          String uniqueId = filename.getName();
+          StringBuilder path = new StringBuilder(root);
+          path.append("/");
+          path.append(CurrentLogsSection.getRowPrefix());
+          path.append(tabletSession.toString());
+          path.append(uniqueId);
+          rw.putPersistentData(path.toString(), filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+        }
+      });
+    } else {
+      Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+      m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES));
+      String tableName = MetadataTable.NAME;
+      if (level == TabletLevel.META) {
+        tableName = RootTable.NAME;
+      }
+      BatchWriter bw = null;
+      try {
+        bw = context.getConnector().createBatchWriter(tableName, null);
+        bw.addMutation(m);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (bw != null) {
+          try {
+            bw.close();
+          } catch (Exception e2) {
+            throw new RuntimeException(e2);
+          }
+        }
+      }
+    }
+  }
+
+  private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
+    retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+      @Override
+      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+        String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+        String uniqueId = filename.getName();
+        String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
+        log.debug("Removing entry " + path + " from zookeeper");
+        rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
+      }
+    });
+  }
+
+  public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
+    // There could be a marker at the meta and/or root level, mark them both as unused
+    try {
+      BatchWriter root = null;
+      BatchWriter meta = null;
+      try {
+        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+        for (Path fname : all) {
+          Text tname = new Text(fname.toString());
+          Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+          m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
+          root.addMutation(m);
+          log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
+          m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+          m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
+          meta.addMutation(m);
+          removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+        }
+      } finally {
+        if (root != null) {
+          root.close();
+        }
+        if (meta != null) {
+          meta.close();
+        }
+      }
+    } catch (Exception ex) {
+      throw new AccumuloException(ex);
+    }
+  }
+
+  public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<Path>> logsForDeadServers)
+      throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    // already cached
+    if (logsForDeadServers.containsKey(server)) {
+      return;
+    }
+    if (extent.isRootTablet()) {
+      final List<Path> logs = new ArrayList<>();
+      retryZooKeeperUpdate(context, lock, new ZooOperation() {
+        @Override
+        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+          logs.clear();
+          for (String child : rw.getChildren(root)) {
+            logs.add(new Path(new String(rw.getData(root + "/" + child, null), UTF_8)));
+          }
+        }
+      });
+      logsForDeadServers.put(server, logs);
+    } else {
+      // use the correct meta table
+      String table = MetadataTable.NAME;
+      if (extent.isMeta()) {
+        table = RootTable.NAME;
+      }
+      // fetch the current logs in use, and put them in the cache
+      Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
+      scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
+      List<Path> logs = new ArrayList<>();
+      Text path = new Text();
+      for (Entry<Key,Value> entry : scanner) {
+        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+        if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
+          logs.add(new Path(path.toString()));
+        }
+      }
+      logsForDeadServers.put(server, logs);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 8e755a3..c6d5ce4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.server.util;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -176,20 +175,14 @@ public class ReplicationTableUtil {
   /**
    * Write replication ingest entries for each provided file with the given {@link Status}.
    */
-  public static void updateFiles(ClientContext context, KeyExtent extent, Collection<String> files, Status stat) {
+  public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) {
     if (log.isDebugEnabled()) {
-      log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
+      log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat));
     }
     // TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294
-    if (files.isEmpty()) {
-      return;
-    }
 
     Value v = ProtobufUtil.toValue(stat);
-    for (String file : files) {
-      // TODO Can preclude this addition if the extent is for a table we don't need to replicate
-      update(context, createUpdateMutation(new Path(file), v, extent), extent);
-    }
+    update(context, createUpdateMutation(new Path(file), v, extent), extent);
   }
 
   static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 3983bde..04a83d3 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -91,7 +91,7 @@ public class ReplicationTableUtilTest {
     String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
 
     long createdTime = System.currentTimeMillis();
-    ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime));
+    ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime));
 
     verify(writer);
 


Mime
View raw message