accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-3871 broke replication with log markers in zookeeper
Date Mon, 01 Jun 2015 20:47:48 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 5fbd67157 -> 4f8d61db5


ACCUMULO-3871 broke replication with log markers in zookeeper


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a108651e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a108651e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a108651e

Branch: refs/heads/master
Commit: a108651e3338f582b35fb7c44c4e29759ec3a41e
Parents: f6bd3ee
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Mon Jun 1 16:47:17 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Mon Jun 1 16:47:17 2015 -0400

----------------------------------------------------------------------
 .../CloseWriteAheadLogReferences.java           | 88 +++-----------------
 .../CloseWriteAheadLogReferencesTest.java       | 69 ---------------
 ...bageCollectorCommunicatesWithTServersIT.java |  6 +-
 3 files changed, 12 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a108651e/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index a3652e2..1444127 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -17,7 +17,6 @@
 package org.apache.accumulo.gc.replication;
 
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
@@ -44,7 +43,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.log.WalStateManager;
@@ -102,54 +100,23 @@ public class CloseWriteAheadLogReferences implements Runnable {
     }
 
     Span findWalsSpan = Trace.start("findReferencedWals");
-    HashSet<String> referencedWals = null;
+    HashSet<String> closed = null;
     try {
       sw.start();
-      referencedWals = getReferencedWals(conn);
+      closed = getClosedLogs(conn);
     } finally {
       sw.stop();
       findWalsSpan.stop();
     }
 
-    log.info("Found " + referencedWals.size() + " WALs referenced in metadata in " + sw.toString());
-    log.debug("Referenced WALs: " + referencedWals);
+    log.info("Found " + closed.size() + " WALs referenced in metadata in " + sw.toString());
     sw.reset();
 
-    // ACCUMULO-3320 WALs cannot be closed while a TabletServer may still use it later.
-    //
-    // In addition to the WALs that are actively referenced in the metadata table, tservers can also hold on to a WAL that is not presently referenced by any
-    // tablet. For example, a tablet could MinC which would end in all logs for that tablet being removed. However, if more data was ingested into the table,
-    // the same WAL could be re-used again by that tserver.
-    //
-    // If this code happened to run after the compaction but before the log is again referenced by a tabletserver, we might delete the WAL reference, only to
-    // have it recreated again which causes havoc with the replication status for a table.
-    final TInfo tinfo = Tracer.traceInfo();
-    Set<String> activeWals;
-    Span findActiveWalsSpan = Trace.start("findActiveWals");
-    try {
-      sw.start();
-      activeWals = getActiveWals(tinfo);
-    } finally {
-      sw.stop();
-      findActiveWalsSpan.stop();
-    }
-
-    if (null == activeWals) {
-      log.warn("Could not compute the set of currently active WALs. Not closing any files");
-      return;
-    }
-
-    log.debug("Got active WALs from all tservers " + activeWals);
-
-    referencedWals.addAll(activeWals);
-
-    log.info("Found " + activeWals.size() + " WALs actively in use by TabletServers in " + sw.toString());
-
     Span updateReplicationSpan = Trace.start("updateReplicationTable");
     long recordsClosed = 0;
     try {
       sw.start();
-      recordsClosed = updateReplicationEntries(conn, referencedWals);
+      recordsClosed = updateReplicationEntries(conn, closed);
     } finally {
       sw.stop();
       updateReplicationSpan.stop();
@@ -158,8 +125,6 @@ public class CloseWriteAheadLogReferences implements Runnable {
     log.info("Closed " + recordsClosed + " WAL replication references in replication table in " + sw.toString());
   }
 
-  static final EnumSet<WalState> NOT_OPEN = EnumSet.complementOf(EnumSet.of(WalState.OPEN));
-
   /**
    * Construct the set of referenced WALs from zookeeper
    *
@@ -167,22 +132,22 @@ public class CloseWriteAheadLogReferences implements Runnable {
    *          Connector
    * @return The Set of WALs that are referenced in the metadata table
    */
-  protected HashSet<String> getReferencedWals(Connector conn) {
+  protected HashSet<String> getClosedLogs(Connector conn) {
     WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
 
-    HashSet<String> referencedWals = new HashSet<>();
+    HashSet<String> result = new HashSet<>();
     try {
       for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
-        if (NOT_OPEN.contains(entry.getValue())) {
+        if (entry.getValue() == WalState.UNREFERENCED || entry.getValue() == WalState.CLOSED) {
           Path path = entry.getKey();
-          log.debug("Found WAL " + path.toString());
-          referencedWals.add(path.toString());
+          log.debug("Found closed WAL " + path.toString());
+          result.add(path.toString());
         }
       }
     } catch (WalMarkerException e) {
       throw new RuntimeException(e);
     }
-    return referencedWals;
+    return result;
   }
 
   /**
@@ -292,39 +257,6 @@ public class CloseWriteAheadLogReferences implements Runnable {
   }
 
   /**
-   * Fetch the set of WALs in use by tabletservers
-   *
-   * @return Set of WALs in use by tservers, null if they cannot be computed for some reason
-   */
-  protected Set<String> getActiveWals(TInfo tinfo) {
-    List<String> tservers = getActiveTservers(tinfo);
-
-    // Compute the total set of WALs used by tservers
-    Set<String> walogs = null;
-    if (null != tservers) {
-      walogs = new HashSet<String>();
-      // TODO If we have a lot of tservers, this might start to take a fair amount of time
-      // Consider adding a threadpool to parallelize the requests.
-      // Alternatively, we might have to move to a solution that doesn't involve tserver RPC
-      for (String tserver : tservers) {
-        HostAndPort address = HostAndPort.fromString(tserver);
-        List<String> activeWalsForServer = getActiveWalsForServer(tinfo, address);
-        if (null == activeWalsForServer) {
-          log.debug("Could not fetch active wals from " + address);
-          return null;
-        }
-        log.debug("Got raw active wals for " + address + ", " + activeWalsForServer);
-        for (String activeWal : activeWalsForServer) {
-          // Normalize the WAL URI
-          walogs.add(new Path(activeWal).toString());
-        }
-      }
-    }
-
-    return walogs;
-  }
-
-  /**
    * Get the active tabletservers as seen by the master.
    *
    * @return The active tabletservers, null if they can't be computed.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a108651e/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 9af60dc..af8a9d0 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -20,12 +20,9 @@ import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -49,7 +46,6 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.replication.StatusUtil;
@@ -64,7 +60,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
 
 public class CloseWriteAheadLogReferencesTest {
 
@@ -177,68 +172,4 @@ public class CloseWriteAheadLogReferencesTest {
     Assert.assertFalse(status.getClosed());
   }
 
-  @Test
-  public void getActiveWals() throws Exception {
-    CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers")
-        .addMockedMethod("getActiveWalsForServer").createMock();
-    TInfo tinfo = EasyMock.createMock(TInfo.class);
-
-    List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346");
-    EasyMock.expect(closeWals.getActiveTservers(tinfo)).andReturn(tservers);
-    int numWals = 0;
-    for (String tserver : tservers) {
-      EasyMock.expect(closeWals.getActiveWalsForServer(tinfo, HostAndPort.fromString(tserver))).andReturn(Arrays.asList("/wal" + numWals));
-      numWals++;
-    }
-
-    EasyMock.replay(closeWals);
-
-    Set<String> wals = closeWals.getActiveWals(tinfo);
-
-    EasyMock.verify(closeWals);
-
-    Set<String> expectedWals = new HashSet<String>();
-    for (int i = 0; i < numWals; i++) {
-      expectedWals.add("/wal" + i);
-    }
-
-    Assert.assertEquals(expectedWals, wals);
-  }
-
-  @Test
-  public void offlineMaster() throws Exception {
-    CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers")
-        .addMockedMethod("getActiveWalsForServer").createMock();
-    TInfo tinfo = EasyMock.createMock(TInfo.class);
-
-    EasyMock.expect(closeWals.getActiveTservers(tinfo)).andReturn(null);
-
-    EasyMock.replay(closeWals);
-
-    Set<String> wals = closeWals.getActiveWals(tinfo);
-
-    EasyMock.verify(closeWals);
-
-    Assert.assertNull("Expected to get null for active WALs", wals);
-  }
-
-  @Test
-  public void offlineTserver() throws Exception {
-    CloseWriteAheadLogReferences closeWals = EasyMock.createMockBuilder(CloseWriteAheadLogReferences.class).addMockedMethod("getActiveTservers")
-        .addMockedMethod("getActiveWalsForServer").createMock();
-    TInfo tinfo = EasyMock.createMock(TInfo.class);
-
-    List<String> tservers = Arrays.asList("localhost:12345", "localhost:12346");
-    EasyMock.expect(closeWals.getActiveTservers(tinfo)).andReturn(tservers);
-    EasyMock.expect(closeWals.getActiveWalsForServer(tinfo, HostAndPort.fromString("localhost:12345"))).andReturn(Arrays.asList("/wal" + 0));
-    EasyMock.expect(closeWals.getActiveWalsForServer(tinfo, HostAndPort.fromString("localhost:12346"))).andReturn(null);
-
-    EasyMock.replay(closeWals);
-
-    Set<String> wals = closeWals.getActiveWals(tinfo);
-
-    EasyMock.verify(closeWals);
-
-    Assert.assertNull("Expected to get null for active WALs", wals);
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a108651e/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 38d2276..a961e78 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -257,7 +257,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
     Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc);
   }
 
-  @Test
+  @Test(timeout = 2 * 60 * 1000)
   public void testUnreferencedWalInTserverIsClosed() throws Exception {
     final String[] names = getUniqueNames(2);
     // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver
@@ -305,7 +305,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
     Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
 
     String walName = fileToStatus.keySet().iterator().next();
-    Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName);
+    Assert.assertTrue("Expected log file name from tablet to equal replication entry", wals.contains(walName));
 
     Status status = fileToStatus.get(walName);
 
@@ -342,8 +342,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
     Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
     Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
 
-    Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc);
-
     /*
      * To verify that the WALs is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to do
      * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of


Mime
View raw message