accumulo-commits mailing list archives

From els...@apache.org
Subject [09/10] accumulo git commit: ACCUMULO-3320 Integration test to ensure that WALs are not closed when a tserver may continue to use it.
Date Mon, 10 Nov 2014 21:46:28 GMT
ACCUMULO-3320 Integration test to ensure that WALs are not closed while a tserver may still use them.

Also tests the converse: once a WAL is no longer in use by a tserver, it does get closed.
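
For context on the mechanism under test: before the garbage collector marks a WAL's replication
Status closed, it must confirm that no tserver still holds that WAL open; checking tablet
references in metadata alone is not sufficient. A rough sketch of such a guard, built around the
getActiveLogs thrift RPC that the new integration test exercises (the surrounding control flow
and variable names are illustrative, not the GC's actual code):

    // Hypothetical GC-side guard, sketched from the RPC the new test calls.
    TabletClientService.Client tserver = ThriftUtil.getTServerClient(tserverAddress, rpcConfig);
    List<String> activeWals = tserver.getActiveLogs(Tracer.traceInfo(), tcreds);
    if (activeWals.contains(candidateWal)) {
      // The tserver may still write to this WAL; leave its replication Status open.
    } else {
      // No tserver holds it open and metadata has no reference; safe to close.
    }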


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/09cb6b2a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/09cb6b2a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/09cb6b2a

Branch: refs/heads/master
Commit: 09cb6b2a82d6634c1bd991ed4a0012fb87a459f3
Parents: 84191c5
Author: Josh Elser <elserj@apache.org>
Authored: Sun Nov 9 12:19:05 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Nov 10 13:34:51 2014 -0800

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  12 +-
 .../core/replication/ReplicationTable.java      |   3 +
 .../CloseWriteAheadLogReferences.java           |  11 +-
 .../master/replication/ReplicationDriver.java   |   6 +
 ...arbageCollectorCommunicatesWithTServers.java | 446 +++++++++++++++++++
 5 files changed, 470 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f59b654..1195668 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -206,12 +206,12 @@ public enum Property {
   TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, "The port used for handling client connections on the tablet servers"),
   @Deprecated
   TSERV_MUTATION_QUEUE_MAX("tserver.mutation.queue.max", "1M", PropertyType.MEMORY,
-      "This setting is deprecated. See tserver.total.mutation.queue.max. " 
+      "This setting is deprecated. See tserver.total.mutation.queue.max. "
           + "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the"
           + " max number of concurrent writer when configuring. When using Hadoop 2, Accumulo will call hsync() on the WAL . For a small number of "
           + "concurrent writers, increasing this buffer size decreases the frequncy of hsync calls. For a large number of concurrent writers a small buffers "
           + "size is ok because of group commit."),
-  TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY, 
+  TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY,
       "The amount of memory used to store write-ahead-log mutations before flushing them."),
   TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "30", PropertyType.COUNT,
       "To find a tablets split points, all index files are opened. This setting determines how many index "
@@ -502,6 +502,8 @@ public enum Property {
   REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"),
   REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.UnorderedWorkAssigner", PropertyType.CLASSNAME,
       "Replication WorkAssigner implementation to use"),
+  REPLICATION_DRIVER_DELAY("replication.driver.delay", "0s", PropertyType.TIMEDURATION,
+      "Amount of time to wait before the replication work loop begins in the master."),
   REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION,
       "Amount of time to wait before first checking for replication work, not useful outside of tests"),
   REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION,
       "Amount of time to wait before re-checking for replication work, not useful outside of tests"),
 
@@ -814,7 +816,7 @@ public enum Property {
 
   /**
   * Creates a new instance of a class specified in a configuration property. The table classpath context is used if set.
-   * 
+   *
    * @param conf
    *          configuration containing property
    * @param property
@@ -835,7 +837,7 @@ public enum Property {
 
   /**
    * Creates a new instance of a class specified in a configuration property.
-   * 
+   *
    * @param conf
    *          configuration containing property
    * @param property
@@ -856,7 +858,7 @@ public enum Property {
   /**
   * Collects together properties from the given configuration pertaining to compaction strategies. The relevant properties all begin with the prefix in
   * {@link #TABLE_COMPACTION_STRATEGY_PREFIX}. In the returned map, the prefix is removed from each property's key.
-   * 
+   *
    * @param tableConf
    *          configuration
   * @return map of compaction strategy property keys and values, with the detection prefix removed from each key

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
index ec7c202..c6f8ada 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
@@ -37,10 +37,12 @@ import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 import com.google.common.collect.ImmutableMap;
 
 public class ReplicationTable {
+  private static final Logger log = Logger.getLogger(ReplicationTable.class);
 
   public static final String ID = "+rep";
   public static final String NAME = Namespaces.ACCUMULO_NAMESPACE + ".replication";
@@ -90,6 +92,7 @@ public class ReplicationTable {
 
   public static void setOnline(Connector conn) throws AccumuloSecurityException, AccumuloException {
     try {
+      log.info("Bringing replication table online");
       conn.tableOperations().online(NAME, true);
     } catch (TableNotFoundException e) {
       throw new AssertionError(NAME + " should exist, but doesn't.");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index cb74f18..a41e965 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
@@ -121,6 +122,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
     }
 
     log.info("Found " + referencedWals.size() + " WALs referenced in metadata in " + sw.toString());
+    log.debug("Referenced WALs: " + referencedWals);
     sw.reset();
 
     /*
@@ -202,6 +204,8 @@ public class CloseWriteAheadLogReferences implements Runnable {
         // The value may contain multiple WALs
         LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
 
+        log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
+
         // Normalize each log file (using Path) and add it to the set
         for (String logFile : logEntry.logSet) {
           referencedWals.add(normalizedWalPaths.get(logFile));
@@ -251,12 +255,13 @@ public class CloseWriteAheadLogReferences implements Runnable {
         }
 
         // Ignore things that aren't completely replicated as we can't delete those anyways
-        entry.getKey().getRow(replFileText);
-        String replFile = replFileText.toString().substring(ReplicationSection.getRowPrefix().length());
+        MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
+        String replFile = replFileText.toString();
+        boolean isReferenced = referencedWals.contains(replFile);
 
         // We only want to clean up WALs (which is everything but rfiles) and only when
         // metadata doesn't have a reference to the given WAL
-        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !referencedWals.contains(replFile)) {
+        if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isReferenced) {
           try {
             closeWal(bw, entry.getKey());
             recordsClosed++;
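
The refactor above swaps hand-rolled prefix stripping for the schema helper, so knowledge of the
~repl row layout stays in MetadataSchema. A minimal sketch of the equivalence (local variable
names are illustrative):

    // Before: strip the ReplicationSection row prefix from the raw row by hand.
    Text row = new Text();
    entry.getKey().getRow(row);
    String byHand = row.toString().substring(ReplicationSection.getRowPrefix().length());

    // After: the helper extracts the file, keeping the prefix definition in one place.
    Text file = new Text();
    MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
    assert byHand.equals(file.toString());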

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index ea378f4..f822a90 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -55,6 +55,12 @@ public class ReplicationDriver extends Daemon {
   public void run() {
     CountSampler sampler = new CountSampler(10);
 
+    long millisToWait = conf.getTimeInMillis(Property.REPLICATION_DRIVER_DELAY);
+    log.debug("Waiting " + millisToWait + "ms before starting main replication loop");
+    UtilWaitThread.sleep(millisToWait);
+
+    log.debug("Starting replication loop");
+
     while (master.stillMaster()) {
       if (null == workMaker) {
         try {
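
The delay only matters at master startup; with the 0s default the loop begins immediately, as
before. Outside of MiniAccumuloConfigImpl (which the new test uses to set a 240s delay), the
property can be raised like any other system property; a sketch via the client API, assuming a
Connector named conn:

    // Set the new knob at runtime; takes effect the next time the master starts the driver.
    conn.instanceOperations().setProperty(
        Property.REPLICATION_DRIVER_DELAY.getKey(), "240s");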

http://git-wip-us.apache.org/repos/asf/accumulo/blob/09cb6b2a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java
new file mode 100644
index 0000000..dff2726
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServers.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ClientConfigurationHelper;
+import org.apache.accumulo.core.client.impl.ClientExecReturn;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.AbstractMacIT;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.bouncycastle.util.Arrays;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ACCUMULO-3302 series of tests which ensure that a WAL is not prematurely closed when a TServer may still continue to use it. Checking that no tablet references a
+ * WAL is insufficient to determine if a WAL will never be used in the future.
+ */
+public class GarbageCollectorCommunicatesWithTServers extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServers.class);
+
+  private final int GC_PERIOD_SECONDS = 1;
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
+    // Wait longer to try to let the replication table come online before a cycle runs
+    cfg.setProperty(Property.GC_CYCLE_START, "10s");
+    cfg.setProperty(Property.REPLICATION_NAME, "master");
+    // Set really long delays for the master to do stuff for replication
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s");
+    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s");
+    cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  /**
+   * Fetch all of the WALs referenced by tablets in the metadata table for this table
+   */
+  private Set<String> getWalsForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.TabletsSection.getRange(tableId);
+    s.setRange(r);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+
+    Set<String> wals = new HashSet<String>();
+    for (Entry<Key,Value> entry : s) {
+      log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+      // hostname:port/uri://path/to/wal
+      String cq = entry.getKey().getColumnQualifier().toString();
+      int index = cq.indexOf('/');
+      // Normalize the path
+      String path = new Path(cq.substring(index + 1)).toString();
+      log.debug("Extracted file: " + path);
+      wals.add(path);
+    }
+
+    return wals;
+  }
+
+  /**
+   * Fetch all of the rfiles referenced by tablets in the metadata table for this table
+   */
+  private Set<String> getFilesForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.TabletsSection.getRange(tableId);
+    s.setRange(r);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    Set<String> rfiles = new HashSet<String>();
+    for (Entry<Key,Value> entry : s) {
+      log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+      // uri://path/to/rfile
+      String cq = entry.getKey().getColumnQualifier().toString();
+      String path = new Path(cq).toString();
+      log.debug("Normalize path to rfile: {}", path);
+      rfiles.add(path);
+    }
+
+    return rfiles;
+  }
+
+  /**
+   * Get the replication status messages for the given table that exist in the metadata table (~repl entries)
+   */
+  private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception {
+    final Connector conn = getConnector();
+    final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
+
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    Range r = MetadataSchema.ReplicationSection.getRange();
+    s.setRange(r);
+    s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId));
+
+    Map<String,Status> fileToStatus = new HashMap<String,Status>();
+    for (Entry<Key,Value> entry : s) {
+      Text file = new Text();
+      MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
+      Status status = Status.parseFrom(entry.getValue().get());
+      log.info("Got status for {}: {}", file, ProtobufUtil.toString(status));
+      fileToStatus.put(file.toString(), status);
+    }
+
+    return fileToStatus;
+  }
+
+  @Test
+  public void testActiveWalPrecludesClosing() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+
+    // Bring the replication table online first and foremost
+    ReplicationTable.setOnline(conn);
+
+    log.info("Creating {}", table);
+    conn.tableOperations().create(table);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+    log.info("Writing a few mutations to the table");
+
+    BatchWriter bw = conn.createBatchWriter(table, null);
+
+    byte[] empty = new byte[0];
+    for (int i = 0; i < 5; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, empty);
+      bw.addMutation(m);
+    }
+
+    log.info("Flushing mutations to the server");
+    bw.flush();
+
+    log.info("Checking that metadata only has one WAL recorded for this table");
+
+    Set<String> wals = getWalsForTable(table);
+    Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size());
+
+    log.info("Compacting the table which will remove all WALs from the tablets");
+
+    // Flush our test table to remove the WAL references in it
+    conn.tableOperations().flush(table, null, null, true);
+    // Flush the metadata table too because it will have a reference to the WAL
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+    log.info("Waiting for replication table to come online");
+
+    log.info("Fetching replication statuses from metadata table");
+
+    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+    Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
+
+    String walName = fileToStatus.keySet().iterator().next();
+    Assert.assertEquals("Expected log file name from tablet to equal replication entry",
wals.iterator().next(), walName);
+
+    Status status = fileToStatus.get(walName);
+
+    Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
+
+    log.info("Checking to see that log entries are removed from tablet section after MinC");
+    // After compaction, the log column should be gone from the tablet
+    Set<String> walsAfterMinc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
+
+    Set<String> filesForTable = getFilesForTable(table);
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
+    log.info("Files for table before MajC: {}", filesForTable);
+
+    // Issue a MajC to roll a new file in HDFS
+    conn.tableOperations().compact(table, null, null, false, true);
+
+    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
+    Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction,
filesForTable);
+
+    // Use the rfile which was just replaced by the MajC to determine when the GC has run
+    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+    FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration());
+
+    boolean fileExists = fs.exists(fileToBeDeleted);
+    while (fileExists) {
+      log.info("File which should get deleted still exists: {}", fileToBeDeleted);
+      Thread.sleep(2000);
+      fileExists = fs.exists(fileToBeDeleted);
+    }
+
+    // At this point in time, we *know* that the GarbageCollector has run which means that the Status
+    // for our WAL should not be altered.
+
+    log.info("Re-checking that WALs are still not referenced for our table");
+
+    Set<String> walsAfterMajc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc,
0, walsAfterMajc.size());
+
+    Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
+    Assert.assertEquals("Expected to still find only one replication status message: " +
fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
+
+    Assert.assertEquals("Status before and after MinC should be identical", fileToStatus,
fileToStatusAfterMinc);
+  }
+
+  @Test
+  public void testUnreferencedWalInTserverIsClosed() throws Exception {
+    final String[] names = getUniqueNames(2);
+    // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver
+    final String table = names[0], otherTable = names[1];
+    final Connector conn = getConnector();
+
+    // Bring the replication table online first and foremost
+    ReplicationTable.setOnline(conn);
+
+    log.info("Creating {}", table);
+    conn.tableOperations().create(table);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+
+    log.info("Writing a few mutations to the table");
+
+    BatchWriter bw = conn.createBatchWriter(table, null);
+
+    byte[] empty = new byte[0];
+    for (int i = 0; i < 5; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, empty);
+      bw.addMutation(m);
+    }
+
+    log.info("Flushing mutations to the server");
+    bw.close();
+
+    log.info("Checking that metadata only has one WAL recorded for this table");
+
+    Set<String> wals = getWalsForTable(table);
+    Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size());
+
+    log.info("Compacting the table which will remove all WALs from the tablets");
+
+    // Flush our test table to remove the WAL references in it
+    conn.tableOperations().flush(table, null, null, true);
+    // Flush the metadata table too because it will have a reference to the WAL
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+
+    log.info("Waiting for replication table to come online");
+
+    log.info("Fetching replication statuses from metadata table");
+
+    Map<String,Status> fileToStatus = getMetadataStatusForTable(table);
+
+    Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size());
+
+    String walName = fileToStatus.keySet().iterator().next();
+    Assert.assertEquals("Expected log file name from tablet to equal replication entry",
wals.iterator().next(), walName);
+
+    Status status = fileToStatus.get(walName);
+
+    Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
+
+    log.info("Checking to see that log entries are removed from tablet section after MinC");
+    // After compaction, the log column should be gone from the tablet
+    Set<String> walsAfterMinc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
+
+    Set<String> filesForTable = getFilesForTable(table);
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
+    log.info("Files for table before MajC: {}", filesForTable);
+
+    // Issue a MajC to roll a new file in HDFS
+    conn.tableOperations().compact(table, null, null, false, true);
+
+    Set<String> filesForTableAfterCompaction = getFilesForTable(table);
+
+    log.info("Files for table after MajC: {}", filesForTableAfterCompaction);
+
+    Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size());
+    Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction,
filesForTable);
+
+    // Use the rfile which was just replaced by the MajC to determine when the GC has run
+    Path fileToBeDeleted = new Path(filesForTable.iterator().next());
+    FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration());
+
+    boolean fileExists = fs.exists(fileToBeDeleted);
+    while (fileExists) {
+      log.info("File which should get deleted still exists: {}", fileToBeDeleted);
+      Thread.sleep(2000);
+      fileExists = fs.exists(fileToBeDeleted);
+    }
+
+    // At this point in time, we *know* that the GarbageCollector has run which means that the Status
+    // for our WAL should not be altered.
+
+    log.info("Re-checking that WALs are still not referenced for our table");
+
+    Set<String> walsAfterMajc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc,
0, walsAfterMajc.size());
+
+    Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
+    Assert.assertEquals("Expected to still find only one replication status message: " +
fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
+
+    Assert.assertEquals("Status before and after MinC should be identical", fileToStatus,
fileToStatusAfterMinc);
+
+    /*
+     * To verify that the WAL still gets closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to do
+     * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of
+     */
+
+    conn.tableOperations().create(otherTable);
+    bw = conn.createBatchWriter(otherTable, null);
+    // 500k
+    byte[] bigValue = new byte[1024 * 500];
+    Arrays.fill(bigValue, (byte)1);
+    // 500k * 50
+    for (int i = 0; i < 50; i++) {
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put(empty, empty, bigValue);
+      bw.addMutation(m);
+      if (i % 10 == 0) {
+        bw.flush();
+      }
+    }
+
+    bw.close();
+
+    conn.tableOperations().flush(otherTable, null, null, true);
+
+    final TCredentials tcreds = new Credentials("root", new PasswordToken(AbstractMacIT.ROOT_PASSWORD)).toThrift(conn.getInstance());
+
+    // Get the tservers which the master deems as active
+    List<String> tservers = MasterClient.execute(conn.getInstance(), new ClientExecReturn<List<String>,MasterClientService.Client>()
{
+      @Override
+      public List<String> execute(MasterClientService.Client client) throws Exception
{
+        return client.getActiveTservers(Tracer.traceInfo(), tcreds);
+      }
+    });
+
+    Assert.assertEquals("Expected only one active tservers", 1, tservers.size());
+
+    String tserver = tservers.get(0);
+    AccumuloConfiguration rpcConfig = ClientConfigurationHelper.getClientRpcConfiguration(conn.getInstance());
+
+    // Get the active WALs from that server
+    log.info("Fetching active WALs from {}", tserver);
+
+    Client client = ThriftUtil.getTServerClient(tserver, rpcConfig);
+    List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), tcreds);
+
+    log.info("Active wals: {}", activeWalsForTserver);
+
+    Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size());
+
+    String activeWal = new Path(activeWalsForTserver.get(0)).toString();
+
+    Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL
we saw", walName, activeWal);
+
+    log.info("Ensuring that replication status does get closed after WAL is no longer in
use by Tserver");
+
+    do {
+      Map<String,Status> replicationStatuses = getMetadataStatusForTable(table);
+
+      log.info("Got replication status messages {}", replicationStatuses);
+      Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size());
+
+      status = replicationStatuses.values().iterator().next();
+      log.info("Current status: {}", ProtobufUtil.toString(status));
+
+      if (status.getClosed()) {
+        return;
+      }
+
+      log.info("Status is not yet closed, waiting for garbage collector to close it");
+
+      Thread.sleep(2000);
+    } while (true);
+  }
+}
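
A note on the WAL-roll trick near the end of the second test: tserver.walog.max.size is pinned
to 1M in configure(), while the loop writes 50 mutations of roughly 500KB each, about 25M in
total, so the tserver is forced to close the original WAL and start new ones; only the fact that
the original WAL is no longer active matters, not the exact roll count. Running just this IT
should be possible with something like mvn verify -Dit.test=GarbageCollectorCommunicatesWithTServers,
assuming the test module's standard failsafe wiring (that invocation is not part of the commit).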

