accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [13/19] accumulo git commit: Revert "ACCUMULO-3423 optimize WAL metadata table updates"
Date Sun, 10 May 2015 21:05:59 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
deleted file mode 100644
index e8d276f..0000000
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.functional;
-
-import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
-import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
-import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
-import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
-import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
-import static org.apache.accumulo.core.security.Authorizations.EMPTY;
-import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
-import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.master.state.SetGoalState;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Iterators;
-
-public class WALSunnyDayIT extends ConfigurableMacIT {
-
-  private static final Text CF = new Text(new byte[0]);
-
-  @Override
-  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(GC_CYCLE_DELAY, "1s");
-    cfg.setProperty(GC_CYCLE_START, "0s");
-    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
-    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
-    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
-    cfg.setNumTservers(1);
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  int countTrue(Collection<Boolean> bools) {
-    int result = 0;
-    for (Boolean b : bools) {
-      if (b.booleanValue())
-        result++;
-    }
-    return result;
-  }
-
-  @Test
-  public void test() throws Exception {
-    MiniAccumuloClusterImpl mac = getCluster();
-    MiniAccumuloClusterControl control = mac.getClusterControl();
-    control.stop(GARBAGE_COLLECTOR);
-    Connector c = getConnector();
-    ZooKeeper zoo = new ZooKeeper(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut(), new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        log.info(event.toString());
-      }
-    });
-    String tableName = getUniqueNames(1)[0];
-    c.tableOperations().create(tableName);
-    writeSomeData(c, tableName, 1, 1);
-
-    // wal markers are added lazily
-    Map<String,Boolean> wals = getWals(c, zoo);
-    assertEquals(wals.toString(), 1, wals.size());
-    for (Boolean b : wals.values()) {
-      assertTrue("logs should be in use", b.booleanValue());
-    }
-
-    // roll log, get a new next
-    writeSomeData(c, tableName, 1000, 50);
-    Map<String,Boolean> walsAfterRoll = getWals(c, zoo);
-    assertEquals("should have 3 WALs after roll", 2, walsAfterRoll.size());
-    assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
-    assertEquals("all WALs should be in use", 2, countTrue(walsAfterRoll.values()));
-
-    // flush the tables
-    for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
-      c.tableOperations().flush(table, null, null, true);
-    }
-    UtilWaitThread.sleep(1000);
-    // rolled WAL is no longer in use, but needs to be GC'd
-    Map<String,Boolean> walsAfterflush = getWals(c, zoo);
-    assertEquals(walsAfterflush.toString(), 2, walsAfterflush.size());
-    assertEquals("inUse should be 1", 1, countTrue(walsAfterflush.values()));
-
-    // let the GC run for a little bit
-    control.start(GARBAGE_COLLECTOR);
-    UtilWaitThread.sleep(5 * 1000);
-    // make sure the unused WAL goes away
-    Map<String,Boolean> walsAfterGC = getWals(c, zoo);
-    assertEquals(walsAfterGC.toString(), 1, walsAfterGC.size());
-    control.stop(GARBAGE_COLLECTOR);
-    // restart the tserver, but don't run recovery on all tablets
-    control.stop(TABLET_SERVER);
-    // this delays recovery on the normal tables
-    assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
-    control.start(TABLET_SERVER);
-
-    // wait for the metadata table to go back online
-    getRecoveryMarkers(c);
-    // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
-    UtilWaitThread.sleep(5 * 1000);
-    Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
-    // log.debug("markers " + markers);
-    assertEquals("one tablet should have markers", 1, markers.keySet().size());
-    assertEquals("tableId of the keyExtent should be 1", markers.keySet().iterator().next().getTableId(), new Text("1"));
-
-    // put some data in the WAL
-    assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
-    verifySomeData(c, tableName, 1000 * 50 + 1);
-    writeSomeData(c, tableName, 100, 100);
-
-    Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
-    // log.debug("wals after " + walsAfterRestart);
-    assertEquals("used WALs after restart should be 1", 1, countTrue(walsAfterRestart.values()));
-    control.start(GARBAGE_COLLECTOR);
-    UtilWaitThread.sleep(5 * 1000);
-    Map<String,Boolean> walsAfterRestartAndGC = getWals(c, zoo);
-    assertEquals("wals left should be 1", 1, walsAfterRestartAndGC.size());
-    assertEquals("logs in use should be 1", 1, countTrue(walsAfterRestartAndGC.values()));
-  }
-
-  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
-    Scanner scan = c.createScanner(tableName, EMPTY);
-    int result = Iterators.size(scan.iterator());
-    scan.close();
-    Assert.assertEquals(expected, result);
-  }
-
-  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
-    Random rand = new Random();
-    BatchWriter bw = conn.createBatchWriter(tableName, null);
-    byte[] rowData = new byte[10];
-    byte[] cq = new byte[10];
-    byte[] value = new byte[10];
-
-    for (int r = 0; r < row; r++) {
-      rand.nextBytes(rowData);
-      Mutation m = new Mutation(rowData);
-      for (int c = 0; c < col; c++) {
-        rand.nextBytes(cq);
-        rand.nextBytes(value);
-        m.put(CF, new Text(cq), new Value(value));
-      }
-      bw.addMutation(m);
-      if (r % 100 == 0) {
-        bw.flush();
-      }
-    }
-    bw.close();
-  }
-
-  private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
-    Map<String,Boolean> result = new HashMap<>();
-    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
-    root.setRange(CurrentLogsSection.getRange());
-    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
-    meta.setRange(root.getRange());
-    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
-    while (both.hasNext()) {
-      Entry<Key,Value> entry = both.next();
-      Text path = new Text();
-      CurrentLogsSection.getPath(entry.getKey(), path);
-      result.put(path.toString(), entry.getValue().get().length == 0);
-    }
-    String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-    List<String> children = zoo.getChildren(zpath, null);
-    for (String child : children) {
-      byte[] data = zoo.getData(zpath + "/" + child, null, null);
-      result.put(new String(data), true);
-    }
-    return result;
-  }
-
-  private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws Exception {
-    Map<KeyExtent,List<String>> result = new HashMap<>();
-    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
-    root.setRange(TabletsSection.getRange());
-    root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
-    TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
-
-    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
-    meta.setRange(TabletsSection.getRange());
-    meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
-    TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
-
-    List<String> logs = new ArrayList<>();
-    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
-    while (both.hasNext()) {
-      Entry<Key,Value> entry = both.next();
-      Key key = entry.getKey();
-      if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
-        logs.add(key.getColumnQualifier().toString());
-      }
-      if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
-        KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
-        result.put(extent, logs);
-        logs = new ArrayList<String>();
-      }
-    }
-    return result;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
index 140fd59..3b1dd2f 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
@@ -52,7 +52,7 @@ public class WatchTheWatchCountIT extends ConfigurableMacIT {
       String response = new String(buffer, 0, n);
       long total = Long.parseLong(response.split(":")[1].trim());
       assertTrue("Total watches was not greater than 500, but was " + total, total > 500);
-      assertTrue("Total watches was not less than 675, but was " + total, total < 675);
+      assertTrue("Total watches was not less than 650, but was " + total, total < 600);
     } finally {
       socket.close();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
deleted file mode 100644
index 5418e1c..0000000
--- a/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.performance;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.test.continuous.ContinuousIngest;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-public class RollWALPerformanceIT extends ConfigurableMacIT {
-
-  @Override
-  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(Property.TSERV_WAL_REPLICATION, "1");
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "10M");
-    cfg.setProperty(Property.TABLE_MINC_LOGS_MAX, "100");
-    cfg.setProperty(Property.GC_FILE_ARCHIVE, "false");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
-    cfg.useMiniDFS(true);
-  }
-
-  private long ingest() throws Exception {
-    final Connector c = getConnector();
-    final String tableName = getUniqueNames(1)[0];
-
-    log.info("Creating the table");
-    c.tableOperations().create(tableName);
-
-    log.info("Splitting the table");
-    final long SPLIT_COUNT = 100;
-    final long distance = Long.MAX_VALUE / SPLIT_COUNT;
-    final SortedSet<Text> splits = new TreeSet<Text>();
-    for (int i = 1; i < SPLIT_COUNT; i++) {
-      splits.add(new Text(String.format("%016x", i * distance)));
-    }
-    c.tableOperations().addSplits(tableName, splits);
-
-    log.info("Waiting for balance");
-    c.instanceOperations().waitForBalance();
-
-    final Instance inst = c.getInstance();
-
-    log.info("Starting ingest");
-    final long start = System.currentTimeMillis();
-    final String args[] = {"-i", inst.getInstanceName(), "-z", inst.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD, "--batchThreads", "2", "--table",
-        tableName, "--num", Long.toString(1000 * 1000), // 1M 100 byte entries
-    };
-
-    ContinuousIngest.main(args);
-    final long result = System.currentTimeMillis() - start;
-    log.debug(String.format("Finished in %,d ms", result));
-    log.debug("Dropping table");
-    c.tableOperations().delete(tableName);
-    return result;
-  }
-
-  private long getAverage() throws Exception {
-    final int REPEAT = 3;
-    long totalTime = 0;
-    for (int i = 0; i < REPEAT; i++) {
-      totalTime += ingest();
-    }
-    return totalTime / REPEAT;
-  }
-
-  private void testWalPerformanceOnce() throws Exception {
-    // get time with a small WAL, which will cause many WAL roll-overs
-    long avg1 = getAverage();
-    // use a bigger WAL max size to eliminate WAL roll-overs
-    Connector c = getConnector();
-    c.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1G");
-    c.tableOperations().flush(MetadataTable.NAME, null, null, true);
-    c.tableOperations().flush(RootTable.NAME, null, null, true);
-    for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
-      getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
-    }
-    getCluster().start();
-    long avg2 = getAverage();
-    log.info(String.format("Average run time with small WAL %,d with large WAL %,d", avg1, avg2));
-    assertTrue(avg1 > avg2);
-    double percent = (100. * avg1) / avg2;
-    log.info(String.format("Percent of large log: %.2f%%", percent));
-    assertTrue(percent < 125.);
-  }
-
-  @Test(timeout = 20 * 60 * 1000)
-  public void testWalPerformance() throws Exception {
-    testWalPerformanceOnce();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 62ed9c2..75f61f1 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -79,7 +78,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
     cfg.setNumTservers(1);
-    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
     cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s");
     // Wait longer to try to let the replication table come online before a cycle runs
     cfg.setProperty(Property.GC_CYCLE_START, "10s");
@@ -104,14 +102,18 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
     Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 
     Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(CurrentLogsSection.getRange());
-    s.fetchColumnFamily(CurrentLogsSection.COLF);
+    Range r = MetadataSchema.TabletsSection.getRange(tableId);
+    s.setRange(r);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
 
     Set<String> wals = new HashSet<String>();
     for (Entry<Key,Value> entry : s) {
       log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
       // hostname:port/uri://path/to/wal
-      String path = new Path(entry.getKey().getColumnQualifier().toString()).toString();
+      String cq = entry.getKey().getColumnQualifier().toString();
+      int index = cq.indexOf('/');
+      // Normalize the path
+      String path = new Path(cq.substring(index + 1)).toString();
       log.debug("Extracted file: " + path);
       wals.add(path);
     }
@@ -226,6 +228,11 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
 
+    log.info("Checking to see that log entries are removed from tablet section after MinC");
+    // After compaction, the log column should be gone from the tablet
+    Set<String> walsAfterMinc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
+
     Set<String> filesForTable = getFilesForTable(table);
     Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
     log.info("Files for table before MajC: {}", filesForTable);
@@ -251,6 +258,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
       fileExists = fs.exists(fileToBeDeleted);
     }
 
+    // At this point in time, we *know* that the GarbageCollector has run which means that the Status
+    // for our WAL should not be altered.
+
+    log.info("Re-checking that WALs are still not referenced for our table");
+
+    Set<String> walsAfterMajc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
+
     Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
     Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
 
@@ -311,6 +326,11 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed());
 
+    log.info("Checking to see that log entries are removed from tablet section after MinC");
+    // After compaction, the log column should be gone from the tablet
+    Set<String> walsAfterMinc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size());
+
     Set<String> filesForTable = getFilesForTable(table);
     Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size());
     log.info("Files for table before MajC: {}", filesForTable);
@@ -339,6 +359,11 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
     // At this point in time, we *know* that the GarbageCollector has run which means that the Status
     // for our WAL should not be altered.
 
+    log.info("Re-checking that WALs are still not referenced for our table");
+
+    Set<String> walsAfterMajc = getWalsForTable(table);
+    Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size());
+
     Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table);
     Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index fea41fa..3912e98 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@ -148,7 +148,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT {
     }
   }
 
-  @Test(timeout = 10 * 60 * 1000)
+  @Test
   public void dataWasReplicatedToThePeer() throws Exception {
     MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
         ROOT_PASSWORD);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index e86f653..34c699e 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -16,12 +16,11 @@
  */
 package org.apache.accumulo.test.replication;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,7 +45,6 @@ import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
@@ -65,7 +63,6 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -74,7 +71,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.replication.StatusFormatter;
@@ -82,6 +79,7 @@ import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -126,38 +124,25 @@ public class ReplicationIT extends ConfigurableMacIT {
     cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
     cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
     cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
-    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
     cfg.setNumTservers(1);
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
   private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
-    // Map of server to tableId
-    Multimap<TServerInstance,String> serverToTableID = HashMultimap.create();
-    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.setRange(MetadataSchema.TabletsSection.getRange());
-    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
-    for (Entry<Key,Value> entry : scanner) {
-      TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
-      byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
-      serverToTableID.put(key, new String(tableId, UTF_8));
-    }
-    // Map of logs to tableId
     Multimap<String,String> logs = HashMultimap.create();
-    scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(LogColumnFamily.NAME);
+    scanner.setRange(new Range());
     for (Entry<Key,Value> entry : scanner) {
       if (Thread.interrupted()) {
         return logs;
       }
-      Text path = new Text();
-      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-      Text session = new Text();
-      Text hostPort = new Text();
-      MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort, session);
-      TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(), false), session.toString());
-      for (String tableId : serverToTableID.get(server)) {
-        logs.put(new Path(path.toString()).toString(), tableId);
+
+      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+
+      for (String log : logEntry.logSet) {
+        // Need to normalize the log file from LogEntry
+        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
       }
     }
     return logs;
@@ -312,12 +297,10 @@ public class ReplicationIT extends ConfigurableMacIT {
     attempts = 5;
     while (wals.isEmpty() && attempts > 0) {
       s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      s.setRange(MetadataSchema.CurrentLogsSection.getRange());
-      s.fetchColumnFamily(MetadataSchema.CurrentLogsSection.COLF);
+      s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
       for (Entry<Key,Value> entry : s) {
-        Text path = new Text();
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        wals.add(new Path(path.toString()).toString());
+        LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+        wals.add(new Path(logEntry.filename).toString());
       }
       attempts--;
     }
@@ -348,7 +331,18 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     for (String table : tables) {
-      writeSomeData(conn, table, 5, 5);
+      BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+
+      for (int j = 0; j < 5; j++) {
+        Mutation m = new Mutation(Integer.toString(j));
+        for (int k = 0; k < 5; k++) {
+          String value = Integer.toString(k);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
     }
 
     // After writing data, still no replication table
@@ -388,7 +382,18 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertFalse(ReplicationTable.isOnline(conn));
 
     // Write some data to table1
-    writeSomeData(conn, table1, 50, 50);
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+
+    for (int rows = 0; rows < 50; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
     // After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1
     // Don't want to compact table1 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -435,7 +440,18 @@ public class ReplicationIT extends ConfigurableMacIT {
     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 
     // Write some data to table2
-    writeSomeData(conn, table2, 50, 50);
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+
+    for (int rows = 0; rows < 50; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
     // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2
     // Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
@@ -483,19 +499,6 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
   }
 
-  private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception {
-    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
-    for (int row = 0; row < rows; row++) {
-      Mutation m = new Mutation(Integer.toString(row));
-      for (int col = 0; col < cols; col++) {
-        String value = Integer.toString(col);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-    bw.close();
-  }
-
   @Test
   public void replicationEntriesPrecludeWalDeletion() throws Exception {
     final Connector conn = getConnector();
@@ -527,21 +530,53 @@ public class ReplicationIT extends ConfigurableMacIT {
     Thread.sleep(2000);
 
     // Write some data to table1
-    writeSomeData(conn, table1, 200, 500);
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
     conn.tableOperations().create(table2);
     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
     Thread.sleep(2000);
 
-    writeSomeData(conn, table2, 200, 500);
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
     conn.tableOperations().create(table3);
     conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
     conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
     Thread.sleep(2000);
 
-    writeSomeData(conn, table3, 200, 500);
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
     // Force a write to metadata for the data written
     for (String table : Arrays.asList(table1, table2, table3)) {
@@ -575,8 +610,7 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     // We should have *some* reference to each log that was seen in the metadata table
     // They might not yet all be closed though (might be newfile)
-    Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles));
-    Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1);
+    Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
 
     for (String replFile : replFiles) {
       Path p = new Path(replFile);
@@ -664,11 +698,44 @@ public class ReplicationIT extends ConfigurableMacIT {
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      writeSomeData(conn, table1, 200, 500);
+      // Write some data to table1
+      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
 
-      writeSomeData(conn, table2, 200, 500);
+      bw.close();
 
-      writeSomeData(conn, table3, 200, 500);
+      // Write some data to table2
+      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
+
+      // Write some data to table3
+      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
 
       // Flush everything to try to make the replication records
       for (String table : Arrays.asList(table1, table2, table3)) {
@@ -721,7 +788,10 @@ public class ReplicationIT extends ConfigurableMacIT {
     Set<String> wals = new HashSet<>();
     for (Entry<Key,Value> entry : s) {
       LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-      wals.add(new Path(logEntry.filename).toString());
+      for (String file : logEntry.logSet) {
+        Path p = new Path(file);
+        wals.add(p.toString());
+      }
     }
 
     log.warn("Found wals {}", wals);
@@ -798,7 +868,9 @@ public class ReplicationIT extends ConfigurableMacIT {
   public void singleTableWithSingleTarget() throws Exception {
     // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
     // against expected Status messages.
-    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
 
     Connector conn = getConnector();
     String table1 = "table1";
@@ -832,7 +904,17 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // Write some data to table1
-    writeSomeData(conn, table1, 2000, 50);
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
     // Make sure the replication table is online at this point
     boolean online = ReplicationTable.isOnline(conn);
@@ -919,7 +1001,17 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // Write some more data so that we over-run the single WAL
-    writeSomeData(conn, table1, 3000, 50);
+    bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 3000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
 
     log.info("Issued compaction for table");
     conn.tableOperations().compact(table1, null, null, true, true);
@@ -992,7 +1084,17 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     // Write some data to table1
-    writeSomeData(conn, table1, 2000, 50);
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
     conn.tableOperations().flush(table1, null, null, true);
 
     String tableId = conn.tableOperations().tableIdMap().get(table1);
@@ -1048,7 +1150,10 @@ public class ReplicationIT extends ConfigurableMacIT {
 
   @Test
   public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
-    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+    Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
+    for (ProcessReference ref : gcProcs) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
+    }
 
     final Connector conn = getConnector();
 
@@ -1079,6 +1184,7 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     String table1 = "table1", table2 = "table2", table3 = "table3";
 
+    BatchWriter bw;
     try {
       conn.tableOperations().create(table1);
       conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
@@ -1087,19 +1193,51 @@ public class ReplicationIT extends ConfigurableMacIT {
           ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
 
       // Write some data to table1
-      writeSomeData(conn, table1, 200, 500);
+      bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
 
       conn.tableOperations().create(table2);
       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      writeSomeData(conn, table2, 200, 500);
+      // Write some data to table2
+      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
 
       conn.tableOperations().create(table3);
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      writeSomeData(conn, table3, 200, 500);
+      // Write some data to table3
+      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+      for (int rows = 0; rows < 200; rows++) {
+        Mutation m = new Mutation(Integer.toString(rows));
+        for (int cols = 0; cols < 500; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
 
       // Flush everything to try to make the replication records
       for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1113,8 +1251,11 @@ public class ReplicationIT extends ConfigurableMacIT {
 
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
-    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
-    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+
+    cluster.exec(TabletServer.class);
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : Arrays.asList(table1, table2, table3)) {
@@ -1215,7 +1356,9 @@ public class ReplicationIT extends ConfigurableMacIT {
   @Test
   public void replicatedStatusEntriesAreDeleted() throws Exception {
     // Just stop it now, we'll restart it after we restart the tserver
-    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+    for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
 
     final Connector conn = getConnector();
     log.info("Got connector to MAC");
@@ -1251,7 +1394,17 @@ public class ReplicationIT extends ConfigurableMacIT {
     Assert.assertNotNull("Could not determine table id for " + table1, tableId);
 
     // Write some data to table1
-    writeSomeData(conn, table1, 2000, 50);
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 2000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 50; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
     conn.tableOperations().flush(table1, null, null, true);
 
     // Make sure the replication table exists at this point
@@ -1269,35 +1422,14 @@ public class ReplicationIT extends ConfigurableMacIT {
     // Grant ourselves the write permission for later
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 
-    log.info("Checking for replication entries in replication");
-    // Then we need to get those records over to the replication table
-    Scanner s;
-    Set<String> entries = new HashSet<>();
-    for (int i = 0; i < 5; i++) {
-      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      s.setRange(ReplicationSection.getRange());
-      entries.clear();
-      for (Entry<Key,Value> entry : s) {
-        entries.add(entry.getKey().getRow().toString());
-        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
-      }
-      if (!entries.isEmpty()) {
-        log.info("Replication entries {}", entries);
-        break;
-      }
-      Thread.sleep(1000);
-    }
-
-    Assert.assertFalse("Did not find any replication entries in the replication table", entries.isEmpty());
-
     // Find the WorkSection record that will be created for that data we ingested
     boolean notFound = true;
+    Scanner s;
     for (int i = 0; i < 10 && notFound; i++) {
       try {
         s = ReplicationTable.getScanner(conn);
         WorkSection.limit(s);
         Entry<Key,Value> e = Iterables.getOnlyElement(s);
-        log.info("Found entry: " + e.getKey().toStringNoTruncate());
         Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
         Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
         notFound = false;
@@ -1348,61 +1480,69 @@ public class ReplicationIT extends ConfigurableMacIT {
     log.info("Killing tserver");
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
-    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
 
     log.info("Starting tserver");
-    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+    cluster.exec(TabletServer.class);
 
     log.info("Waiting to read tables");
-    UtilWaitThread.sleep(2 * 3 * 1000);
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : new String[] {MetadataTable.NAME, table1}) {
       Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator());
     }
 
-    log.info("Recovered metadata:");
-    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    for (Entry<Key,Value> entry : s) {
-      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+    log.info("Checking for replication entries in replication");
+    // Then we need to get those records over to the replication table
+    boolean foundResults = false;
+    for (int i = 0; i < 5; i++) {
+      s = ReplicationTable.getScanner(conn);
+      int count = 0;
+      for (Entry<Key,Value> entry : s) {
+        count++;
+        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+      }
+      if (count > 0) {
+        foundResults = true;
+        break;
+      }
+      Thread.sleep(1000);
     }
 
-    cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
+    Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
+
+    getCluster().exec(SimpleGarbageCollector.class);
 
     // Wait for a bit since the GC has to run (should be running after a one second delay)
     waitForGCLock(conn);
 
     Thread.sleep(1000);
 
-    log.info("After GC");
-    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    for (Entry<Key,Value> entry : s) {
-      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
-    }
-
     // We expect no records in the metadata table after compaction. We have to poll
     // because we have to wait for the StatusMaker's next iteration which will clean
     // up the dangling *closed* records after we create the record in the replication table.
     // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
     log.info("Checking metadata table for replication entries");
-    Set<String> remaining = new HashSet<>();
+    foundResults = true;
     for (int i = 0; i < 10; i++) {
       s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
       s.setRange(ReplicationSection.getRange());
-      remaining.clear();
+      long size = 0;
       for (Entry<Key,Value> e : s) {
-        remaining.add(e.getKey().getRow().toString());
+        size++;
+        log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
       }
-      remaining.retainAll(entries);
-      if (remaining.isEmpty()) {
+      if (size == 0) {
+        foundResults = false;
         break;
       }
-      log.info("remaining {}", remaining);
       Thread.sleep(2000);
       log.info("");
     }
 
-    Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty());
+    Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
 
     /**
      * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
@@ -1415,10 +1555,10 @@ public class ReplicationIT extends ConfigurableMacIT {
       recordsFound = 0;
       for (Entry<Key,Value> entry : s) {
         recordsFound++;
-        log.info("{} {}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+        log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
       }
 
-      if (recordsFound <= 2) {
+      if (0 == recordsFound) {
         break;
       } else {
         Thread.sleep(1000);
@@ -1426,6 +1566,6 @@ public class ReplicationIT extends ConfigurableMacIT {
       }
     }
 
-    Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2);
+    Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
   }
 }


Mime
View raw message