accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [3/3] accumulo git commit: ACCUMULO-3423 fixing issues found in ITs
Date Fri, 12 Jun 2015 15:46:28 GMT
ACCUMULO-3423 fixing issues found in ITs


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/844166a0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/844166a0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/844166a0

Branch: refs/heads/master
Commit: 844166a05a248d13148d8f82f7f135d808994c13
Parents: 2fe0bef
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Fri Jun 12 11:45:53 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Fri Jun 12 11:45:53 2015 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        | 15 +++++
 .../CloseWriteAheadLogReferences.java           | 10 +--
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 65 +++++++++++++-------
 ...bageCollectorCommunicatesWithTServersIT.java |  4 +-
 .../test/replication/ReplicationIT.java         | 39 ++++++++----
 5 files changed, 93 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index b8fb9fb..8803a40 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -39,6 +39,9 @@ import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
@@ -315,6 +318,18 @@ public class GarbageCollectWriteAheadLogs {
     Connector conn;
     try {
       conn = context.getConnector();
+      try {
+        final Scanner s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        for (Entry<Key,Value> entry : s) {
+          UUID id = path2uuid(new Path(entry.getKey().getRow().toString()));
+          candidates.remove(id);
+          log.info("Ignore closed log " + id + " because it is being replicated");
+        }
+      } catch (ReplicationTableOfflineException ex) {
+        return candidates.size();
+      }
+
       final Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
       scanner.setRange(MetadataSchema.ReplicationSection.getRange());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 1444127..8857939 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -155,10 +155,10 @@ public class CloseWriteAheadLogReferences implements Runnable {
    *
    * @param conn
    *          Connector
-   * @param referencedWals
-   *          {@link Set} of paths to WALs that are referenced in the tablets section of the metadata table
+   * @param closedWals
+   *          {@link Set} of paths to WALs that marked as closed or unreferenced in zookeeper
    */
-  protected long updateReplicationEntries(Connector conn, Set<String> referencedWals) {
+  protected long updateReplicationEntries(Connector conn, Set<String> closedWals) {
     BatchScanner bs = null;
     BatchWriter bw = null;
     long recordsClosed = 0;
@@ -181,11 +181,11 @@ public class CloseWriteAheadLogReferences implements Runnable {
         // Ignore things that aren't completely replicated as we can't delete those anyways
         MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
         String replFile = replFileText.toString();
-        boolean isReferenced = referencedWals.contains(replFile);
+        boolean isClosed = closedWals.contains(replFile);
 
         // We only want to clean up WALs (which is everything but rfiles) and only when
         // metadata doesn't have a reference to the given WAL
-        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isReferenced) {
+        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isClosed) {
           try {
             closeWal(bw, entry.getKey());
             recordsClosed++;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index a40d390..60d6026 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -34,6 +34,8 @@ import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.replication.ReplicationSchema;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
@@ -131,28 +133,35 @@ public class GarbageCollectWriteAheadLogsTest {
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
-    Scanner scanner = EasyMock.createMock(Scanner.class);
+    Scanner mscanner = EasyMock.createMock(Scanner.class);
+    Scanner rscanner = EasyMock.createMock(Scanner.class);
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
     EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
     EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
     EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
     EasyMock.expect(context.getConnector()).andReturn(conn);
-    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
-    scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+
+    EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)).andReturn(rscanner);
+    rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
+
+    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(mscanner);
+    mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
     EasyMock.expectLastCall().once();
-    scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+    mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
     EasyMock.expectLastCall().once();
-    EasyMock.expect(scanner.iterator()).andReturn(emptyKV);
+    EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
     EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
     marker.removeWalMarker(server2, id);
     EasyMock.expectLastCall().once();
     marker.forget(server2);
     EasyMock.expectLastCall().once();
-    EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+    EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
     gc.collect(status);
-    EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+    EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }
 
   @Test
@@ -162,23 +171,30 @@ public class GarbageCollectWriteAheadLogsTest {
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
-    Scanner scanner = EasyMock.createMock(Scanner.class);
+    Scanner mscanner = EasyMock.createMock(Scanner.class);
+    Scanner rscanner = EasyMock.createMock(Scanner.class);
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
     EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
     EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
     EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
     EasyMock.expect(context.getConnector()).andReturn(conn);
-    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
-    scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+
+    EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)).andReturn(rscanner);
+    rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
     EasyMock.expectLastCall().once();
-    scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+    EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
+
+    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(mscanner);
+    mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
     EasyMock.expectLastCall().once();
-    EasyMock.expect(scanner.iterator()).andReturn(emptyKV);
-    EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+    mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
+    EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer2List);
     gc.collect(status);
-    EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+    EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }
 
   @Test
@@ -188,7 +204,8 @@ public class GarbageCollectWriteAheadLogsTest {
     WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
-    Scanner scanner = EasyMock.createMock(Scanner.class);
+    Scanner mscanner = EasyMock.createMock(Scanner.class);
+    Scanner rscanner = EasyMock.createMock(Scanner.class);
     String row = MetadataSchema.ReplicationSection.getRowPrefix() + path.toString();
     String colf = MetadataSchema.ReplicationSection.COLF.toString();
     String colq = "1";
@@ -200,15 +217,21 @@ public class GarbageCollectWriteAheadLogsTest {
     EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
     EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.UNREFERENCED, path));
     EasyMock.expect(context.getConnector()).andReturn(conn);
-    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
-    scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+
+    EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)).andReturn(rscanner);
+    rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
+
+    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(mscanner);
+    mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
     EasyMock.expectLastCall().once();
-    scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+    mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
     EasyMock.expectLastCall().once();
-    EasyMock.expect(scanner.iterator()).andReturn(replicationWork.entrySet().iterator());
-    EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+    EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
+    EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
     gc.collect(status);
-    EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+    EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index ab142d0..6a14de3 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -198,13 +198,11 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacB
     log.info("Flushing mutations to the server");
     bw.flush();
 
-    log.info("Checking that metadata only has one WAL recorded for this table");
+    log.info("Checking that metadata only has two WALs recorded for this table (inUse, and opened)");
 
     Set<String> wals = getWalsForTable(table);
     Assert.assertEquals("Expected to only find two WALs for the table", 2, wals.size());
 
-    log.info("Compacting the table which will remove all WALs from the tablets");
-
     // Flush our test table to remove the WAL references in it
     conn.tableOperations().flush(table, null, null, true);
     // Flush the metadata table too because it will have a reference to the WAL

http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 77198df..55379a4 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
@@ -405,6 +406,9 @@ public class ReplicationIT extends ConfigurableMacBase {
     // Create two tables
     conn.tableOperations().create(table1);
     conn.tableOperations().create(table2);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
+    // wait for permission to propagate
+    Thread.sleep(5000);
 
     // Enable replication on table1
     conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
@@ -467,14 +471,10 @@ public class ReplicationIT extends ConfigurableMacBase {
 
     // After writing data, we'll get a replication table online
     Assert.assertTrue(ReplicationTable.isOnline(conn));
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
 
     Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), conn.tableOperations().tableIdMap().get(table2));
     Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds);
 
-    // Wait to make sure the table permission propagate
-    Thread.sleep(5000);
-
     s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     s.setRange(MetadataSchema.ReplicationSection.getRange());
 
@@ -482,6 +482,11 @@ public class ReplicationIT extends ConfigurableMacBase {
     for (Entry<Key,Value> metadata : s) {
       records.add(metadata);
     }
+    s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    for (Entry<Key,Value> replication : s) {
+      records.add(replication);
+    }
 
     Assert.assertEquals("Expected to find 2 records, but actually found " + records, 2, records.size());
 
@@ -580,12 +585,7 @@ public class ReplicationIT extends ConfigurableMacBase {
     // Sleep a sufficient amount of time to ensure that we get the straggling WALs that might have been created at the end
     Thread.sleep(5000);
 
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Set<String> replFiles = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      replFiles.add(entry.getKey().getRow().toString());
-    }
+    Set<String> replFiles = getReferencesToFilesToBeReplicated(conn);
 
     // We might have a WAL that was use solely for the replication table
     // We want to remove that from our list as it should not appear in the replication table
@@ -608,8 +608,25 @@ public class ReplicationIT extends ConfigurableMacBase {
     for (String replFile : replFiles) {
       Path p = new Path(replFile);
       FileSystem fs = p.getFileSystem(conf);
-      Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
+      if (!fs.exists(p)) {
+        // double-check: the garbage collector can be fast
+        Set<String> currentSet = getReferencesToFilesToBeReplicated(conn);
+        log.info("Current references {}", currentSet);
+        log.info("Looking for reference to {}", replFile);
+        log.info("Contains? {}", currentSet.contains(replFile));
+        Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, !currentSet.contains(replFile));
+      }
+    }
+  }
+
+  private Set<String> getReferencesToFilesToBeReplicated(final Connector conn) throws ReplicationTableOfflineException {
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Set<String> replFiles = new HashSet<>();
+    for (Entry<Key,Value> entry : s) {
+      replFiles.add(entry.getKey().getRow().toString());
     }
+    return replFiles;
   }
 
   @Test


Mime
View raw message