Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9763817981 for ; Mon, 1 Jun 2015 20:47:49 +0000 (UTC) Received: (qmail 90747 invoked by uid 500); 1 Jun 2015 20:47:49 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 90679 invoked by uid 500); 1 Jun 2015 20:47:49 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 90664 invoked by uid 99); 1 Jun 2015 20:47:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Jun 2015 20:47:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 887F1E051B; Mon, 1 Jun 2015 20:47:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecn@apache.org To: commits@accumulo.apache.org Date: Mon, 01 Jun 2015 20:47:48 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] accumulo git commit: ACCUMULO-3871 broke replication with log markers in zookeeper Repository: accumulo Updated Branches: refs/heads/master 5fbd67157 -> 4f8d61db5 ACCUMULO-3871 broke replication with log markers in zookeeper Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a108651e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a108651e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a108651e Branch: refs/heads/master Commit: a108651e3338f582b35fb7c44c4e29759ec3a41e Parents: f6bd3ee Author: Eric C. Newton Authored: Mon Jun 1 16:47:17 2015 -0400 Committer: Eric C. Newton Committed: Mon Jun 1 16:47:17 2015 -0400 ---------------------------------------------------------------------- .../CloseWriteAheadLogReferences.java | 88 +++----------------- .../CloseWriteAheadLogReferencesTest.java | 69 --------------- ...bageCollectorCommunicatesWithTServersIT.java | 6 +- 3 files changed, 12 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/a108651e/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index a3652e2..1444127 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -17,7 +17,6 @@ package org.apache.accumulo.gc.replication; import java.util.Collections; -import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; @@ -44,7 +43,6 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; -import org.apache.accumulo.core.trace.Tracer; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.log.WalStateManager; @@ -102,54 +100,23 @@ public class CloseWriteAheadLogReferences implements Runnable { } Span findWalsSpan = Trace.start("findReferencedWals"); - HashSet referencedWals = null; + HashSet closed = null; try { sw.start(); - referencedWals = getReferencedWals(conn); + closed = getClosedLogs(conn); } finally { sw.stop(); findWalsSpan.stop(); } - log.info("Found " + referencedWals.size() + " WALs referenced in metadata in " + sw.toString()); - log.debug("Referenced WALs: " + referencedWals); + log.info("Found " + closed.size() + " WALs referenced in metadata in " + sw.toString()); sw.reset(); - // ACCUMULO-3320 WALs cannot be closed while a TabletServer may still use it later. - // - // In addition to the WALs that are actively referenced in the metadata table, tservers can also hold on to a WAL that is not presently referenced by any - // tablet. For example, a tablet could MinC which would end in all logs for that tablet being removed. However, if more data was ingested into the table, - // the same WAL could be re-used again by that tserver. - // - // If this code happened to run after the compaction but before the log is again referenced by a tabletserver, we might delete the WAL reference, only to - // have it recreated again which causes havoc with the replication status for a table. - final TInfo tinfo = Tracer.traceInfo(); - Set activeWals; - Span findActiveWalsSpan = Trace.start("findActiveWals"); - try { - sw.start(); - activeWals = getActiveWals(tinfo); - } finally { - sw.stop(); - findActiveWalsSpan.stop(); - } - - if (null == activeWals) { - log.warn("Could not compute the set of currently active WALs. Not closing any files"); - return; - } - - log.debug("Got active WALs from all tservers " + activeWals); - - referencedWals.addAll(activeWals); - - log.info("Found " + activeWals.size() + " WALs actively in use by TabletServers in " + sw.toString()); - Span updateReplicationSpan = Trace.start("updateReplicationTable"); long recordsClosed = 0; try { sw.start(); - recordsClosed = updateReplicationEntries(conn, referencedWals); + recordsClosed = updateReplicationEntries(conn, closed); } finally { sw.stop(); updateReplicationSpan.stop(); @@ -158,8 +125,6 @@ public class CloseWriteAheadLogReferences implements Runnable { log.info("Closed " + recordsClosed + " WAL replication references in replication table in " + sw.toString()); } - static final EnumSet NOT_OPEN = EnumSet.complementOf(EnumSet.of(WalState.OPEN)); - /** * Construct the set of referenced WALs from zookeeper * @@ -167,22 +132,22 @@ public class CloseWriteAheadLogReferences implements Runnable { * Connector * @return The Set of WALs that are referenced in the metadata table */ - protected HashSet getReferencedWals(Connector conn) { + protected HashSet getClosedLogs(Connector conn) { WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance()); - HashSet referencedWals = new HashSet<>(); + HashSet result = new HashSet<>(); try { for (Entry entry : wals.getAllState().entrySet()) { - if (NOT_OPEN.contains(entry.getValue())) { + if (entry.getValue() == WalState.UNREFERENCED || entry.getValue() == WalState.CLOSED) { Path path = entry.getKey(); - log.debug("Found WAL " + path.toString()); - referencedWals.add(path.toString()); + log.debug("Found closed WAL " + path.toString()); + result.add(path.toString()); } } } catch (WalMarkerException e) { throw new RuntimeException(e); } - return referencedWals; + return result; } /** @@ -292,39 +257,6 @@ public class CloseWriteAheadLogReferences implements Runnable { } /** - * Fetch the set of WALs in use by tabletservers - * - * @return Set of WALs in use by tservers, null if they cannot be computed for some reason - */ - protected Set getActiveWals(TInfo tinfo) { - List tservers = getActiveTservers(tinfo); - - // Compute the total set of WALs used by tservers - Set walogs = null; - if (null != tservers) { - walogs = new HashSet(); - // TODO If we have a lot of tservers, this might start to take a fair amount of time - // Consider adding a threadpool to parallelize the requests. - // Alternatively, we might have to move to a solution that doesn't involve tserver RPC - for (String tserver : tservers) { - HostAndPort address = HostAndPort.fromString(tserver); - List activeWalsForServer = getActiveWalsForServer(tinfo, address); - if (null == activeWalsForServer) { - log.debug("Could not fetch active wals from " + address); - return null; - } - log.debug("Got raw active wals for " + address + ", " + activeWalsForServer); - for (String activeWal : activeWalsForServer) { - // Normalize the WAL URI - walogs.add(new Path(activeWal).toString()); - } - } - } - - return walogs; - } - - /** * Get the active tabletservers as seen by the master. * * @return The active tabletservers, null if they can't be computed. http://git-wip-us.apache.org/repos/asf/accumulo/blob/a108651e/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java index 9af60dc..af8a9d0 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java @@ -20,12 +20,9 @@ import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map.Entry; import java.util.Set; @@ -49,7 +46,6 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.replication.StatusUtil; @@ -64,7 +60,6 @@ import org.junit.Test; import org.junit.rules.TestName; import com.google.common.collect.Iterables; -import com.google.common.net.HostAndPort; public class CloseWriteAheadLogReferencesTest { @@ -177,68 +172,4 @@ public class CloseWriteAheadLogReferencesTest { Assert.assertFalse(status.getClosed()); } - @Test - public void getActiveWals() throws Exception { - CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers") - .addMockedMethod("getActiveWalsForServer").createMock(); - TInfo tinfo = EasyMock.createMock(TInfo.class); - - List tservers = Arrays.asList("localhost:12345", "localhost:12346"); - EasyMock.expect(closeWals.getActiveTservers(tinfo)).andReturn(tservers); - int numWals = 0; - for (String tserver : tservers) { - EasyMock.expect(closeWals.getActiveWalsForServer(tinfo, HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals)); - numWals++; - } - - EasyMock.replay(closeWals); - - Set wals = closeWals.getActiveWals(tinfo); - - EasyMock.verify(closeWals); - - Set expectedWals = new HashSet(); - for (int i = 0; i < numWals; i++) { - expectedWals.add("/wal" + i); - } - - Assert.assertEquals(expectedWals, wals); - } - - @Test - public void offlineMaster() throws Exception { - CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers") - .addMockedMethod("getActiveWalsForServer").createMock(); - TInfo tinfo = EasyMock.createMock(TInfo.class); - - EasyMock.expect(closeWals.getActiveTservers(tinfo)).andReturn(null); - - EasyMock.replay(closeWals); - - Set wals = closeWals.getActiveWals(tinfo); - - EasyMock.verify(closeWals); - - Assert.assertNull("Expected to get null for active WALs", wals); - } - - @Test - public void offlineTserver() throws Exception { - CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers") - .addMockedMethod("getActiveWalsForServer").createMock(); - TInfo tinfo = EasyMock.createMock(TInfo.class); - - List tservers = Arrays.asList("localhost:12345", "localhost:12346"); - EasyMock.expect(closeWals.getActiveTservers(tinfo)).andReturn(tservers); - EasyMock.expect(closeWals.getActiveWalsForServer(tinfo, HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 0)); - EasyMock.expect(closeWals.getActiveWalsForServer(tinfo, HostAndPort.fromString("localhost:12346"))).andReturn(null); - - EasyMock.replay(closeWals); - - Set wals = closeWals.getActiveWals(tinfo); - - EasyMock.verify(closeWals); - - Assert.assertNull("Expected to get null for active WALs", wals); - } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a108651e/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java index 38d2276..a961e78 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@ -257,7 +257,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); } - @Test + @Test(timeout = 2 * 60 * 1000) public void testUnreferencedWalInTserverIsClosed() throws Exception { final String[] names = getUniqueNames(2); // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver @@ -305,7 +305,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); String walName = fileToStatus.keySet().iterator().next(); - Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); + Assert.assertTrue("Expected log file name from tablet to equal replication entry", wals.contains(walName)); Status status = fileToStatus.get(walName); @@ -342,8 +342,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Map fileToStatusAfterMinc = getMetadataStatusForTable(table); Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); - Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); - /* * To verify that the WALs is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to do * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of