accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [03/10] accumulo git commit: ACCUMULO-3320 Use the active WALs from each tserver to preclude WAL deletion
Date Mon, 10 Nov 2014 21:46:22 GMT
ACCUMULO-3320 Use the active WALs from each tserver to preclude WAL deletion


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f1aee3d4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f1aee3d4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f1aee3d4

Branch: refs/heads/master
Commit: f1aee3d4ea1a428885fb66f550f7391653d78187
Parents: 2b34734
Author: Josh Elser <elserj@apache.org>
Authored: Sat Nov 8 23:14:40 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Nov 10 13:34:51 2014 -0800

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        | 115 +--------------
 .../CloseWriteAheadLogReferences.java           | 141 +++++++++++++++++++
 .../gc/GarbageCollectWriteAheadLogsTest.java    |  79 -----------
 .../CloseWriteAheadLogReferencesTest.java       |  78 ++++++++++
 4 files changed, 221 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 50256b2..4a9dc3e 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -36,13 +36,11 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
@@ -53,14 +51,12 @@ import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -73,7 +69,6 @@ import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -360,25 +355,15 @@ public class GarbageCollectWriteAheadLogs {
       throw new IllegalArgumentException(e);
     }
 
-
-    final AccumuloConfiguration conf = new ServerConfigurationFactory(instance).getConfiguration();
-    final TInfo tinfo = Tracer.traceInfo();
-    final TCredentials tcreds = SystemCredentials.get().toThrift(getInstance());
-    Set<String> activeWals = getActiveWals(conf, tinfo, tcreds);
-
     int count = 0;
 
     Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
+
     while (walIter.hasNext()) {
       Entry<String,Path> wal = walIter.next();
       String fullPath = wal.getValue().toString();
-      if (null == activeWals) {
-        log.debug("Could not contact servers to determine if WAL is still in use");
-        walIter.remove();
-        sortedWALogs.remove(wal.getKey());
-      }
       if (neededByReplication(conn, fullPath)) {
-        log.debug("Removing WAL from candidate deletion as it is still needed for replication: {} ", fullPath);
+        log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
         // If we haven't already removed it, check to see if this WAL is
         // "in use" by replication (needed for replication purposes)
         status.currentLog.inUse++;
@@ -394,102 +379,6 @@ public class GarbageCollectWriteAheadLogs {
     return count;
   }
 
-  private String getMasterAddress() {
-    try {
-      List<String> locations = getInstance().getMasterLocations();
-      if (locations.size() == 0)
-        return null;
-      return locations.get(0);
-    } catch (Exception e) {
-      log.warn("Failed to obtain master host " + e);
-    }
-
-    return null;
-  }
-
-  private MasterClientService.Client getMasterConnection(AccumuloConfiguration conf) {
-    final String address = getMasterAddress();
-    try {
-      if (address == null) {
-        log.warn("Could not fetch Master address");
-        return null;
-      }
-      return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.GENERAL_RPC_TIMEOUT, conf);
-    } catch (Exception e) {
-      log.warn("Issue with masterConnection (" + address + ") " + e, e);
-    }
-    return null;
-  }
-
-  /**
-   * Fetch the set of WALs in use by tabletservers
-   *
-   * @return Set of WALs in use by tservers, null if they cannot be computed for some reason
-   */
-  protected Set<String> getActiveWals(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) {
-    List<String> tservers = getActiveTservers(conf, tinfo, tcreds);
-
-    // Compute the total set of WALs used by tservers
-    Set<String> walogs = null;
-    if (null != tservers) {
-      walogs = new HashSet<String>();
-      for (String tserver : tservers) {
-        HostAndPort address = HostAndPort.fromString(tserver);
-        List<String> activeWalsForServer = getActiveWalsForServer(conf, tinfo, tcreds, address);
-        if (null == activeWalsForServer) {
-          log.debug("Could not fetch active wals from " + address);
-          return null;
-        }
-        log.debug("Got active wals for " + address + ", " + activeWalsForServer);
-        walogs.addAll(activeWalsForServer);
-      }
-    }
-
-    return walogs;
-  }
-
-  /**
-   * Get the active tabletservers as seen by the master.
-   *
-   * @return The active tabletservers, null if they can't be computed.
-   */
-  protected List<String> getActiveTservers(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) {
-    MasterClientService.Client client = null;
-
-    List<String> tservers = null;
-    try {
-      client = getMasterConnection(conf);
-
-      if (null != client) {
-        tservers = client.getActiveTservers(tinfo, tcreds);
-      }
-    } catch (TException e) {
-      // If we can't fetch the tabletservers, we can't fetch any active WALs
-      log.warn("Failed to fetch active tabletservers from the master", e);
-      return null;
-    } finally {
-      ThriftUtil.returnClient(client);
-    }
-
-    return tservers;
-  }
-
-  protected List<String> getActiveWalsForServer(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds, HostAndPort server) {
-    TabletClientService.Client tserverClient = null;
-    try {
-      tserverClient = ThriftUtil.getClient(new TabletClientService.Client.Factory(), server, conf);
-      return tserverClient.getActiveLogs(tinfo, tcreds);
-    } catch (TTransportException e) {
-      log.warn("Failed to fetch active write-ahead logs from " + server, e);
-      return null;
-    } catch (TException e) {
-      log.warn("Failed to fetch active write-ahead logs from " + server, e);
-      return null;
-    } finally {
-      ThriftUtil.returnClient(tserverClient);
-    }
-  }
-
   /**
    * Determine if the given WAL is needed for replication
    *

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index bfa53ae..ae17f16 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.gc.replication;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -29,11 +30,14 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
@@ -43,11 +47,20 @@ import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +68,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.net.HostAndPort;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -109,6 +123,38 @@ public class CloseWriteAheadLogReferences implements Runnable {
     log.info("Found " + referencedWals.size() + " WALs referenced in metadata in " + sw.toString());
     sw.reset();
 
+    /*
+     * ACCUMULO-3320 WALs cannot be closed while a TabletServer may still use it later.
+     *
+     * In addition to the WALs that are actively referenced in the metadata table, tservers can also hold on to a WAL that is not presently referenced by any
+     * tablet. For example, a tablet could MinC which would end in all logs for that tablet being removed. However, if more data was ingested into the table,
+     * the same WAL could be re-used again by that tserver.
+     *
+     * If this code happened to run after the compaction but before the log is again referenced by a tabletserver, we might delete the WAL reference, only to
+     * have it recreated again which causes havoc with the replication status for a table.
+     */
+    final AccumuloConfiguration conf = new ServerConfigurationFactory(instance).getConfiguration();
+    final TInfo tinfo = Tracer.traceInfo();
+    final TCredentials tcreds = SystemCredentials.get().toThrift(instance);
+    Set<String> activeWals;
+    Span findActiveWalsSpan = Trace.start("findActiveWals");
+    try {
+      sw.start();
+      activeWals = getActiveWals(conf, tinfo, tcreds);
+    } finally {
+      sw.stop();
+      findActiveWalsSpan.stop();
+    }
+
+    if (null == activeWals) {
+      log.warn("Could not compute the set of currently active WALs. Not closing any files");
+      return;
+    }
+
+    referencedWals.addAll(activeWals);
+
+    log.info("Found " + activeWals.size() + " WALs actively in use by TabletServers in " + sw.toString());
+
     Span updateReplicationSpan = Trace.start("updateReplicationTable");
     long recordsClosed = 0;
     try {
@@ -252,4 +298,99 @@ public class CloseWriteAheadLogReferences implements Runnable {
     bw.addMutation(m);
   }
 
+  private String getMasterAddress() {
+    try {
+      List<String> locations = instance.getMasterLocations();
+      if (locations.size() == 0)
+        return null;
+      return locations.get(0);
+    } catch (Exception e) {
+      log.warn("Failed to obtain master host " + e);
+    }
+
+    return null;
+  }
+
+  private MasterClientService.Client getMasterConnection(AccumuloConfiguration conf) {
+    final String address = getMasterAddress();
+    try {
+      if (address == null) {
+        log.warn("Could not fetch Master address");
+        return null;
+      }
+      return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.GENERAL_RPC_TIMEOUT, conf);
+    } catch (Exception e) {
+      log.warn("Issue with masterConnection (" + address + ") " + e, e);
+    }
+    return null;
+  }
+
+  /**
+   * Fetch the set of WALs in use by tabletservers
+   *
+   * @return Set of WALs in use by tservers, null if they cannot be computed for some reason
+   */
+  protected Set<String> getActiveWals(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) {
+    List<String> tservers = getActiveTservers(conf, tinfo, tcreds);
+
+    // Compute the total set of WALs used by tservers
+    Set<String> walogs = null;
+    if (null != tservers) {
+      walogs = new HashSet<String>();
+      for (String tserver : tservers) {
+        HostAndPort address = HostAndPort.fromString(tserver);
+        List<String> activeWalsForServer = getActiveWalsForServer(conf, tinfo, tcreds, address);
+        if (null == activeWalsForServer) {
+          log.debug("Could not fetch active wals from " + address);
+          return null;
+        }
+        log.debug("Got active wals for " + address + ", " + activeWalsForServer);
+        walogs.addAll(activeWalsForServer);
+      }
+    }
+
+    return walogs;
+  }
+
+  /**
+   * Get the active tabletservers as seen by the master.
+   *
+   * @return The active tabletservers, null if they can't be computed.
+   */
+  protected List<String> getActiveTservers(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) {
+    MasterClientService.Client client = null;
+
+    List<String> tservers = null;
+    try {
+      client = getMasterConnection(conf);
+
+      if (null != client) {
+        tservers = client.getActiveTservers(tinfo, tcreds);
+      }
+    } catch (TException e) {
+      // If we can't fetch the tabletservers, we can't fetch any active WALs
+      log.warn("Failed to fetch active tabletservers from the master", e);
+      return null;
+    } finally {
+      ThriftUtil.returnClient(client);
+    }
+
+    return tservers;
+  }
+
+  protected List<String> getActiveWalsForServer(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds, HostAndPort server) {
+    TabletClientService.Client tserverClient = null;
+    try {
+      tserverClient = ThriftUtil.getClient(new TabletClientService.Client.Factory(), server, conf);
+      return tserverClient.getActiveLogs(tinfo, tcreds);
+    } catch (TTransportException e) {
+      log.warn("Failed to fetch active write-ahead logs from " + server, e);
+      return null;
+    } catch (TException e) {
+      log.warn("Failed to fetch active write-ahead logs from " + server, e);
+      return null;
+    } finally {
+      ThriftUtil.returnClient(tserverClient);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 780907f..71e5f7d 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -27,15 +27,12 @@ import static org.junit.Assert.assertTrue;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -44,7 +41,6 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -58,13 +54,10 @@ import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -73,7 +66,6 @@ import org.junit.rules.TestName;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import com.google.common.net.HostAndPort;
 
 public class GarbageCollectWriteAheadLogsTest {
   private static final long BLOCK_SIZE = 64000000L;
@@ -525,75 +517,4 @@ public class GarbageCollectWriteAheadLogsTest {
       }
     }
   }
-
-  @Test
-  public void getActiveWals() throws Exception {
-    GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers")
-        .addMockedMethod("getActiveWalsForServer").createMock();
-    AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
-    TInfo tinfo = EasyMock.createMock(TInfo.class);
-    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
-
-    List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346");
-    EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers);
-    int numWals = 0;
-    for (String tserver : tservers) {
-      EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals));
-      numWals++;
-    }
-
-    EasyMock.replay(gcWals);
-
-    Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds);
-
-    EasyMock.verify(gcWals);
-
-    Set<String> expectedWals = new HashSet<String>();
-    for (int i = 0; i < numWals; i++) {
-      expectedWals.add("/wal" + i);
-    }
-
-    Assert.assertEquals(expectedWals, wals);
-  }
-
-  @Test
-  public void offlineMaster() throws Exception {
-    GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers")
-        .addMockedMethod("getActiveWalsForServer").createMock();
-    AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
-    TInfo tinfo = EasyMock.createMock(TInfo.class);
-    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
-
-    EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(null);
-
-    EasyMock.replay(gcWals);
-
-    Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds);
-
-    EasyMock.verify(gcWals);
-
-    Assert.assertNull("Expected to get null for active WALs", wals);
-  }
-
-  @Test
-  public void offlineTserver() throws Exception {
-    GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers")
-        .addMockedMethod("getActiveWalsForServer").createMock();
-    AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
-    TInfo tinfo = EasyMock.createMock(TInfo.class);
-    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
-
-    List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346");
-    EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers);
-    EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 0));
-    EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12346"))).andReturn(null);
-
-    EasyMock.replay(gcWals);
-
-    Set<String> wals = gcWals.getActiveWals(conf, tinfo, tcreds);
-
-    EasyMock.verify(gcWals);
-
-    Assert.assertNull("Expected to get null for active WALs", wals);
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 86ac87e..2c0164d 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -23,8 +23,11 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
@@ -37,6 +40,7 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
@@ -52,8 +56,11 @@ import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Assert;
 import org.junit.Before;
@@ -64,6 +71,7 @@ import org.junit.rules.TestName;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.net.HostAndPort;
 
 public class CloseWriteAheadLogReferencesTest {
 
@@ -364,4 +372,74 @@ public class CloseWriteAheadLogReferencesTest {
     Assert.assertFalse(status.getClosed());
   }
 
+  @Test
+  public void getActiveWals() throws Exception {
+    CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers")
+        .addMockedMethod("getActiveWalsForServer").createMock();
+    AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
+    TInfo tinfo = EasyMock.createMock(TInfo.class);
+    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
+    
+    List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346");
+    EasyMock.expect(closeWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers);
+    int numWals = 0;
+    for (String tserver : tservers) {
+      EasyMock.expect(closeWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals));
+      numWals++;
+    }
+    
+    EasyMock.replay(closeWals);
+    
+    Set<String> wals = closeWals.getActiveWals(conf, tinfo, tcreds);
+    
+    EasyMock.verify(closeWals);
+    
+    Set<String> expectedWals = new HashSet<String>();
+    for (int i = 0; i < numWals; i++) {
+      expectedWals.add("/wal" + i);
+    }
+    
+    Assert.assertEquals(expectedWals, wals);
+  }
+  
+  @Test
+  public void offlineMaster() throws Exception {
+    CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers")
+        .addMockedMethod("getActiveWalsForServer").createMock();
+    AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
+    TInfo tinfo = EasyMock.createMock(TInfo.class);
+    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
+    
+    EasyMock.expect(closeWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(null);
+    
+    EasyMock.replay(closeWals);
+    
+    Set<String> wals = closeWals.getActiveWals(conf, tinfo, tcreds);
+    
+    EasyMock.verify(closeWals);
+    
+    Assert.assertNull("Expected to get null for active WALs", wals);
+  }
+  
+  @Test
+  public void offlineTserver() throws Exception {
+    CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers")
+        .addMockedMethod("getActiveWalsForServer").createMock();
+    AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class);
+    TInfo tinfo = EasyMock.createMock(TInfo.class);
+    TCredentials tcreds = EasyMock.createMock(TCredentials.class);
+    
+    List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346");
+    EasyMock.expect(closeWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers);
+    EasyMock.expect(closeWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 0));
+    EasyMock.expect(closeWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12346"))).andReturn(null);
+    
+    EasyMock.replay(closeWals);
+    
+    Set<String> wals = closeWals.getActiveWals(conf, tinfo, tcreds);
+    
+    EasyMock.verify(closeWals);
+    
+    Assert.assertNull("Expected to get null for active WALs", wals);
+  }
 }


Mime
View raw message