Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 85BAA18E77 for ; Mon, 15 Jun 2015 21:52:47 +0000 (UTC) Received: (qmail 93457 invoked by uid 500); 15 Jun 2015 21:52:47 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 93409 invoked by uid 500); 15 Jun 2015 21:52:47 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 93396 invoked by uid 99); 15 Jun 2015 21:52:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Jun 2015 21:52:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50ECCE10A2; Mon, 15 Jun 2015 21:52:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecn@apache.org To: commits@accumulo.apache.org Date: Mon, 15 Jun 2015 21:52:47 -0000 Message-Id: <558b3af6f4be4ae0ae81df1bfd6f7ade@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/5] accumulo git commit: ACCUMULO-3423 more fixes for replication Repository: accumulo Updated Branches: refs/heads/master 67249ec2e -> b7a529b75 ACCUMULO-3423 more fixes for replication Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ecf2298e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ecf2298e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ecf2298e Branch: refs/heads/master Commit: ecf2298eb9b764c01edc3f16f0c0cb6e6c4006cb Parents: 844166a Author: Eric C. 
Newton Authored: Mon Jun 15 09:59:38 2015 -0400 Committer: Eric C. Newton Committed: Mon Jun 15 09:59:38 2015 -0400 ---------------------------------------------------------------------- .../accumulo/server/log/WalStateManager.java | 8 ++++- .../CloseWriteAheadLogReferences.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 11 +++++++ .../accumulo/master/TabletGroupWatcher.java | 4 ++- .../test/replication/ReplicationIT.java | 31 +++++--------------- 5 files changed, 29 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java index 1540938..52844c1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java @@ -34,6 +34,8 @@ import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * This class governs the space in Zookeeper that advertises the status of Write-Ahead Logs @@ -68,6 +70,8 @@ public class WalStateManager { } } + private static final Logger log = LoggerFactory.getLogger(WalStateManager.class); + public final static String ZWALS = "/wals"; public static enum WalState { @@ -113,6 +117,7 @@ public class WalStateManager { if (state == WalState.OPEN) { policy = NodeExistsPolicy.FAIL; } + log.debug("Setting {} to {}", path.getName(), state); zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + 
path.getName(), data, policy); } catch (KeeperException | InterruptedException e) { throw new WalMarkerException(e); @@ -193,6 +198,7 @@ public class WalStateManager { // garbage collector knows it's safe to remove the marker for a closed log public void removeWalMarker(TServerInstance instance, UUID uuid) throws WalMarkerException { try { + log.debug("Removing {}", uuid); String path = root() + "/" + instance.toString() + "/" + uuid.toString(); zoo.delete(path, -1); } catch (InterruptedException | KeeperException e) { @@ -211,8 +217,8 @@ public class WalStateManager { } // tablet server can mark the log as closed (but still needed), for replication to begin + // master can mark a log as unreferenced after it has made log recovery markers on the tablets that need to be recovered public void closeWal(TServerInstance instance, Path path) throws WalMarkerException { updateState(instance, path, WalState.CLOSED); } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index 8857939..0c09396 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -185,7 +185,7 @@ public class CloseWriteAheadLogReferences implements Runnable { // We only want to clean up WALs (which is everything but rfiles) and only when // metadata doesn't have a reference to the given WAL - if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isClosed) { + if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && isClosed) { try { closeWal(bw, entry.getKey()); 
recordsClosed++; http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 9a324fb..0cf84f2 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -97,6 +97,8 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.init.Initialize; +import org.apache.accumulo.server.log.WalStateManager; +import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.master.LiveTServerSet; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer; @@ -1586,4 +1588,13 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List return new HashSet(serversToShutdown); } } + + public void markDeadServerLogsAsClosed(Map> logsForDeadServers) throws WalMarkerException { + WalStateManager mgr = new WalStateManager(this.inst, ZooReaderWriter.getInstance()); + for (Entry> server : logsForDeadServers.entrySet()) { + for (Path path : server.getValue()) { + mgr.closeWal(server.getKey(), path); + } + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index 
d55781e..d2cbf62 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -69,6 +69,7 @@ import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.log.WalStateManager; +import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.master.state.Assignment; import org.apache.accumulo.server.master.state.ClosableIterator; @@ -748,12 +749,13 @@ class TabletGroupWatcher extends Daemon { private void flushChanges(SortedMap currentTServers, List assignments, List assigned, List assignedToDeadServers, Map> logsForDeadServers, Map unassigned) - throws DistributedStoreException, TException { + throws DistributedStoreException, TException, WalMarkerException { if (!assignedToDeadServers.isEmpty()) { int maxServersToShow = min(assignedToDeadServers.size(), 100); Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); Master.log.debug("logs for dead servers: " + logsForDeadServers); store.unassign(assignedToDeadServers, logsForDeadServers); + this.master.markDeadServerLogsAsClosed(logsForDeadServers); this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size()); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ecf2298e/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java index 55379a4..e0a9121 
100644 --- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java @@ -435,26 +435,17 @@ public class ReplicationIT extends ConfigurableMacBase { Assert.assertTrue("Replication table did not exist", online); Assert.assertTrue(ReplicationTable.isOnline(conn)); - conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ); // Verify that we found a single replication record that's for table1 Scanner s = ReplicationTable.getScanner(conn); StatusSection.limit(s); - Iterator<Entry<Key,Value>> iter = s.iterator(); - attempts = 5; - while (attempts > 0) { - if (!iter.hasNext()) { - s.close(); - Thread.sleep(1000); - s = ReplicationTable.getScanner(conn); - iter = s.iterator(); - attempts--; - } else { + for (int i = 0; i < 5; i++) { + if (Iterators.size(s.iterator()) == 1) { break; } + Thread.sleep(1000); } - Assert.assertTrue(iter.hasNext()); - Entry<Key,Value> entry = iter.next(); + Entry<Key,Value> entry = Iterators.getOnlyElement(s.iterator()); // We should at least find one status record for this table, we might find a second if another log was started from ingesting the data Assert.assertEquals("Expected to find replication entry for " + table1, conn.tableOperations().tableIdMap().get(table1), entry.getKey() .getColumnQualifier().toString()); @@ -469,23 +460,15 @@ public class ReplicationIT extends ConfigurableMacBase { // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2 // Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there - // After writing data, we'll get a replication table online - Assert.assertTrue(ReplicationTable.isOnline(conn)); - Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), conn.tableOperations().tableIdMap().get(table2)); Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds); + List<Entry<Key,Value>> records = 
new ArrayList<>(); s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); s.setRange(MetadataSchema.ReplicationSection.getRange()); - - List<Entry<Key,Value>> records = new ArrayList<>(); for (Entry<Key,Value> metadata : s) { records.add(metadata); - } - s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - for (Entry<Key,Value> replication : s) { - records.add(replication); + log.debug("Meta: {} => {}", metadata.getKey().toStringNoTruncate(), metadata.getValue().toString()); } Assert.assertEquals("Expected to find 2 records, but actually found " + records, 2, records.size()); @@ -503,7 +486,7 @@ public class ReplicationIT extends ConfigurableMacBase { // Verify that we found two replication records: one for table1 and one for table2 s = ReplicationTable.getScanner(conn); StatusSection.limit(s); - iter = s.iterator(); + Iterator<Entry<Key,Value>> iter = s.iterator(); Assert.assertTrue("Found no records in replication table", iter.hasNext()); entry = iter.next(); Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString()));