accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [4/4] accumulo git commit: ACCUMULO-3423 fix merge, rename WalMarker to WalStateManager
Date Thu, 21 May 2015 16:47:10 GMT
ACCUMULO-3423 fix merge, rename WalMarker to WalStateManager


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47d1a4db
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47d1a4db
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47d1a4db

Branch: refs/heads/master
Commit: 47d1a4db42788e41274f10467c7ccd11a10a3015
Parents: 08633f0
Author: Eric Newton <eric.newton@gmail.com>
Authored: Thu May 21 12:43:21 2015 -0400
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Thu May 21 12:43:21 2015 -0400

----------------------------------------------------------------------
 .../apache/accumulo/server/init/Initialize.java |   4 +-
 .../apache/accumulo/server/log/WalMarker.java   | 217 -------------------
 .../accumulo/server/log/WalStateManager.java    | 217 +++++++++++++++++++
 .../accumulo/server/util/ListVolumesUsed.java   |   4 +-
 .../gc/GarbageCollectWriteAheadLogs.java        |  12 +-
 .../CloseWriteAheadLogReferences.java           |   8 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |  14 +-
 .../accumulo/master/TabletGroupWatcher.java     |   4 +-
 .../apache/accumulo/tserver/TabletServer.java   |   8 +-
 .../org/apache/accumulo/test/UnusedWALIT.java   |   4 +-
 .../java/org/apache/accumulo/test/VolumeIT.java |   6 +-
 .../accumulo/test/functional/WALSunnyDayIT.java |   6 +-
 ...bageCollectorCommunicatesWithTServersIT.java |   6 +-
 .../test/replication/ReplicationIT.java         |  10 +-
 14 files changed, 260 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 8564b87..80c5879 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -88,7 +88,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -550,7 +550,7 @@ public class Initialize implements KeywordExecutable {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_BASE, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_TSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + WalMarker.ZWALS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + WalStateManager.ZWALS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
   }
 
   private String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
deleted file mode 100644
index 9cfd99f..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.log;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.Path;
-import org.apache.zookeeper.KeeperException;
-
-/*
- * WAL Markers.  This class governs the space in Zookeeper that advertises the status of Write-Ahead Logs
- * in use by tablet servers and the replication machinery.
- *
- * The Master needs to know the state of the WALs to mark tablets during recovery.
- * The GC needs to know when a log is no longer needed so it can be removed.
- * The replication mechanism needs to know when a log is closed and can be forwarded to the destination table.
- *
- * The state of the WALs is kept in Zookeeper under <root>/wals.
- * For each server, there is a znode formatted like the TServerInstance.toString(): "host:port[sessionid]".
- * Under the server znode, is a node for each log, using the UUID for the log.
- * In each of the WAL znodes, is the current state of the log, and the full path to the log.
- *
- * The state [OPEN, CLOSED, UNREFERENCED] is what the tablet server believes to be the state of the file.
- *
- * In the event of a recovery, the log is identified as belonging to a dead server.  The master will update
- * the tablets assigned to that server with log references. Once all tablets have been reassigned and the log
- * references are removed, the log will be eligible for deletion.
- *
- * Even when a log is UNREFERENCED by the tablet server, the replication mechanism may still need the log.
- * The GC will defer log removal until replication is finished with it.
- *
- */
-public class WalMarker {
-
-  public class WalMarkerException extends Exception {
-    static private final long serialVersionUID = 1L;
-
-    public WalMarkerException(Exception ex) {
-      super(ex);
-    }
-  }
-
-  public final static String ZWALS = "/wals";
-
-  public static enum WalState {
-    /* log is open, and may be written to */
-    OPEN,
-    /* log is closed, and will not be written to again */
-    CLOSED,
-    /* unreferenced: no tablet needs the log for recovery */
-    UNREFERENCED
-  }
-
-  private final Instance instance;
-  private final ZooReaderWriter zoo;
-
-  public WalMarker(Instance instance, ZooReaderWriter zoo) {
-    this.instance = instance;
-    this.zoo = zoo;
-  }
-
-  private String root() {
-    return ZooUtil.getRoot(instance) + ZWALS;
-  }
-
-  // Tablet server exists
-  public void initWalMarker(TServerInstance tsi) throws WalMarkerException {
-    byte[] data = new byte[0];
-    try {
-      zoo.putPersistentData(root() + "/" + tsi.toString(), data, NodeExistsPolicy.FAIL);
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // Tablet server opens a new WAL
-  public void addNewWalMarker(TServerInstance tsi, Path path) throws WalMarkerException {
-    updateState(tsi, path, WalState.OPEN);
-  }
-
-  private void updateState(TServerInstance tsi, Path path, WalState state) throws WalMarkerException {
-    byte[] data = (state.toString() + "," + path.toString()).getBytes(UTF_8);
-    try {
-      NodeExistsPolicy policy = NodeExistsPolicy.OVERWRITE;
-      if (state == WalState.OPEN) {
-        policy = NodeExistsPolicy.FAIL;
-      }
-      zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + path.getName(), data, policy);
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // Tablet server has no references to the WAL
-  public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerException {
-    updateState(tsi, path, WalState.UNREFERENCED);
-  }
-
-  private static Pair<WalState,Path> parse(byte data[]) {
-    String parts[] = new String(data, UTF_8).split(",");
-    return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1]));
-  }
-
-  // Master needs to know the logs for the given instance
-  public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
-    List<Path> result = new ArrayList<>();
-    try {
-      String zpath = root() + "/" + tsi.toString();
-      zoo.sync(zpath);
-      for (String child : zoo.getChildren(zpath)) {
-        Pair<WalState,Path> parts = parse(zoo.getData(zpath + "/" + child, null));
-        if (parts.getFirst() != WalState.UNREFERENCED) {
-          result.add(parts.getSecond());
-        }
-      }
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-    return result;
-  }
-
-  // garbage collector wants the list of logs markers for all servers
-  public Map<TServerInstance,List<UUID>> getAllMarkers() throws WalMarkerException {
-    Map<TServerInstance,List<UUID>> result = new HashMap<>();
-    try {
-      String path = root();
-      for (String child : zoo.getChildren(path)) {
-        List<UUID> logs = result.get(child);
-        if (logs == null) {
-          result.put(new TServerInstance(child), logs = new ArrayList<>());
-        }
-        for (String idString : zoo.getChildren(path + "/" + child)) {
-          logs.add(UUID.fromString(idString));
-        }
-      }
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-    return result;
-  }
-
-  // garbage collector wants to know the state (open/closed) of a log, and the filename to delete
-  public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws WalMarkerException {
-    try {
-      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
-      return parse(zoo.getData(path, null));
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // utility combination of getAllMarkers and state
-  public Map<Path,WalState> getAllState() throws WalMarkerException {
-    Map<Path,WalState> result = new HashMap<>();
-    for (Entry<TServerInstance,List<UUID>> entry : getAllMarkers().entrySet()) {
-      for (UUID id : entry.getValue()) {
-        Pair<WalState,Path> state = state(entry.getKey(), id);
-        result.put(state.getSecond(), state.getFirst());
-      }
-    }
-    return result;
-  }
-
-  // garbage collector knows it's safe to remove the marker for a closed log
-  public void removeWalMarker(TServerInstance instance, UUID uuid) throws WalMarkerException {
-    try {
-      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
-      zoo.delete(path, -1);
-    } catch (InterruptedException | KeeperException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // garbage collector knows the instance is dead, and has no markers
-  public void forget(TServerInstance instance) throws WalMarkerException {
-    String path = root() + "/" + instance.toString();
-    try {
-      zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
-    } catch (InterruptedException | KeeperException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // tablet server can mark the log as closed (but still needed), for replication to begin
-  public void closeWal(TServerInstance instance, Path path) throws WalMarkerException {
-    updateState(instance, path, WalState.CLOSED);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
new file mode 100644
index 0000000..32f3cbe
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.log;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * This class governs the space in Zookeeper that advertises the status of Write-Ahead Logs
+ * in use by tablet servers and the replication machinery.
+ *
+ * The Master needs to know the state of the WALs to mark tablets during recovery.
+ * The GC needs to know when a log is no longer needed so it can be removed.
+ * The replication mechanism needs to know when a log is closed and can be forwarded to the destination table.
+ *
+ * The state of the WALs is kept in Zookeeper under /accumulo/<instanceid>/wals.
+ * For each server, there is a znode formatted like the TServerInstance.toString(): "host:port[sessionid]".
+ * Under the server znode, is a node for each log, using the UUID for the log.
+ * In each of the WAL znodes, is the current state of the log, and the full path to the log.
+ *
+ * The state [OPEN, CLOSED, UNREFERENCED] is what the tablet server believes to be the state of the file.
+ *
+ * In the event of a recovery, the log is identified as belonging to a dead server.  The master will update
+ * the tablets assigned to that server with log references. Once all tablets have been reassigned and the log
+ * references are removed, the log will be eligible for deletion.
+ *
+ * Even when a log is UNREFERENCED by the tablet server, the replication mechanism may still need the log.
+ * The GC will defer log removal until replication is finished with it.
+ *
+ */
+public class WalStateManager {
+
+  public class WalMarkerException extends Exception {
+    static private final long serialVersionUID = 1L;
+
+    public WalMarkerException(Exception ex) {
+      super(ex);
+    }
+  }
+
+  public final static String ZWALS = "/wals";
+
+  public static enum WalState {
+    /* log is open, and may be written to */
+    OPEN,
+    /* log is closed, and will not be written to again */
+    CLOSED,
+    /* unreferenced: no tablet needs the log for recovery */
+    UNREFERENCED
+  }
+
+  private final Instance instance;
+  private final ZooReaderWriter zoo;
+
+  public WalStateManager(Instance instance, ZooReaderWriter zoo) {
+    this.instance = instance;
+    this.zoo = zoo;
+  }
+
+  private String root() {
+    return ZooUtil.getRoot(instance) + ZWALS;
+  }
+
+  // Tablet server exists
+  public void initWalMarker(TServerInstance tsi) throws WalMarkerException {
+    byte[] data = new byte[0];
+    try {
+      zoo.putPersistentData(root() + "/" + tsi.toString(), data, NodeExistsPolicy.FAIL);
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // Tablet server opens a new WAL
+  public void addNewWalMarker(TServerInstance tsi, Path path) throws WalMarkerException {
+    updateState(tsi, path, WalState.OPEN);
+  }
+
+  private void updateState(TServerInstance tsi, Path path, WalState state) throws WalMarkerException {
+    byte[] data = (state.toString() + "," + path.toString()).getBytes(UTF_8);
+    try {
+      NodeExistsPolicy policy = NodeExistsPolicy.OVERWRITE;
+      if (state == WalState.OPEN) {
+        policy = NodeExistsPolicy.FAIL;
+      }
+      zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + path.getName(), data, policy);
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // Tablet server has no references to the WAL
+  public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerException {
+    updateState(tsi, path, WalState.UNREFERENCED);
+  }
+
+  private static Pair<WalState,Path> parse(byte data[]) {
+    String parts[] = new String(data, UTF_8).split(",");
+    return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1]));
+  }
+
+  // Master needs to know the logs for the given instance
+  public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
+    List<Path> result = new ArrayList<>();
+    try {
+      String zpath = root() + "/" + tsi.toString();
+      zoo.sync(zpath);
+      for (String child : zoo.getChildren(zpath)) {
+        Pair<WalState,Path> parts = parse(zoo.getData(zpath + "/" + child, null));
+        if (parts.getFirst() != WalState.UNREFERENCED) {
+          result.add(parts.getSecond());
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+    return result;
+  }
+
+  // garbage collector wants the list of logs markers for all servers
+  public Map<TServerInstance,List<UUID>> getAllMarkers() throws WalMarkerException {
+    Map<TServerInstance,List<UUID>> result = new HashMap<>();
+    try {
+      String path = root();
+      for (String child : zoo.getChildren(path)) {
+        List<UUID> logs = result.get(child);
+        if (logs == null) {
+          result.put(new TServerInstance(child), logs = new ArrayList<>());
+        }
+        for (String idString : zoo.getChildren(path + "/" + child)) {
+          logs.add(UUID.fromString(idString));
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+    return result;
+  }
+
+  // garbage collector wants to know the state (open/closed) of a log, and the filename to delete
+  public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws WalMarkerException {
+    try {
+      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
+      return parse(zoo.getData(path, null));
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // utility combination of getAllMarkers and state
+  public Map<Path,WalState> getAllState() throws WalMarkerException {
+    Map<Path,WalState> result = new HashMap<>();
+    for (Entry<TServerInstance,List<UUID>> entry : getAllMarkers().entrySet()) {
+      for (UUID id : entry.getValue()) {
+        Pair<WalState,Path> state = state(entry.getKey(), id);
+        result.put(state.getSecond(), state.getFirst());
+      }
+    }
+    return result;
+  }
+
+  // garbage collector knows it's safe to remove the marker for a closed log
+  public void removeWalMarker(TServerInstance instance, UUID uuid) throws WalMarkerException {
+    try {
+      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
+      zoo.delete(path, -1);
+    } catch (InterruptedException | KeeperException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // garbage collector knows the instance is dead, and has no markers
+  public void forget(TServerInstance instance) throws WalMarkerException {
+    String path = root() + "/" + instance.toString();
+    try {
+      zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
+    } catch (InterruptedException | KeeperException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // tablet server can mark the log as closed (but still needed), for replication to begin
+  public void closeWal(TServerInstance instance, Path path) throws WalMarkerException {
+    updateState(instance, path, WalState.CLOSED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 7cf3f37..9ad461b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -34,7 +34,7 @@ import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.Path;
 
@@ -125,7 +125,7 @@ public class ListVolumesUsed {
 
     volumes.clear();
 
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
     for (Path path : wals.getAllState().keySet()) {
       volumes.add(getLogURI(path.toString()));
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 414d29e..b8fb9fb 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -45,9 +45,9 @@ import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.Listener;
 import org.apache.accumulo.server.master.state.MetaDataStateStore;
@@ -72,7 +72,7 @@ public class GarbageCollectWriteAheadLogs {
   private final VolumeManager fs;
   private final boolean useTrash;
   private final LiveTServerSet liveServers;
-  private final WalMarker walMarker;
+  private final WalStateManager walMarker;
   private final Iterable<TabletLocationState> store;
 
   /**
@@ -97,7 +97,7 @@ public class GarbageCollectWriteAheadLogs {
       }
     });
     liveServers.startListeningForTabletServerChanges();
-    this.walMarker = new WalMarker(context.getInstance(), ZooReaderWriter.getInstance());
+    this.walMarker = new WalStateManager(context.getInstance(), ZooReaderWriter.getInstance());
     this.store = new Iterable<TabletLocationState>() {
       @Override
       public Iterator<TabletLocationState> iterator() {
@@ -119,7 +119,7 @@ public class GarbageCollectWriteAheadLogs {
    *          a started LiveTServerSet instance
    */
   @VisibleForTesting
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet, WalMarker walMarker,
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet, WalStateManager walMarker,
       Iterable<TabletLocationState> store) throws IOException {
     this.context = context;
     this.fs = fs;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 03d2e67..a3652e2 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -47,9 +47,9 @@ import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -168,7 +168,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
    * @return The Set of WALs that are referenced in the metadata table
    */
   protected HashSet<String> getReferencedWals(Connector conn) {
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
 
     HashSet<String> referencedWals = new HashSet<>();
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 9cb32c8..1ab8eb6 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -38,8 +38,8 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -78,7 +78,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void testRemoveUnusedLog() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
@@ -105,7 +105,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void testKeepClosedLog() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
@@ -128,7 +128,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void deleteUnreferenceLogOnDeadServer() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
     Scanner scanner = EasyMock.createMock(Scanner.class);
@@ -159,7 +159,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void ignoreReferenceLogOnDeadServer() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
     Scanner scanner = EasyMock.createMock(Scanner.class);
@@ -185,7 +185,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void replicationDelaysFileCollection() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
     Scanner scanner = EasyMock.createMock(Scanner.class);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 3ff1aa9..d55781e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -68,7 +68,7 @@ import org.apache.accumulo.master.state.TableStats;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.ClosableIterator;
@@ -129,7 +129,7 @@ class TabletGroupWatcher extends Daemon {
     int[] oldCounts = new int[TabletState.values().length];
     EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
 
-    WalMarker wals = new WalMarker(master.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(master.getInstance(), ZooReaderWriter.getInstance());
 
     while (this.master.stillMaster()) {
      // slow things down a little, otherwise we spam the logs when there are many wake-up events

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 23a4b34..c0a29eb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -161,8 +161,8 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.log.SortedLogState;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
@@ -321,7 +321,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   private final ServerConfigurationFactory confFactory;
 
   private final ZooAuthenticationKeyWatcher authKeyWatcher;
-  private final WalMarker walMarker;
+  private final WalStateManager walMarker;
 
   public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
     super(confFactory);
@@ -367,7 +367,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         TabletLocator.clearLocators();
       }
     }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
-    walMarker = new WalMarker(instance, ZooReaderWriter.getInstance());
+    walMarker = new WalStateManager(instance, ZooReaderWriter.getInstance());
 
     // Create the secret manager
     setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
index b1c010c..8d22ad3 100644
--- a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -35,7 +35,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
@@ -122,7 +122,7 @@ public class UnusedWALIT extends ConfigurableMacIT {
   }
 
   private int getWALCount(Connector c) throws Exception {
-    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(c.getInstance(), ZooReaderWriter.getInstance());
     int result = 0;
     for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
       result += entry.getValue().size();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 8f4fe75..b66d13f 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -65,8 +65,8 @@ import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.init.Initialize;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.util.Admin;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
@@ -428,7 +428,7 @@ public class VolumeIT extends ConfigurableMacIT {
       Assert.fail("Unexpected volume " + path);
     }
 
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
     outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
       for (Path path : paths) {
         if (entry.getKey().toString().startsWith(path.toString())) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index 22e2930..8d7dd62 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -52,8 +52,8 @@ import org.apache.accumulo.master.state.SetGoalState;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -200,7 +200,7 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
 
   private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
     Map<String,Boolean> result = new HashMap<>();
-    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(c.getInstance(), ZooReaderWriter.getInstance());
     for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
       // WALs are in use if they are not unreferenced
       result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 47873f6..ddaef00 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -46,8 +46,8 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
@@ -105,7 +105,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
 
     Set<String> result = new HashSet<String>();
     for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 41cb75a..da9dd24 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -76,8 +76,8 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 import org.apache.accumulo.server.replication.StatusCombiner;
@@ -149,7 +149,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
     // Map of logs to tableId
     Multimap<String,String> logs = HashMultimap.create();
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
     for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
       for (UUID id : entry.getValue()) {
         Pair<WalState,Path> state = wals.state(entry.getKey(), id);
@@ -161,7 +161,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     return logs;
   }
 
-  private Multimap<String,String> getAllLogs(Connector conn) throws TableNotFoundException {
+  private Multimap<String,String> getAllLogs(Connector conn) throws Exception {
     Multimap<String,String> logs = getLogs(conn);
     try {
       Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
@@ -333,7 +333,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     Set<String> wals = Sets.newHashSet();
     attempts = 5;
     while (wals.isEmpty() && attempts > 0) {
-      WalMarker markers = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+      WalStateManager markers = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
       for (Entry<Path,WalState> entry : markers.getAllState().entrySet()) {
         wals.add(entry.getKey().toString());
       }


Mime
View raw message