accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [01/34] accumulo git commit: ACCUMULO-3625 use log markers against tservers, not tablets
Date Fri, 24 Apr 2015 23:20:47 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 6df71693e -> 51f39d292


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 185a33a..1e508e8 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -16,11 +16,12 @@
  */
 package org.apache.accumulo.test.replication;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,6 +47,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -67,6 +69,7 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -75,11 +78,10 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -123,25 +125,38 @@ public class ReplicationIT extends ConfigurableMacIT {
     cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
     cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
     cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
     cfg.setNumTservers(1);
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
   private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
-    Multimap<String,String> logs = HashMultimap.create();
+    // Map of server to tableId
+    Multimap<TServerInstance, String> serverToTableID = HashMultimap.create();
     Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.fetchColumnFamily(LogColumnFamily.NAME);
-    scanner.setRange(new Range());
+    scanner.setRange(MetadataSchema.TabletsSection.getRange());
+    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+    for (Entry<Key,Value> entry : scanner) {
+      TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
+      byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
+      serverToTableID.put(key, new String(tableId, UTF_8));
+    }
+    // Map of logs to tableId
+    Multimap<String,String> logs = HashMultimap.create();
+    scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
     for (Entry<Key,Value> entry : scanner) {
       if (Thread.interrupted()) {
         return logs;
       }
-
-      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
-      for (String log : logEntry.logSet) {
-        // Need to normalize the log file from LogEntry
-        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
+      Text path = new Text();
+      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+      Text session = new Text();
+      Text hostPort = new Text();
+      MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort , session);
+      TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString()), session.toString());
+      for (String tableId : serverToTableID.get(server)) {
+        logs.put(new Path(path.toString()).toString(), tableId);
       }
     }
     return logs;
@@ -296,10 +311,12 @@ public class ReplicationIT extends ConfigurableMacIT {
     attempts = 5;
     while (wals.isEmpty() && attempts > 0) {
       s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+      s.setRange(MetadataSchema.CurrentLogsSection.getRange());
+      s.fetchColumnFamily(MetadataSchema.CurrentLogsSection.COLF);
       for (Entry<Key,Value> entry : s) {
-        LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-        wals.add(new Path(logEntry.filename).toString());
+        Text path = new Text();
+        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+        wals.add(new Path(path.toString()).toString());
       }
       attempts--;
     }
@@ -330,18 +347,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     for (String table : tables) {
-      BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
-
-      for (int j = 0; j < 5; j++) {
-        Mutation m = new Mutation(Integer.toString(j));
-        for (int k = 0; k < 5; k++) {
-          String value = Integer.toString(k);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
+      writeSomeData(conn, table, 5, 5);
     }
 
     // After writing data, still no replication table
@@ -381,18 +387,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-
-    for (int rows = 0; rows < 50; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table1, 50, 50);
 
     // After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1
     // Don't want to compact table1 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -439,18 +434,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 
     // Write some data to table2
-    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-
-    for (int rows = 0; rows < 50; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table2, 50, 50);
 
     // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2
     // Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -498,6 +482,19 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
   }
 
+  private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception {
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    for (int row = 0; row < rows; row++) {
+      Mutation m = new Mutation(Integer.toString(row));
+      for (int col = 0; col < cols; col++) {
+        String value = Integer.toString(col);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+
   @Test
   public void replicationEntriesPrecludeWalDeletion() throws Exception {
     final Connector conn = getConnector();
@@ -529,53 +526,21 @@ public class ReplicationIT extends ConfigurableMacIT {
     Thread.sleep(2000);
 
     // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table1, 200, 500);
 
     conn.tableOperations().create(table2);
     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
     Thread.sleep(2000);
 
-    // Write some data to table2
-    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table2, 200, 500);
 
     conn.tableOperations().create(table3);
     conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
     conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
     Thread.sleep(2000);
 
-    // Write some data to table3
-    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table3, 200, 500);
 
     // Force a write to metadata for the data written
     for (String table : Arrays.asList(table1, table2, table3)) {
@@ -609,7 +574,8 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     // We should have *some* reference to each log that was seen in the metadata table
     // They might not yet all be closed though (might be newfile)
-    Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
+    Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles));
+    Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1);
 
     for (String replFile : replFiles) {
       Path p = new Path(replFile);
@@ -697,44 +663,11 @@ public class ReplicationIT extends ConfigurableMacIT {
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      // Write some data to table1
-      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
+      writeSomeData(conn, table1, 200, 500);
 
-      bw.close();
+      writeSomeData(conn, table2, 200, 500);
 
-      // Write some data to table2
-      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
-
-      // Write some data to table3
-      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
+      writeSomeData(conn, table3, 200, 500);
 
       // Flush everything to try to make the replication records
       for (String table : Arrays.asList(table1, table2, table3)) {
@@ -789,10 +722,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     Set<String> wals = new HashSet<>();
     for (Entry<Key,Value> entry : s) {
       LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-      for (String file : logEntry.logSet) {
-        Path p = new Path(file);
-        wals.add(p.toString());
-      }
+      wals.add(new Path(logEntry.filename).toString());
     }
 
     log.warn("Found wals {}", wals);
@@ -869,9 +799,7 @@ public class ReplicationIT extends ConfigurableMacIT {
   public void singleTableWithSingleTarget() throws Exception {
     // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
     // against expected Status messages.
-    for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
-      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
-    }
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 
     Connector conn = getConnector();
     String table1 = "table1";
@@ -905,17 +833,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 2000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table1, 2000, 50);
 
     // Make sure the replication table is online at this point
     boolean online = ReplicationTable.isOnline(conn);
@@ -1002,17 +920,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // Write some more data so that we over-run the single WAL
-    bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 3000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table1, 3000, 50);
 
     log.info("Issued compaction for table");
     conn.tableOperations().compact(table1, null, null, true, true);
@@ -1085,17 +993,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 2000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table1, 2000, 50);
 
     String tableId = conn.tableOperations().tableIdMap().get(table1);
     Assert.assertNotNull("Table ID was null", tableId);
@@ -1150,10 +1048,7 @@ public class ReplicationIT extends ConfigurableMacIT {
 
   @Test
   public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
-    Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
-    for (ProcessReference ref : gcProcs) {
-      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
-    }
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 
     final Connector conn = getConnector();
 
@@ -1184,7 +1079,6 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     String table1 = "table1", table2 = "table2", table3 = "table3";
 
-    BatchWriter bw;
     try {
       conn.tableOperations().create(table1);
       conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
@@ -1193,51 +1087,19 @@ public class ReplicationIT extends ConfigurableMacIT {
           ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
 
       // Write some data to table1
-      bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
+      writeSomeData(conn, table1, 200, 500);
 
       conn.tableOperations().create(table2);
       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      // Write some data to table2
-      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
+      writeSomeData(conn, table2, 200, 500);
 
       conn.tableOperations().create(table3);
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      // Write some data to table3
-      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
-      }
-
-      bw.close();
+      writeSomeData(conn, table3, 200, 500);
 
       // Flush everything to try to make the replication records
       for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1251,11 +1113,8 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
-    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-      cluster.killProcess(ServerType.TABLET_SERVER, proc);
-    }
-
-    cluster.exec(TabletServer.class);
+    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1358,9 +1217,7 @@ public class ReplicationIT extends ConfigurableMacIT {
   @Test
   public void replicatedStatusEntriesAreDeleted() throws Exception {
     // Just stop it now, we'll restart it after we restart the tserver
-    for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
-      getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
-    }
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 
     final Connector conn = getConnector();
     log.info("Got connector to MAC");
@@ -1396,17 +1253,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertNotNull("Could not determine table id for " + table1, tableId);
 
     // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 2000; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 50; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
+    writeSomeData(conn, table1, 2000, 50);
 
     // Make sure the replication table exists at this point
     boolean online = ReplicationTable.isOnline(conn);
@@ -1423,14 +1270,35 @@ public class ReplicationIT extends ConfigurableMacIT {
     // Grant ourselves the write permission for later
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
+    log.info("Checking for replication entries in replication");
+    // Then we need to get those records over to the replication table
+    Scanner s;
+    Set<String> entries = new HashSet<>();
+    for (int i = 0; i < 5; i++) {
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      s.setRange(ReplicationSection.getRange());
+      entries.clear();
+      for (Entry<Key,Value> entry : s) {
+        entries.add(entry.getKey().getRow().toString());
+        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+      }
+      if (!entries.isEmpty()) {
+        log.info("Replication entries {}", entries);
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    Assert.assertFalse("Did not find any replication entries in the replication table", entries.isEmpty());
+
     // Find the WorkSection record that will be created for that data we ingested
     boolean notFound = true;
-    Scanner s;
     for (int i = 0; i < 10 && notFound; i++) {
       try {
         s = ReplicationTable.getScanner(conn);
         WorkSection.limit(s);
         Entry<Key,Value> e = Iterables.getOnlyElement(s);
+        log.info("Found entry: " + e.getKey().toStringNoTruncate());
         Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
         Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
         notFound = false;
@@ -1481,14 +1349,13 @@ public class ReplicationIT extends ConfigurableMacIT {
     log.info("Killing tserver");
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
-    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
-      cluster.killProcess(ServerType.TABLET_SERVER, proc);
-    }
+    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
 
     log.info("Starting tserver");
-    cluster.exec(TabletServer.class);
+    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
 
     log.info("Waiting to read tables");
+    UtilWaitThread.sleep(2 * 3 * 1000);
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : new String[] {MetadataTable.NAME, table1}) {
@@ -1497,55 +1364,48 @@ public class ReplicationIT extends ConfigurableMacIT {
       Entry<Key,Value> entry : s) {}
     }
 
-    log.info("Checking for replication entries in replication");
-    // Then we need to get those records over to the replication table
-    boolean foundResults = false;
-    for (int i = 0; i < 5; i++) {
-      s = ReplicationTable.getScanner(conn);
-      int count = 0;
-      for (Entry<Key,Value> entry : s) {
-        count++;
-        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
-      }
-      if (count > 0) {
-        foundResults = true;
-        break;
-      }
-      Thread.sleep(1000);
+    log.info("Recovered metadata:");
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : s) {
+      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
     }
 
-    Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
-
-    getCluster().exec(SimpleGarbageCollector.class);
+    cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
 
     // Wait for a bit since the GC has to run (should be running after a one second delay)
     waitForGCLock(conn);
 
     Thread.sleep(1000);
 
+    log.info("After GC");
+    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : s) {
+      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+    }
+
     // We expect no records in the metadata table after compaction. We have to poll
     // because we have to wait for the StatusMaker's next iteration which will clean
     // up the dangling *closed* records after we create the record in the replication table.
     // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
     log.info("Checking metadata table for replication entries");
-    foundResults = true;
+    Set<String> remaining = new HashSet<>();
     for (int i = 0; i < 10; i++) {
       s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       s.setRange(ReplicationSection.getRange());
-      long size = 0;
+      remaining.clear();
       for (Entry<Key,Value> e : s) {
-        size++;
-        log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
+        remaining.add(e.getKey().getRow().toString());
       }
-      if (size == 0) {
-        foundResults = false;
+      remaining.retainAll(entries);
+      if (remaining.isEmpty()) {
         break;
       }
+      log.info("remaining {}", remaining);
       Thread.sleep(2000);
       log.info("");
     }
 
-    Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+    Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty());
 
     /**
      * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
@@ -1558,10 +1418,10 @@ public class ReplicationIT extends ConfigurableMacIT {
       recordsFound = 0;
       for (Entry<Key,Value> entry : s) {
         recordsFound++;
-        log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+        log.info("{} {}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
       }
 
-      if (0 == recordsFound) {
+      if (recordsFound <= 2) {
         break;
       } else {
         Thread.sleep(1000);
@@ -1569,6 +1429,6 @@ public class ReplicationIT extends ConfigurableMacIT {
       }
     }
 
-    Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
+    Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2);
   }
 }


Mime
View raw message