Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 11923104B1 for ; Mon, 10 Nov 2014 21:46:21 +0000 (UTC) Received: (qmail 5846 invoked by uid 500); 10 Nov 2014 21:46:21 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 5807 invoked by uid 500); 10 Nov 2014 21:46:21 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 5729 invoked by uid 99); 10 Nov 2014 21:46:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Nov 2014 21:46:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A69739A49DC; Mon, 10 Nov 2014 21:46:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Mon, 10 Nov 2014 21:46:22 -0000 Message-Id: In-Reply-To: <314964d568ad4940b856ac9e07c35150@git.apache.org> References: <314964d568ad4940b856ac9e07c35150@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/10] accumulo git commit: ACCUMULO-3320 Use the active WALs from each tserver to preclude WAL deletion ACCUMULO-3320 Use the active WALs from each tserver to preclude WAL deletion Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f1aee3d4 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f1aee3d4 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f1aee3d4 Branch: refs/heads/master Commit: f1aee3d4ea1a428885fb66f550f7391653d78187 Parents: 2b34734 Author: Josh Elser Authored: Sat Nov 8 23:14:40 2014 -0500 Committer: Josh Elser Committed: Mon Nov 10 13:34:51 2014 -0800 ---------------------------------------------------------------------- .../gc/GarbageCollectWriteAheadLogs.java | 115 +-------------- .../CloseWriteAheadLogReferences.java | 141 +++++++++++++++++++ .../gc/GarbageCollectWriteAheadLogsTest.java | 79 ----------- .../CloseWriteAheadLogReferencesTest.java | 78 ++++++++++ 4 files changed, 221 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 50256b2..4a9dc3e 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -36,13 +36,11 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; @@ -53,14 +51,12 @@ import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; @@ -73,7 +69,6 @@ import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -360,25 +355,15 @@ public class GarbageCollectWriteAheadLogs { throw new IllegalArgumentException(e); } - - final AccumuloConfiguration conf = new ServerConfigurationFactory(instance).getConfiguration(); - final TInfo tinfo = Tracer.traceInfo(); - final TCredentials tcreds = SystemCredentials.get().toThrift(getInstance()); - Set activeWals = getActiveWals(conf, tinfo, tcreds); - int count = 0; Iterator> walIter = nameToFileMap.entrySet().iterator(); + while (walIter.hasNext()) { Entry wal = walIter.next(); String fullPath = wal.getValue().toString(); - if (null == activeWals) { - log.debug("Could not contact servers to determine if WAL is still in use"); - walIter.remove(); - sortedWALogs.remove(wal.getKey()); - } if (neededByReplication(conn, fullPath)) { - log.debug("Removing WAL from candidate deletion as it is still needed for replication: {} ", fullPath); + log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath); // If we haven't already removed it, check to see if this WAL is // "in use" by replication (needed for replication purposes) status.currentLog.inUse++; @@ -394,102 +379,6 @@ public class GarbageCollectWriteAheadLogs { return count; } - private String getMasterAddress() { - try { - List locations = getInstance().getMasterLocations(); - if (locations.size() == 0) - return null; - return locations.get(0); - } catch (Exception e) { - log.warn("Failed to obtain master host " + e); - } - - return null; - } - - private MasterClientService.Client getMasterConnection(AccumuloConfiguration conf) { - final String address = getMasterAddress(); - try { - if (address == null) { - log.warn("Could not fetch Master address"); - return null; - } - return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.GENERAL_RPC_TIMEOUT, conf); - } catch (Exception e) { - log.warn("Issue with masterConnection (" + address + ") " + e, e); - } - return null; - } - - /** - * Fetch the set of WALs in use by tabletservers - * - * @return Set of WALs in use by tservers, null if they cannot be computed for some reason - */ - protected Set getActiveWals(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) { - List tservers = getActiveTservers(conf, tinfo, tcreds); - - // Compute the total set of WALs used by tservers - Set walogs = null; - if (null != tservers) { - walogs = new HashSet(); - for (String tserver : tservers) { - HostAndPort address = HostAndPort.fromString(tserver); - List activeWalsForServer = getActiveWalsForServer(conf, tinfo, tcreds, address); - if (null == activeWalsForServer) { - log.debug("Could not fetch active wals from " + address); - return null; - } - log.debug("Got active wals for " + address + ", " + activeWalsForServer); - walogs.addAll(activeWalsForServer); - } - } - - return walogs; - } - - /** - * Get the active tabletservers as seen by the master. - * - * @return The active tabletservers, null if they can't be computed. - */ - protected List getActiveTservers(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) { - MasterClientService.Client client = null; - - List tservers = null; - try { - client = getMasterConnection(conf); - - if (null != client) { - tservers = client.getActiveTservers(tinfo, tcreds); - } - } catch (TException e) { - // If we can't fetch the tabletservers, we can't fetch any active WALs - log.warn("Failed to fetch active tabletservers from the master", e); - return null; - } finally { - ThriftUtil.returnClient(client); - } - - return tservers; - } - - protected List getActiveWalsForServer(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds, HostAndPort server) { - TabletClientService.Client tserverClient = null; - try { - tserverClient = ThriftUtil.getClient(new TabletClientService.Client.Factory(), server, conf); - return tserverClient.getActiveLogs(tinfo, tcreds); - } catch (TTransportException e) { - log.warn("Failed to fetch active write-ahead logs from " + server, e); - return null; - } catch (TException e) { - log.warn("Failed to fetch active write-ahead logs from " + server, e); - return null; - } finally { - ThriftUtil.returnClient(tserverClient); - } - } - /** * Determine if the given WAL is needed for replication * http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index bfa53ae..ae17f16 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -18,6 +18,7 @@ package org.apache.accumulo.gc.replication; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -29,11 +30,14 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; @@ -43,11 +47,20 @@ import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.security.SystemCredentials; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +68,7 @@ import com.google.common.base.Stopwatch; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.net.HostAndPort; import com.google.protobuf.InvalidProtocolBufferException; /** @@ -109,6 +123,38 @@ public class CloseWriteAheadLogReferences implements Runnable { log.info("Found " + referencedWals.size() + " WALs referenced in metadata in " + sw.toString()); sw.reset(); + /* + * ACCUMULO-3320 WALs cannot be closed while a TabletServer may still use it later. + * + * In addition to the WALs that are actively referenced in the metadata table, tservers can also hold on to a WAL that is not presently referenced by any + * tablet. For example, a tablet could MinC which would end in all logs for that tablet being removed. However, if more data was ingested into the table, + * the same WAL could be re-used again by that tserver. + * + * If this code happened to run after the compaction but before the log is again referenced by a tabletserver, we might delete the WAL reference, only to + * have it recreated again which causes havoc with the replication status for a table. + */ + final AccumuloConfiguration conf = new ServerConfigurationFactory(instance).getConfiguration(); + final TInfo tinfo = Tracer.traceInfo(); + final TCredentials tcreds = SystemCredentials.get().toThrift(instance); + Set activeWals; + Span findActiveWalsSpan = Trace.start("findActiveWals"); + try { + sw.start(); + activeWals = getActiveWals(conf, tinfo, tcreds); + } finally { + sw.stop(); + findActiveWalsSpan.stop(); + } + + if (null == activeWals) { + log.warn("Could not compute the set of currently active WALs. Not closing any files"); + return; + } + + referencedWals.addAll(activeWals); + + log.info("Found " + activeWals.size() + " WALs actively in use by TabletServers in " + sw.toString()); + Span updateReplicationSpan = Trace.start("updateReplicationTable"); long recordsClosed = 0; try { @@ -252,4 +298,99 @@ public class CloseWriteAheadLogReferences implements Runnable { bw.addMutation(m); } + private String getMasterAddress() { + try { + List locations = instance.getMasterLocations(); + if (locations.size() == 0) + return null; + return locations.get(0); + } catch (Exception e) { + log.warn("Failed to obtain master host " + e); + } + + return null; + } + + private MasterClientService.Client getMasterConnection(AccumuloConfiguration conf) { + final String address = getMasterAddress(); + try { + if (address == null) { + log.warn("Could not fetch Master address"); + return null; + } + return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, Property.GENERAL_RPC_TIMEOUT, conf); + } catch (Exception e) { + log.warn("Issue with masterConnection (" + address + ") " + e, e); + } + return null; + } + + /** + * Fetch the set of WALs in use by tabletservers + * + * @return Set of WALs in use by tservers, null if they cannot be computed for some reason + */ + protected Set getActiveWals(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) { + List tservers = getActiveTservers(conf, tinfo, tcreds); + + // Compute the total set of WALs used by tservers + Set walogs = null; + if (null != tservers) { + walogs = new HashSet(); + for (String tserver : tservers) { + HostAndPort address = HostAndPort.fromString(tserver); + List activeWalsForServer = getActiveWalsForServer(conf, tinfo, tcreds, address); + if (null == activeWalsForServer) { + log.debug("Could not fetch active wals from " + address); + return null; + } + log.debug("Got active wals for " + address + ", " + activeWalsForServer); + walogs.addAll(activeWalsForServer); + } + } + + return walogs; + } + + /** + * Get the active tabletservers as seen by the master. + * + * @return The active tabletservers, null if they can't be computed. + */ + protected List getActiveTservers(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds) { + MasterClientService.Client client = null; + + List tservers = null; + try { + client = getMasterConnection(conf); + + if (null != client) { + tservers = client.getActiveTservers(tinfo, tcreds); + } + } catch (TException e) { + // If we can't fetch the tabletservers, we can't fetch any active WALs + log.warn("Failed to fetch active tabletservers from the master", e); + return null; + } finally { + ThriftUtil.returnClient(client); + } + + return tservers; + } + + protected List getActiveWalsForServer(AccumuloConfiguration conf, TInfo tinfo, TCredentials tcreds, HostAndPort server) { + TabletClientService.Client tserverClient = null; + try { + tserverClient = ThriftUtil.getClient(new TabletClientService.Client.Factory(), server, conf); + return tserverClient.getActiveLogs(tinfo, tcreds); + } catch (TTransportException e) { + log.warn("Failed to fetch active write-ahead logs from " + server, e); + return null; + } catch (TException e) { + log.warn("Failed to fetch active write-ahead logs from " + server, e); + return null; + } finally { + ThriftUtil.returnClient(tserverClient); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 780907f..71e5f7d 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -27,15 +27,12 @@ import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; @@ -44,7 +41,6 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -58,13 +54,10 @@ import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.core.security.thrift.TCredentials; -import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -73,7 +66,6 @@ import org.junit.rules.TestName; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.google.common.net.HostAndPort; public class GarbageCollectWriteAheadLogsTest { private static final long BLOCK_SIZE = 64000000L; @@ -525,75 +517,4 @@ public class GarbageCollectWriteAheadLogsTest { } } } - - @Test - public void getActiveWals() throws Exception { - GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers") - .addMockedMethod("getActiveWalsForServer").createMock(); - AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); - TInfo tinfo = EasyMock.createMock(TInfo.class); - TCredentials tcreds = EasyMock.createMock(TCredentials.class); - - List tservers = Arrays.asList("localhost:12345", "localhost:12346"); - EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers); - int numWals = 0; - for (String tserver : tservers) { - EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals)); - numWals++; - } - - EasyMock.replay(gcWals); - - Set wals = gcWals.getActiveWals(conf, tinfo, tcreds); - - EasyMock.verify(gcWals); - - Set expectedWals = new HashSet(); - for (int i = 0; i < numWals; i++) { - expectedWals.add("/wal" + i); - } - - Assert.assertEquals(expectedWals, wals); - } - - @Test - public void offlineMaster() throws Exception { - GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers") - .addMockedMethod("getActiveWalsForServer").createMock(); - AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); - TInfo tinfo = EasyMock.createMock(TInfo.class); - TCredentials tcreds = EasyMock.createMock(TCredentials.class); - - EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(null); - - EasyMock.replay(gcWals); - - Set wals = gcWals.getActiveWals(conf, tinfo, tcreds); - - EasyMock.verify(gcWals); - - Assert.assertNull("Expected to get null for active WALs", wals); - } - - @Test - public void offlineTserver() throws Exception { - GarbageCollectWriteAheadLogs gcWals = EasyMock.createMockBuilder(GarbageCollectWriteAheadLogs.class).addMockedMethod("getActiveTservers") - .addMockedMethod("getActiveWalsForServer").createMock(); - AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); - TInfo tinfo = EasyMock.createMock(TInfo.class); - TCredentials tcreds = EasyMock.createMock(TCredentials.class); - - List tservers = Arrays.asList("localhost:12345", "localhost:12346"); - EasyMock.expect(gcWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers); - EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 0)); - EasyMock.expect(gcWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12346"))).andReturn(null); - - EasyMock.replay(gcWals); - - Set wals = gcWals.getActiveWals(conf, tinfo, tcreds); - - EasyMock.verify(gcWals); - - Assert.assertNull("Expected to get null for active WALs", wals); - } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1aee3d4/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java index 86ac87e..2c0164d 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java @@ -23,8 +23,11 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.UUID; @@ -37,6 +40,7 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; @@ -52,8 +56,11 @@ import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.Assert; import org.junit.Before; @@ -64,6 +71,7 @@ import org.junit.rules.TestName; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; public class CloseWriteAheadLogReferencesTest { @@ -364,4 +372,74 @@ public class CloseWriteAheadLogReferencesTest { Assert.assertFalse(status.getClosed()); } + @Test + public void getActiveWals() throws Exception { + CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers") + .addMockedMethod("getActiveWalsForServer").createMock(); + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); + TInfo tinfo = EasyMock.createMock(TInfo.class); + TCredentials tcreds = EasyMock.createMock(TCredentials.class); + + List tservers = Arrays.asList("localhost:12345", "localhost:12346"); + EasyMock.expect(closeWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers); + int numWals = 0; + for (String tserver : tservers) { + EasyMock.expect(closeWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals)); + numWals++; + } + + EasyMock.replay(closeWals); + + Set wals = closeWals.getActiveWals(conf, tinfo, tcreds); + + EasyMock.verify(closeWals); + + Set expectedWals = new HashSet(); + for (int i = 0; i < numWals; i++) { + expectedWals.add("/wal" + i); + } + + Assert.assertEquals(expectedWals, wals); + } + + @Test + public void offlineMaster() throws Exception { + CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers") + .addMockedMethod("getActiveWalsForServer").createMock(); + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); + TInfo tinfo = EasyMock.createMock(TInfo.class); + TCredentials tcreds = EasyMock.createMock(TCredentials.class); + + EasyMock.expect(closeWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(null); + + EasyMock.replay(closeWals); + + Set wals = closeWals.getActiveWals(conf, tinfo, tcreds); + + EasyMock.verify(closeWals); + + Assert.assertNull("Expected to get null for active WALs", wals); + } + + @Test + public void offlineTserver() throws Exception { + CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers") + .addMockedMethod("getActiveWalsForServer").createMock(); + AccumuloConfiguration conf = EasyMock.createMock(AccumuloConfiguration.class); + TInfo tinfo = EasyMock.createMock(TInfo.class); + TCredentials tcreds = EasyMock.createMock(TCredentials.class); + + List tservers = Arrays.asList("localhost:12345", "localhost:12346"); + EasyMock.expect(closeWals.getActiveTservers(conf, tinfo, tcreds)).andReturn(tservers); + EasyMock.expect(closeWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 0)); + EasyMock.expect(closeWals.getActiveWalsForServer(conf, tinfo, tcreds, HostAndPort.fromString("localhost:12346"))).andReturn(null); + + EasyMock.replay(closeWals); + + Set wals = closeWals.getActiveWals(conf, tinfo, tcreds); + + EasyMock.verify(closeWals); + + Assert.assertNull("Expected to get null for active WALs", wals); + } }