accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [35/50] [abbrv] git commit: ACCUMULO-2819 More test updates for the sequential work assigner
Date Wed, 21 May 2014 01:59:54 GMT
ACCUMULO-2819 More test updates for the sequential work assigner


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9f779184
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9f779184
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9f779184

Branch: refs/heads/ACCUMULO-378
Commit: 9f779184cbfa8e3c18be137f8e0471bc0bae4491
Parents: 26a88b4
Author: Josh Elser <elserj@apache.org>
Authored: Mon May 19 15:40:49 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Mon May 19 15:40:49 2014 -0400

----------------------------------------------------------------------
 .../DistributedWorkQueueWorkAssigner.java       |   6 +-
 .../master/replication/ReplicationDriver.java   |   1 -
 .../tserver/log/TabletServerLogger.java         |  10 +-
 .../test/replication/MockReplicaSystem.java     |  69 +++++++++++++
 .../test/replication/CyclicReplicationIT.java   |  27 +++--
 .../replication/ReplicationSequentialIT.java    |  60 ++++++++---
 .../test/replication/ReplicationWithGCIT.java   | 102 ++++++-------------
 .../replication/ReplicationWithMakerTest.java   |  10 +-
 test/src/test/resources/log4j.properties        |   1 +
 9 files changed, 179 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 84f9af5..f04f3e8 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -217,8 +217,6 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner
{
       return;
     }
 
-    log.info("Creating batchscanner to read Work records from the replication table");
-
     WorkSection.limit(bs);
     bs.setRanges(Collections.singleton(new Range()));
     Text buffer = new Text();
@@ -261,12 +259,12 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner
{
         }
       }
     } finally {
+      log.info("Created work entries for {} files", filesWorkWasCreatedFrom);
+
       if (null != bs) {
         bs.close();
       }
     }
-
-    log.info("Created work entries for {} files", filesWorkWasCreatedFrom);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 75fe5f3..3069c97 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -24,7 +24,6 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.master.Master;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.log4j.Logger;
 
 /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 67127f1..d8c4279 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -239,7 +239,7 @@ public class TabletServerLogger {
     return write(sessions, mincFinish, writer);
   }
 
-  private int write(Collection<CommitSession> sessions, boolean mincFinish, Writer
writer) throws IOException {
+  private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer
writer) throws IOException {
     // Work very hard not to lock this during calls to the outside world
     int currentLogSet = logSetId.get();
 
@@ -268,7 +268,7 @@ public class TabletServerLogger {
               }
 
               // Need to release
-              KeyExtent extent = commitSession.getExtent();tserver.getTableConfiguration(extent).getNamespaceConfiguration();
+              KeyExtent extent = commitSession.getExtent();
               if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent)))
{
                 Set<String> logs = new HashSet<String>();
                 for (DfsLogger logger : copy) {
@@ -328,6 +328,7 @@ public class TabletServerLogger {
           @Override
           void withWriteLock() throws IOException {
             close();
+            closeForReplication(sessions);
           }
         });
       }
@@ -343,11 +344,16 @@ public class TabletServerLogger {
       @Override
       void withWriteLock() throws IOException {
         close();
+        closeForReplication(sessions);
       }
     });
     return seq;
   }
 
+  protected void closeForReplication(Collection<CommitSession> sessions) {
+    // TODO We can close the WAL here for replication purposes
+  }
+
   public int defineTablet(final CommitSession commitSession) throws IOException {
     // scribble this into the metadata tablet, too.
     if (!enabled(commitSession))

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
new file mode 100644
index 0000000..cafd9b7
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Fake ReplicaSystem which returns that the data was fully replicated after some sleep period
(in milliseconds)
+ * <p>
+ * Default sleep amount is 0ms
+ */
+public class MockReplicaSystem implements ReplicaSystem {
+  private static final Logger log = LoggerFactory.getLogger(MockReplicaSystem.class);
+
+  private long sleep = 0;
+
+  @Override
+  public Status replicate(Path p, Status status, ReplicationTarget target) {
+    Status.Builder builder = Status.newBuilder(status);
+    if (status.getInfiniteEnd()) {
+      builder.setBegin(Long.MAX_VALUE);
+    } else {
+      builder.setBegin(status.getEnd());
+    }
+
+    try {
+      Thread.sleep(sleep);
+    } catch (InterruptedException e) {
+      log.error("Interrupted while sleeping, will report no progress", e);
+      Thread.currentThread().interrupt();
+      return status;
+    }
+    
+
+    Status newStatus = builder.build();
+    log.info("Received {} returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
+    return newStatus;
+  }
+
+  @Override
+  public void configure(String configuration) {
+    try {
+      sleep = Long.parseLong(configuration);
+    } catch (NumberFormatException e) {
+      log.warn("Could not parse {} as an integer, using default sleep of {}", configuration,
sleep, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index 75f6a3f..b30dc39 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -48,6 +49,8 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
 
@@ -55,6 +58,7 @@ import com.google.common.collect.Iterables;
  * 
  */
 public class CyclicReplicationIT {
+  private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class);
 
   @Rule
   public TestName testName = new TestName();
@@ -80,7 +84,7 @@ public class CyclicReplicationIT {
     out.close();
   }
 
-  @Test
+  @Test(timeout = 5 * 60 * 1000)
   public void dataIsNotOverReplicated() throws Exception {
     File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2");
     String password = "password";
@@ -155,9 +159,12 @@ public class CyclicReplicationIT {
       Mutation m = new Mutation("row");
       m.put("count", "", "1");
       bw.addMutation(m);
-      bw.flush();
       bw.close();
 
+      Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Cluster.getInstanceName());
+
+      log.info("Found {} that need replication from master1", files);
+
       // Kill and restart the tserver to close the WAL on master1
       for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER))
{
         master1Cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -165,21 +172,26 @@ public class CyclicReplicationIT {
 
       master1Cluster.exec(TabletServer.class);
 
+      log.info("Restarted tserver on master1");
+
       // Sanity check that the element is there on master1
       Scanner s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);
       Entry<Key,Value> entry = Iterables.getOnlyElement(s);
       Assert.assertEquals("1", entry.getValue().toString());
 
-      Thread.sleep(5000);
-
       // Wait for this table to replicate
-      connMaster1.replicationOperations().drain(master1Cluster.getInstanceName());
+      connMaster1.replicationOperations().drain(master1Cluster.getInstanceName(), files);
+
+      Thread.sleep(5000);
 
       // Check that the element made it to master2 only once
       s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY);
       entry = Iterables.getOnlyElement(s);
       Assert.assertEquals("1", entry.getValue().toString());
 
+      // Wait for master2 to finish replicating it back
+      files = connMaster2.replicationOperations().referencedFiles(master2Cluster.getInstanceName());
+
       // Kill and restart the tserver to close the WAL on master2
       for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER))
{
         master2Cluster.killProcess(ServerType.TABLET_SERVER, proc);
@@ -192,10 +204,9 @@ public class CyclicReplicationIT {
       entry = Iterables.getOnlyElement(s);
       Assert.assertEquals("1", entry.getValue().toString());
 
-      Thread.sleep(5000);
+      connMaster2.replicationOperations().drain(master2Cluster.getInstanceName(), files);
 
-      // Wait for master2 to finish replicating it back
-      connMaster2.replicationOperations().drain(master2Cluster.getInstanceName());
+      Thread.sleep(5000);
 
       // Verify that the entry wasn't sent back to master1
       s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
index 0683a57..dce4e17 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
@@ -44,10 +44,13 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -86,8 +89,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
     cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
     cfg.setProperty(Property.REPLICATION_NAME, "master");
     cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
-    cfg.useMiniDFS(true);
-//    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
   @Test(timeout = 60 * 5000)
@@ -158,6 +160,31 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       }
     }
 
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+    cluster.exec(TabletServer.class);
+
+    log.info("TabletServer restarted");
+    for (@SuppressWarnings("unused") Entry<Key,Value> e : ReplicationTable.getScanner(connMaster))
{}
+    log.info("TabletServer is online");
+
+    log.info("");
+    log.info("Fetching metadata records:");
+    for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
{
+      if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+        log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+      } else {
+        log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+      }
+    }
+
+    log.info("");
+    log.info("Fetching replication records:");
+    for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
{
+      log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+    }
+
     Future<Boolean> future = executor.submit(new Callable<Boolean>() {
 
       @Override
@@ -169,12 +196,17 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       
     });
 
-    connMaster.tableOperations().compact(masterTable, null, null, true, true);
+    try {
+      future.get(30, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      future.cancel(true);
+      Assert.fail("Drain did not finish within 5 seconds");
+    }
+
+    log.info("drain completed");
 
     log.info("");
-    log.info("Compaction completed");
-
-    log.debug("");
+    log.info("Fetching metadata records:");
     for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY))
{
       if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
         log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
@@ -183,13 +215,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
       }
     }
 
-    try {
-      future.get(15, TimeUnit.SECONDS);
-    } catch (TimeoutException e) {
-      Assert.fail("Drain did not finish within 5 seconds");
-    }
-
     log.info("");
+    log.info("Fetching replication records:");
     for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY))
{
       log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
     }
@@ -300,8 +327,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
         Thread.sleep(500);
       }
 
-      connMaster.tableOperations().compact(masterTable1, null, null, true, false);
-      connMaster.tableOperations().compact(masterTable2, null, null, true, false);
+      // Restart the tserver to force a close on the WAL
+      for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER))
{
+        cluster.killProcess(ServerType.TABLET_SERVER, proc);
+      }
+      cluster.exec(TabletServer.class);
+
+      log.info("Restarted the tserver");
 
       // Wait until we fully replicated something
       boolean fullyReplicated = false;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 101001f..23da719 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -58,7 +57,6 @@ import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
@@ -76,33 +74,6 @@ import com.google.protobuf.TextFormat;
 public class ReplicationWithGCIT extends ConfigurableMacIT {
   private static final Logger log = LoggerFactory.getLogger(ReplicationWithGCIT.class);
 
-  /**
-   * Fake ReplicaSystem which immediately returns that the data was fully replicated
-   */
-  public static class MockReplicaSystem implements ReplicaSystem {
-    private static final Logger log = LoggerFactory.getLogger(MockReplicaSystem.class);
-
-    public MockReplicaSystem() {}
-
-    @Override
-    public Status replicate(Path p, Status status, ReplicationTarget target) {
-      Status.Builder builder = Status.newBuilder(status);
-      if (status.getInfiniteEnd()) {
-        builder.setBegin(Long.MAX_VALUE);
-      } else {
-        builder.setBegin(status.getEnd());
-      }
-
-      Status newStatus = builder.build();
-      log.info("Received {} returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
-      return newStatus;
-    }
-
-    @Override
-    public void configure(String configuration) {}
-
-  }
-
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setNumTservers(1);
@@ -358,10 +329,10 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
 
   }
 
-  @Test
+  @Test(timeout = 5 * 60 * 1000)
   public void replicatedStatusEntriesAreDeleted() throws Exception {
-    Connector conn = getConnector();
-    FileSystem fs = FileSystem.getLocal(new Configuration());
+    final Connector conn = getConnector();
+    log.info("Got connector to MAC");
     String table1 = "table1";
 
     // replication shouldn't exist when we begin
@@ -377,8 +348,9 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
         // Replicate table1 to cluster1 in the table with id of '4'
         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey()
+ "cluster1", "4");
+        // Use the MockReplicaSystem impl and sleep for 5seconds
         conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
-            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
+            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
         attempts = 0;
       } catch (Exception e) {
         attempts--;
@@ -469,17 +441,23 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     }
 
     /**
-     * By this point, we should have data ingested into a table, with at least one WAL as
a candidate for replication. It may or may not yet be closed.
+     * By this point, we should have data ingested into a table, with at least one WAL as
a candidate for replication. Compacting the table should close all
+     * open WALs, which should ensure all records we're going to replicate have entries in
the replication table, and nothing will exist in the metadata table
+     * anymore
      */
 
+    log.info("Killing tserver");
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
     for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
       cluster.killProcess(ServerType.TABLET_SERVER, proc);
     }
 
+    log.info("Starting tserver");
     cluster.exec(TabletServer.class);
 
+    log.info("Waiting to read tables");
+
     // Make sure we can read all the tables (recovery complete)
     for (String table : new String[] {MetadataTable.NAME, table1}) {
       s = conn.createScanner(table, new Authorizations());
@@ -487,48 +465,39 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
       Entry<Key,Value> entry : s) {}
     }
 
-    // Need to make sure we get the entries in metadata
-    boolean foundResults = false;
-    for (int i = 0; i < 5 && !foundResults; i++) {
-      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      s.setRange(ReplicationSection.getRange());
-      if (Iterables.size(s) > 0) {
-        foundResults = true;
-      }
-      Thread.sleep(1000);
-    }
-
-    Assert.assertTrue("Did not find any replication entries in the metadata table", foundResults);
-
+    log.info("Checking for replication entries in replication");
     // Then we need to get those records over to the replication table
-    foundResults = false;
-    for (int i = 0; i < 5 && !foundResults; i++) {
+    boolean foundResults = false;
+    for (int i = 0; i < 5; i++) {
       s = ReplicationTable.getScanner(conn);
       if (Iterables.size(s) > 0) {
         foundResults = true;
+        break;
       }
       Thread.sleep(1000);
     }
 
     Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
 
-    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-    for (Entry<Key,Value> entry : s) {
-      String row = entry.getKey().getRow().toString();
-      Path file = new Path(row.substring(ReplicationSection.getRowPrefix().length()));
-      Assert.assertTrue(file + " did not exist when it should", fs.exists(file));
+    // We expect no records in the metadata table after compaction. We have to poll
+    // because we have to wait for the StatusMaker's next iteration which will clean
+    // up the dangling record after we create the record in the replication table
+    foundResults = true;
+    for (int i = 0; i < 5; i++) {
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      s.setRange(ReplicationSection.getRange());
+      if (Iterables.size(s) == 0) {
+        foundResults = false;
+        break;
+      }
+      Thread.sleep(1000);
     }
 
-    /**
-     * After recovery completes, we should have unreplicated, closed Status messages. The
close happens at the beginning of log recovery.
-     * The MockReplicaSystem we configured will just automatically say the data has been
replicated, so this should then created replicated
-     * and closed Status messages.
-     */
+    Assert.assertFalse("Replication status messages were not cleaned up from metadata table,
check why the StatusMaker didn't delete them", foundResults);
 
     /**
      * After we set the begin to Long.MAX_VALUE, the RemoveCompleteReplicationRecords class
will start deleting the records which have been closed by
-     * CloseWriteAheadLogReferences (which will have been working since we restarted the
tserver(s))
+     * the minor compaction and replicated by the MockReplicaSystem
      */
 
     // Wait for a bit since the GC has to run (should be running after a one second delay)
@@ -552,16 +521,5 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     }
 
     Assert.assertEquals("Found unexpected replication records in the replication table",
0, recordsFound);
-
-    // If the replication table entries were deleted, so should the metadata table replication
entries
-    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-    recordsFound = 0;
-    for (Entry<Key,Value> entry : s) {
-      recordsFound++;
-      log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n",
", "));
-    }
-
-    Assert.assertEquals("Found unexpected replication records in the metadata table", 0,
recordsFound);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
index aee8a1e..03ac72c 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
@@ -27,10 +27,10 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -142,13 +142,13 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
     Scanner s = ReplicationTable.getScanner(conn);
     StatusSection.limit(s);
     Entry<Key,Value> entry = null;
+    Status expectedStatus = StatusUtil.openWithUnknownLength();
     attempts = 5;
     // This record will move from new to new with infinite length because of the minc (flush)
     while (null == entry && attempts > 0) {
       try {
         entry = Iterables.getOnlyElement(s);
-        Status actualStatus = Status.parseFrom(entry.getValue().get());
-        if (!actualStatus.hasClosedTime() || !actualStatus.getClosed()) {
+        if (!expectedStatus.equals(Status.parseFrom(entry.getValue().get()))) {
           entry = null;
           // the master process didn't yet fire and write the new mutation, wait for it to
do
           // so and try to read it again
@@ -171,9 +171,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
     }
 
     Assert.assertNotNull("Could not find expected entry in replication table", entry);
-    Status actualStatus = Status.parseFrom(entry.getValue().get());
-    Assert.assertTrue("Expected to find a replication entry that is closed with infinite
length: " + ProtobufUtil.toString(actualStatus),
-        actualStatus.getClosed() && actualStatus.hasClosedTime());
+    Assert.assertEquals("Expected to find a replication entry that is open with infinite
length", expectedStatus, Status.parseFrom(entry.getValue().get()));
 
     // Try a couple of times to watch for the work record to be created
     boolean notFound = true;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9f779184/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index 171d690..11ff405 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -19,6 +19,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout
 log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n
 
 log4j.logger.org.apache.accumulo.core=DEBUG
+log4j.logger.org.apache.accumulo.core.client.impl.MasterClient=INFO
 log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR
 log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off
 log4j.logger.org.apache.accumulo.core.util.shell.Shell=FATAL


Mime
View raw message