accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [13/16] accumulo git commit: Merge branch '1.7'
Date Wed, 03 Feb 2016 17:54:06 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/dc087b12/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 3245f63,0000000..02e2fdf
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@@ -1,186 -1,0 +1,186 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.Assert.assertFalse;
 +
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +
 +import com.google.common.collect.Iterators;
 +
 +public class FunctionalTestUtils {
 +
 +  public static int countRFiles(Connector c, String tableName) throws Exception {
 +    Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    String tableId = c.tableOperations().tableIdMap().get(tableName);
 +    scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId));
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
 +
 +    return Iterators.size(scanner.iterator());
 +  }
 +
 +  static void checkRFiles(Connector c, String tableName, int minTablets, int maxTablets, int minRFiles, int maxRFiles) throws Exception {
 +    Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    String tableId = c.tableOperations().tableIdMap().get(tableName);
 +    scanner.setRange(new Range(new Text(tableId + ";"), true, new Text(tableId + "<"), true));
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
 +    MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
 +
 +    HashMap<Text,Integer> tabletFileCounts = new HashMap<Text,Integer>();
 +
 +    for (Entry<Key,Value> entry : scanner) {
 +
 +      Text row = entry.getKey().getRow();
 +
 +      Integer count = tabletFileCounts.get(row);
 +      if (count == null)
 +        count = 0;
 +      if (entry.getKey().getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
 +        count = count + 1;
 +      }
 +
 +      tabletFileCounts.put(row, count);
 +    }
 +
 +    if (tabletFileCounts.size() < minTablets || tabletFileCounts.size() > maxTablets) {
 +      throw new Exception("Did not find expected number of tablets " + tabletFileCounts.size());
 +    }
 +
 +    Set<Entry<Text,Integer>> es = tabletFileCounts.entrySet();
 +    for (Entry<Text,Integer> entry : es) {
 +      if (entry.getValue() > maxRFiles || entry.getValue() < minRFiles) {
 +        throw new Exception("tablet " + entry.getKey() + " has " + entry.getValue() + " map files");
 +      }
 +    }
 +  }
 +
 +  static public void bulkImport(Connector c, FileSystem fs, String table, String dir) throws Exception {
 +    String failDir = dir + "_failures";
 +    Path failPath = new Path(failDir);
 +    fs.delete(failPath, true);
 +    fs.mkdirs(failPath);
 +
 +    // Ensure server can read/modify files
 +    c.tableOperations().importDirectory(table, dir, failDir, false);
 +
 +    if (fs.listStatus(failPath).length > 0) {
 +      throw new Exception("Some files failed to bulk import");
 +    }
 +
 +  }
 +
 +  static public void checkSplits(Connector c, String table, int min, int max) throws Exception {
 +    Collection<Text> splits = c.tableOperations().listSplits(table);
 +    if (splits.size() < min || splits.size() > max) {
 +      throw new Exception("# of table split points out of range, #splits=" + splits.size() + " table=" + table + " min=" + min + " max=" + max);
 +    }
 +  }
 +
 +  static public void createRFiles(final Connector c, final FileSystem fs, String path, int rows, int splits, int threads) throws Exception {
 +    fs.delete(new Path(path), true);
 +    ExecutorService threadPool = Executors.newFixedThreadPool(threads);
 +    final AtomicBoolean fail = new AtomicBoolean(false);
 +    for (int i = 0; i < rows; i += rows / splits) {
 +      final TestIngest.Opts opts = new TestIngest.Opts();
 +      opts.outputFile = String.format("%s/mf%s", path, i);
 +      opts.random = 56;
 +      opts.timestamp = 1;
 +      opts.dataSize = 50;
 +      opts.rows = rows / splits;
 +      opts.startRow = i;
 +      opts.cols = 1;
 +      threadPool.execute(new Runnable() {
 +        @Override
 +        public void run() {
 +          try {
 +            TestIngest.ingest(c, fs, opts, new BatchWriterOpts());
 +          } catch (Exception e) {
 +            fail.set(true);
 +          }
 +        }
 +      });
 +    }
 +    threadPool.shutdown();
 +    threadPool.awaitTermination(1, TimeUnit.HOURS);
 +    assertFalse(fail.get());
 +  }
 +
 +  static public String readAll(InputStream is) throws IOException {
 +    byte[] buffer = new byte[4096];
-     StringBuffer result = new StringBuffer();
++    StringBuilder result = new StringBuilder();
 +    while (true) {
 +      int n = is.read(buffer);
 +      if (n <= 0)
 +        break;
 +      result.append(new String(buffer, 0, n));
 +    }
 +    return result.toString();
 +  }
 +
 +  public static String readAll(MiniAccumuloClusterImpl c, Class<?> klass, Process p) throws Exception {
 +    for (LogWriter writer : c.getLogWriters())
 +      writer.flush();
 +    return readAll(new FileInputStream(c.getConfig().getLogDir() + "/" + klass.getSimpleName() + "_" + p.hashCode() + ".out"));
 +  }
 +
 +  static Mutation nm(String row, String cf, String cq, Value value) {
 +    Mutation m = new Mutation(new Text(row));
 +    m.put(new Text(cf), new Text(cq), value);
 +    return m;
 +  }
 +
 +  static Mutation nm(String row, String cf, String cq, String value) {
 +    return nm(row, cf, cq, new Value(value.getBytes()));
 +  }
 +
 +  public static SortedSet<Text> splits(String[] splits) {
 +    SortedSet<Text> result = new TreeSet<Text>();
 +    for (String split : splits)
 +      result.add(new Text(split));
 +    return result;
 +  }
 +
 +}
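For reference, here is a minimal usage sketch (not part of the commit above) showing how a test might drive these helpers against a cluster it already holds a Connector for; the ExampleUsage class name, the table name, and the split points are hypothetical.

import java.util.SortedSet;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.test.functional.FunctionalTestUtils;
import org.apache.hadoop.io.Text;

// Hypothetical sketch: exercises the public helpers from FunctionalTestUtils.
public class ExampleUsage {
  static void exercise(Connector c) throws Exception {
    String table = "example";                       // hypothetical table name
    c.tableOperations().create(table);

    // Build a SortedSet of split points and apply them to the table.
    SortedSet<Text> splits = FunctionalTestUtils.splits(new String[] {"d", "m", "t"});
    c.tableOperations().addSplits(table, splits);

    // Verify the expected number of split points, then count referenced RFiles.
    FunctionalTestUtils.checkSplits(c, table, 3, 3);
    int rfiles = FunctionalTestUtils.countRFiles(c, table);
    System.out.println("RFiles referenced for " + table + ": " + rfiles);
  }
}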

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dc087b12/test/src/main/java/org/apache/accumulo/test/mrit/IntegrationTestMapReduce.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/mrit/IntegrationTestMapReduce.java
index ee8d7b3,0000000..04d7dc7
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/mrit/IntegrationTestMapReduce.java
+++ b/test/src/main/java/org/apache/accumulo/test/mrit/IntegrationTestMapReduce.java
@@@ -1,212 -1,0 +1,212 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.mrit;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.conf.Configured;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.LongWritable;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Job;
 +import org.apache.hadoop.mapreduce.MRJobConfig;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hadoop.mapreduce.Reducer;
 +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
 +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 +import org.apache.hadoop.util.Tool;
 +import org.apache.hadoop.util.ToolRunner;
 +import org.junit.runner.Description;
 +import org.junit.runner.JUnitCore;
 +import org.junit.runner.Result;
 +import org.junit.runner.notification.Failure;
 +import org.junit.runner.notification.RunListener;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Run the Integration Tests as a Map-Reduce job.
 + * <p>
 + * Each of the integration tests takes 30 seconds to 20 minutes to run. On a larger cluster, all the tests can be run in parallel and finish much faster.
 + * <p>
 + * To run the tests, you first need a list of them. A simple way to get that list is to scan the accumulo-test jar file.
 + *
 + * <pre>
 + * $ jar -tf lib/accumulo-test.jar | grep IT.class | tr / . | sed -e 's/.class$//' &gt;tests
 + * </pre>
 + *
 + * Put the list of tests into HDFS:
 + *
 + * <pre>
 + * $ hadoop fs -mkdir /tmp
 + * $ hadoop fs -put tests /tmp/tests
 + * </pre>
 + *
 + * Run the class below as a map-reduce job, giving it the list of tests and a place to store the results.
 + *
 + * <pre>
 + * $ yarn jar lib/accumulo-test-mrit.jar -libjars lib/native/libaccumulo.so /tmp/tests /tmp/results
 + * </pre>
 + *
 + * The result is a list of IT classes that pass or fail. Those classes that fail will be annotated with the particular test that failed within the class.
 + */
 +
 +public class IntegrationTestMapReduce extends Configured implements Tool {
 +
 +  private static final Logger log = LoggerFactory.getLogger(IntegrationTestMapReduce.class);
 +
 +  private static boolean isMapReduce = false;
 +
 +  public static boolean isMapReduce() {
 +    return isMapReduce;
 +  }
 +
 +  public static class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
 +
 +    static final Text FAIL = new Text("FAIL");
 +    static final Text PASS = new Text("PASS");
 +    static final Text ERROR = new Text("ERROR");
 +
 +    public static enum TestCounts {
 +      PASS, FAIL, ERROR
 +    }
 +
 +    @Override
 +    protected void map(LongWritable key, Text value, final Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException {
 +      isMapReduce = true;
 +      String className = value.toString();
 +      if (className.trim().isEmpty()) {
 +        return;
 +      }
 +      final List<String> failures = new ArrayList<>();
 +      Class<? extends Object> test = null;
 +      try {
 +        test = Class.forName(className);
 +      } catch (ClassNotFoundException e) {
 +        log.debug("Error finding class {}", className, e);
 +        context.getCounter(TestCounts.ERROR).increment(1);
 +        context.write(ERROR, new Text(e.toString()));
 +        return;
 +      }
 +      log.info("Running test {}", className);
 +      JUnitCore core = new JUnitCore();
 +      core.addListener(new RunListener() {
 +
 +        @Override
 +        public void testStarted(Description description) throws Exception {
 +          log.info("Starting {}", description);
 +          context.progress();
 +        }
 +
 +        @Override
 +        public void testFinished(Description description) throws Exception {
 +          log.info("Finished {}", description);
 +          context.progress();
 +        }
 +
 +        @Override
 +        public void testFailure(Failure failure) throws Exception {
 +          log.info("Test failed: {}", failure.getDescription(), failure.getException());
 +          failures.add(failure.getDescription().getMethodName());
 +          context.progress();
 +        }
 +
 +      });
 +      context.setStatus(test.getSimpleName());
 +      try {
 +        Result result = core.run(test);
 +        if (result.wasSuccessful()) {
 +          log.info("{} was successful", className);
 +          context.getCounter(TestCounts.PASS).increment(1);
 +          context.write(PASS, value);
 +        } else {
 +          log.info("{} failed", className);
 +          context.getCounter(TestCounts.FAIL).increment(1);
 +          context.write(FAIL, new Text(className + "(" + StringUtils.join(failures, ", ") + ")"));
 +        }
 +      } catch (Exception e) {
 +        // most likely JUnit issues, like no tests to run
 +        log.info("Test failed: {}", className, e);
 +      }
 +    }
 +  }
 +
 +  public static class TestReducer extends Reducer<Text,Text,Text,Text> {
 +
 +    @Override
 +    protected void reduce(Text code, Iterable<Text> tests, Reducer<Text,Text,Text,Text>.Context context) throws IOException, InterruptedException {
-       StringBuffer result = new StringBuffer("\n");
++      StringBuilder result = new StringBuilder("\n");
 +      for (Text test : tests) {
 +        result.append("   ");
 +        result.append(test.toString());
 +        result.append("\n");
 +      }
 +      context.write(code, new Text(result.toString()));
 +    }
 +  }
 +
 +  @Override
 +  public int run(String[] args) throws Exception {
 +    // read a list of tests from the input, and print out the results
 +    if (args.length != 2) {
 +      System.err.println("Wrong number of args: <input> <output>");
 +      return 1;
 +    }
 +    Configuration conf = getConf();
 +    Job job = Job.getInstance(conf, "accumulo integration test runner");
 +    conf = job.getConfiguration();
 +
 +    // some tests take more than 10 minutes
 +    conf.setLong(MRJobConfig.TASK_TIMEOUT, 20 * 60 * 1000);
 +
 +    // minicluster uses a lot of ram
 +    conf.setInt(MRJobConfig.MAP_MEMORY_MB, 4000);
 +
 +    // hadoop puts an ancient version of jline on the classpath
 +    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
 +
 +    // no need to run a test multiple times
 +    job.setSpeculativeExecution(false);
 +
 +    // read one line at a time
 +    job.setInputFormatClass(NLineInputFormat.class);
 +    NLineInputFormat.setNumLinesPerSplit(job, 1);
 +
 +    // run the test
 +    job.setJarByClass(IntegrationTestMapReduce.class);
 +    job.setMapperClass(TestMapper.class);
 +
 +    // group test by result code
 +    job.setReducerClass(TestReducer.class);
 +    job.setOutputKeyClass(Text.class);
 +    job.setOutputValueClass(Text.class);
 +
 +    FileInputFormat.addInputPath(job, new Path(args[0]));
 +    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 +    return job.waitForCompletion(true) ? 0 : 1;
 +  }
 +
 +  public static void main(String[] args) throws Exception {
 +    System.exit(ToolRunner.run(new IntegrationTestMapReduce(), args));
 +  }
 +
 +}
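The javadoc above covers the yarn-based invocation; as a rough sketch under the same assumptions, the runner can also be launched programmatically with ToolRunner, reusing the /tmp/tests and /tmp/results paths from that example.

import org.apache.accumulo.test.mrit.IntegrationTestMapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical launcher sketch, not part of the commit above.
public class RunIntegrationTests {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // First argument: HDFS file with one IT class name per line.
    // Second argument: output directory for the PASS/FAIL/ERROR groupings written by TestReducer.
    int rc = ToolRunner.run(conf, new IntegrationTestMapReduce(), new String[] {"/tmp/tests", "/tmp/results"});
    System.exit(rc);
  }
}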

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dc087b12/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 0ff8c21,0000000..7f39df3
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@@ -1,1436 -1,0 +1,1436 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.replication;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.net.URI;
 +import java.net.URISyntaxException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.EnumSet;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.NoSuchElementException;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.IteratorSetting.Column;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.conf.ColumnSet;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 +import org.apache.accumulo.fate.zookeeper.ZooLock;
 +import org.apache.accumulo.gc.SimpleGarbageCollector;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.log.WalStateManager;
 +import org.apache.accumulo.server.log.WalStateManager.WalState;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 +import org.apache.accumulo.server.replication.StatusCombiner;
 +import org.apache.accumulo.server.replication.StatusFormatter;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
 +import org.apache.accumulo.server.util.ReplicationTableUtil;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Function;
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Multimap;
 +import com.google.common.collect.Sets;
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import com.google.protobuf.TextFormat;
 +
 +/**
 + * Replication tests which verify expected functionality using a single MAC instance. A MockReplicaSystem is used to "fake" the peer instance that we're
 + * replicating to. This lets us test replication in a functional way without having to worry about two real systems.
 + */
 +public class ReplicationIT extends ConfigurableMacBase {
 +  private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 60 * 10;
 +  }
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    // Run the master replication loop frequently
 +    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
 +    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
 +    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
 +    cfg.setProperty(Property.GC_CYCLE_START, "1s");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
 +    cfg.setProperty(Property.REPLICATION_NAME, "master");
 +    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
 +    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
 +    cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
 +    cfg.setNumTservers(1);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  private Multimap<String,String> getLogs(Connector conn) throws Exception {
 +    // Map of server to tableId
 +    Multimap<TServerInstance,String> serverToTableID = HashMultimap.create();
 +    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    scanner.setRange(MetadataSchema.TabletsSection.getRange());
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
 +    for (Entry<Key,Value> entry : scanner) {
 +      TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
 +      byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
 +      serverToTableID.put(key, new String(tableId, UTF_8));
 +    }
 +    // Map of logs to tableId
 +    Multimap<String,String> logs = HashMultimap.create();
 +    Instance i = conn.getInstance();
 +    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
 +    WalStateManager wals = new WalStateManager(conn.getInstance(), zk);
 +    for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
 +      for (UUID id : entry.getValue()) {
 +        Pair<WalState,Path> state = wals.state(entry.getKey(), id);
 +        for (String tableId : serverToTableID.get(entry.getKey())) {
 +          logs.put(state.getSecond().toString(), tableId);
 +        }
 +      }
 +    }
 +    return logs;
 +  }
 +
 +  private Multimap<String,String> getAllLogs(Connector conn) throws Exception {
 +    Multimap<String,String> logs = getLogs(conn);
 +    try {
 +      Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
 +      StatusSection.limit(scanner);
 +      Text buff = new Text();
 +      for (Entry<Key,Value> entry : scanner) {
 +        if (Thread.interrupted()) {
 +          Thread.currentThread().interrupt();
 +          return logs;
 +        }
 +
 +        StatusSection.getFile(entry.getKey(), buff);
 +        String file = buff.toString();
 +        StatusSection.getTableId(entry.getKey(), buff);
 +        String tableId = buff.toString();
 +
 +        logs.put(file, tableId);
 +      }
 +    } catch (TableOfflineException e) {
 +      log.debug("Replication table isn't online yet");
 +    }
 +    return logs;
 +  }
 +
 +  private void waitForGCLock(Connector conn) throws InterruptedException {
 +    // Check if the GC process has the lock before wasting our retry attempts
 +    ZooKeeperInstance zki = (ZooKeeperInstance) conn.getInstance();
 +    ZooCacheFactory zcf = new ZooCacheFactory();
 +    ZooCache zcache = zcf.getZooCache(zki.getZooKeepers(), zki.getZooKeepersSessionTimeOut());
 +    String zkPath = ZooUtil.getRoot(conn.getInstance()) + Constants.ZGC_LOCK;
 +    log.info("Looking for GC lock at {}", zkPath);
 +    byte[] data = ZooLock.getLockData(zcache, zkPath, null);
 +    while (null == data) {
 +      log.info("Waiting for GC ZooKeeper lock to be acquired");
 +      Thread.sleep(1000);
 +      data = ZooLock.getLockData(zcache, zkPath, null);
 +    }
 +  }
 +
 +  @Test
 +  public void replicationTableCreated() throws AccumuloException, AccumuloSecurityException {
 +    Assert.assertTrue(getConnector().tableOperations().exists(ReplicationTable.NAME));
 +    Assert.assertEquals(ReplicationTable.ID, getConnector().tableOperations().tableIdMap().get(ReplicationTable.NAME));
 +  }
 +
 +  @Test
 +  public void verifyReplicationTableConfig() throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 +    TableOperations tops = getConnector().tableOperations();
 +    Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(ReplicationTable.NAME);
 +
 +    // verify the combiner is the only iterator (no versioning)
 +    Assert.assertEquals(1, iterators.size());
 +
 +    // look for combiner
 +    Assert.assertTrue(iterators.containsKey(ReplicationTable.COMBINER_NAME));
 +    Assert.assertTrue(iterators.get(ReplicationTable.COMBINER_NAME).containsAll(EnumSet.allOf(IteratorScope.class)));
 +    for (IteratorScope scope : EnumSet.allOf(IteratorScope.class)) {
 +      IteratorSetting is = tops.getIteratorSetting(ReplicationTable.NAME, ReplicationTable.COMBINER_NAME, scope);
 +      Assert.assertEquals(30, is.getPriority());
 +      Assert.assertEquals(StatusCombiner.class.getName(), is.getIteratorClass());
 +      Assert.assertEquals(1, is.getOptions().size());
 +      Assert.assertTrue(is.getOptions().containsKey("columns"));
 +      String cols = is.getOptions().get("columns");
 +      Column statusSectionCol = new Column(StatusSection.NAME);
 +      Column workSectionCol = new Column(WorkSection.NAME);
 +      Assert.assertEquals(
 +          ColumnSet.encodeColumns(statusSectionCol.getColumnFamily(), statusSectionCol.getColumnQualifier()) + ","
 +              + ColumnSet.encodeColumns(workSectionCol.getColumnFamily(), workSectionCol.getColumnQualifier()), cols);
 +    }
 +
 +    boolean foundLocalityGroups = false;
 +    boolean foundLocalityGroupDef1 = false;
 +    boolean foundLocalityGroupDef2 = false;
 +    boolean foundFormatter = false;
 +    Joiner j = Joiner.on(",");
 +    Function<Text,String> textToString = new Function<Text,String>() {
 +      @Override
 +      public String apply(Text text) {
 +        return text.toString();
 +      }
 +    };
 +    for (Entry<String,String> p : tops.getProperties(ReplicationTable.NAME)) {
 +      String key = p.getKey();
 +      String val = p.getValue();
 +      // STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS
 +      if (key.equals(Property.TABLE_FORMATTER_CLASS.getKey()) && val.equals(StatusFormatter.class.getName())) {
 +        // look for formatter
 +        foundFormatter = true;
 +      } else if (key.equals(Property.TABLE_LOCALITY_GROUPS.getKey()) && val.equals(j.join(ReplicationTable.LOCALITY_GROUPS.keySet()))) {
 +        // look for locality groups enabled
 +        foundLocalityGroups = true;
 +      } else if (key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())) {
 +        // look for locality group column family definitions
 +        if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.STATUS_LG_NAME)
 +            && val.equals(j.join(Iterables.transform(ReplicationTable.STATUS_LG_COLFAMS, textToString)))) {
 +          foundLocalityGroupDef1 = true;
 +        } else if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.WORK_LG_NAME)
 +            && val.equals(j.join(Iterables.transform(ReplicationTable.WORK_LG_COLFAMS, textToString)))) {
 +          foundLocalityGroupDef2 = true;
 +        }
 +      }
 +    }
 +    Assert.assertTrue(foundLocalityGroups);
 +    Assert.assertTrue(foundLocalityGroupDef1);
 +    Assert.assertTrue(foundLocalityGroupDef2);
 +    Assert.assertTrue(foundFormatter);
 +  }
 +
 +  @Test
 +  public void correctRecordsCompleteFile() throws Exception {
 +    Connector conn = getConnector();
 +    String table = "table1";
 +    conn.tableOperations().create(table);
 +    // If we have more than one tserver, this is subject to a race condition.
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    for (int i = 0; i < 10; i++) {
 +      Mutation m = new Mutation(Integer.toString(i));
 +      m.put(new byte[0], new byte[0], new byte[0]);
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    // After writing data, we'll get a replication table online
 +    boolean online = ReplicationTable.isOnline(conn);
 +    int attempts = 10;
 +    do {
 +      if (!online) {
 +        sleepUninterruptibly(2, TimeUnit.SECONDS);
 +        online = ReplicationTable.isOnline(conn);
 +        attempts--;
 +      }
 +    } while (!online && attempts > 0);
 +    Assert.assertTrue("Replication table was not online", online);
 +
 +    for (int i = 0; i < 5; i++) {
 +      if (conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)) {
 +        break;
 +      }
 +      log.info("Could not read replication table, waiting and will retry");
 +      Thread.sleep(2000);
 +    }
 +
 +    Assert.assertTrue("'root' user could not read the replication table",
 +        conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
 +
-     Set<String> replRows = Sets.newHashSet();
++    Set<String> replRows = new HashSet<>();
 +    Scanner scanner;
 +    attempts = 5;
 +    while (replRows.isEmpty() && attempts > 0) {
 +      scanner = ReplicationTable.getScanner(conn);
 +      StatusSection.limit(scanner);
 +      for (Entry<Key,Value> entry : scanner) {
 +        Key k = entry.getKey();
 +
 +        String fileUri = k.getRow().toString();
 +        try {
 +          new URI(fileUri);
 +        } catch (URISyntaxException e) {
 +          Assert.fail("Expected a valid URI: " + fileUri);
 +        }
 +
 +        replRows.add(fileUri);
 +      }
 +    }
 +
-     Set<String> wals = Sets.newHashSet();
++    Set<String> wals = new HashSet<>();
 +    attempts = 5;
 +    Instance i = conn.getInstance();
 +    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
 +    while (wals.isEmpty() && attempts > 0) {
 +      WalStateManager markers = new WalStateManager(i, zk);
 +      for (Entry<Path,WalState> entry : markers.getAllState().entrySet()) {
 +        wals.add(entry.getKey().toString());
 +      }
 +      attempts--;
 +    }
 +
 +    // We only have one file that should need replication (no trace table)
 +    // We should find an entry in tablet and in the repl row
 +    Assert.assertEquals("Rows found: " + replRows, 1, replRows.size());
 +
 +    // There should only be one extra WALog that replication doesn't know about
 +    replRows.removeAll(wals);
 +    Assert.assertEquals(2, wals.size());
 +    Assert.assertEquals(0, replRows.size());
 +  }
 +
 +  @Test
 +  public void noRecordsWithoutReplication() throws Exception {
 +    Connector conn = getConnector();
 +    List<String> tables = new ArrayList<>();
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (int i = 0; i < 5; i++) {
 +      String name = "table" + i;
 +      tables.add(name);
 +      conn.tableOperations().create(name);
 +    }
 +
 +    // nor after we create some tables (that aren't being replicated)
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (String table : tables) {
 +      writeSomeData(conn, table, 5, 5);
 +    }
 +
 +    // After writing data, still no replication table
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (String table : tables) {
 +      conn.tableOperations().compact(table, null, null, true, true);
 +    }
 +
 +    // After compacting data, still no replication table
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (String table : tables) {
 +      conn.tableOperations().delete(table);
 +    }
 +
 +    // After deleting tables, still no replication table
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +  }
 +
 +  @Test
 +  public void twoEntriesForTwoTables() throws Exception {
 +    Connector conn = getConnector();
 +    String table1 = "table1", table2 = "table2";
 +
 +    // replication shouldn't exist when we begin
 +    Assert.assertFalse("Replication table already online at the beginning of the test", ReplicationTable.isOnline(conn));
 +
 +    // Create two tables
 +    conn.tableOperations().create(table1);
 +    conn.tableOperations().create(table2);
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
 +    // wait for permission to propagate
 +    Thread.sleep(5000);
 +
 +    // Enable replication on table1
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    // Despite having replication on, we shouldn't have any need to write a record to it (and bring it online)
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 50, 50);
 +
 +    // After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1
 +    // Don't want to compact table1 as that would ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
 +
 +    // After writing data, we'll get a replication table online
 +    boolean online = ReplicationTable.isOnline(conn);
 +    int attempts = 10;
 +    do {
 +      if (!online) {
 +        sleepUninterruptibly(5, TimeUnit.SECONDS);
 +        online = ReplicationTable.isOnline(conn);
 +        attempts--;
 +      }
 +    } while (!online && attempts > 0);
 +    Assert.assertTrue("Replication table did not exist", online);
 +
 +    Assert.assertTrue(ReplicationTable.isOnline(conn));
 +
 +    // Verify that we found a single replication record that's for table1
 +    Scanner s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    for (int i = 0; i < 5; i++) {
 +      if (Iterators.size(s.iterator()) == 1) {
 +        break;
 +      }
 +      Thread.sleep(1000);
 +    }
 +    Entry<Key,Value> entry = Iterators.getOnlyElement(s.iterator());
 +    // We should find at least one status record for this table; we might find a second if another log was started while ingesting the data
 +    Assert.assertEquals("Expected to find replication entry for " + table1, conn.tableOperations().tableIdMap().get(table1), entry.getKey()
 +        .getColumnQualifier().toString());
 +    s.close();
 +
 +    // Enable replication on table2
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    // Write some data to table2
 +    writeSomeData(conn, table2, 50, 50);
 +
 +    // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2
 +    // Don't want to compact table2 as that would ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
 +
 +    Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), conn.tableOperations().tableIdMap().get(table2));
 +    Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds);
 +
 +    List<Entry<Key,Value>> records = new ArrayList<>();
 +    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.setRange(MetadataSchema.ReplicationSection.getRange());
 +    for (Entry<Key,Value> metadata : s) {
 +      records.add(metadata);
 +      log.debug("Meta: {} => {}", metadata.getKey().toStringNoTruncate(), metadata.getValue().toString());
 +    }
 +
 +    Assert.assertEquals("Expected to find 2 records, but actually found " + records, 2, records.size());
 +
 +    for (Entry<Key,Value> metadata : records) {
 +      Assert.assertTrue("Expected record to be in metadata but wasn't " + metadata.getKey().toStringNoTruncate() + ", tableIds remaining "
 +          + tableIdsForMetadata, tableIdsForMetadata.remove(metadata.getKey().getColumnQualifier().toString()));
 +    }
 +
 +    Assert.assertTrue("Expected that we had removed all metadata entries " + tableIdsForMetadata, tableIdsForMetadata.isEmpty());
 +
 +    // Should be creating these records in replication table from metadata table every second
 +    Thread.sleep(5000);
 +
 +    // Verify that we found two replication records: one for table1 and one for table2
 +    s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    Iterator<Entry<Key,Value>> iter = s.iterator();
 +    Assert.assertTrue("Found no records in replication table", iter.hasNext());
 +    entry = iter.next();
 +    Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString()));
 +    Assert.assertTrue("Expected to find two elements in replication table, only found one ", iter.hasNext());
 +    entry = iter.next();
 +    Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString()));
 +    Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
 +  }
 +
 +  private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception {
 +    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    for (int row = 0; row < rows; row++) {
 +      Mutation m = new Mutation(Integer.toString(row));
 +      for (int col = 0; col < cols; col++) {
 +        String value = Integer.toString(col);
 +        m.put(value, "", value);
 +      }
 +      bw.addMutation(m);
 +    }
 +    bw.close();
 +  }
 +
 +  @Test
 +  public void replicationEntriesPrecludeWalDeletion() throws Exception {
 +    final Connector conn = getConnector();
 +    String table1 = "table1", table2 = "table2", table3 = "table3";
 +    final Multimap<String,String> logs = HashMultimap.create();
 +    final AtomicBoolean keepRunning = new AtomicBoolean(true);
 +
 +    Thread t = new Thread(new Runnable() {
 +      @Override
 +      public void run() {
 +        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
 +        // when that happens
 +        while (keepRunning.get()) {
 +          try {
 +            logs.putAll(getAllLogs(conn));
 +          } catch (Exception e) {
 +            log.error("Error getting logs", e);
 +          }
 +        }
 +      }
 +
 +    });
 +
 +    t.start();
 +
 +    conn.tableOperations().create(table1);
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    Thread.sleep(2000);
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 200, 500);
 +
 +    conn.tableOperations().create(table2);
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    Thread.sleep(2000);
 +
 +    writeSomeData(conn, table2, 200, 500);
 +
 +    conn.tableOperations().create(table3);
 +    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    Thread.sleep(2000);
 +
 +    writeSomeData(conn, table3, 200, 500);
 +
 +    // Force a write to metadata for the data written
 +    for (String table : Arrays.asList(table1, table2, table3)) {
 +      conn.tableOperations().flush(table, null, null, true);
 +    }
 +
 +    keepRunning.set(false);
 +    t.join(5000);
 +
 +    // The master is only running every second to create records in the replication table from the metadata table
 +    // Sleep a sufficient amount of time to ensure that we get the straggling WALs that might have been created at the end
 +    Thread.sleep(5000);
 +
 +    Set<String> replFiles = getReferencesToFilesToBeReplicated(conn);
 +
 +    // We might have a WAL that was used solely for the replication table
 +    // We want to remove that from our list as it should not appear in the replication table
 +    String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
 +    Iterator<Entry<String,String>> observedLogs = logs.entries().iterator();
 +    while (observedLogs.hasNext()) {
 +      Entry<String,String> observedLog = observedLogs.next();
 +      if (replicationTableId.equals(observedLog.getValue())) {
 +        log.info("Removing {} because its tableId is for the replication table", observedLog);
 +        observedLogs.remove();
 +      }
 +    }
 +
 +    // We should have *some* reference to each log that was seen in the metadata table
 +    // They might not yet all be closed though (might be newfile)
 +    Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles));
 +    Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1);
 +
 +    final Configuration conf = new Configuration();
 +    for (String replFile : replFiles) {
 +      Path p = new Path(replFile);
 +      FileSystem fs = p.getFileSystem(conf);
 +      if (!fs.exists(p)) {
 +        // double-check: the garbage collector can be fast
 +        Set<String> currentSet = getReferencesToFilesToBeReplicated(conn);
 +        log.info("Current references {}", currentSet);
 +        log.info("Looking for reference to {}", replFile);
 +        log.info("Contains? {}", currentSet.contains(replFile));
 +        Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, !currentSet.contains(replFile));
 +      }
 +    }
 +  }
 +
 +  private Set<String> getReferencesToFilesToBeReplicated(final Connector conn) throws ReplicationTableOfflineException {
 +    Scanner s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    Set<String> replFiles = new HashSet<>();
 +    for (Entry<Key,Value> entry : s) {
 +      replFiles.add(entry.getKey().getRow().toString());
 +    }
 +    return replFiles;
 +  }
 +
 +  @Test
 +  public void combinerWorksOnMetadata() throws Exception {
 +    Connector conn = getConnector();
 +
 +    conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
 +
 +    ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME);
 +
 +    Status stat1 = StatusUtil.fileCreated(100);
 +    Status stat2 = StatusUtil.fileClosed();
 +
 +    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
 +    m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat1));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.setRange(ReplicationSection.getRange());
 +
 +    Status actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
 +    Assert.assertEquals(stat1, actual);
 +
 +    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
 +    m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat2));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.setRange(ReplicationSection.getRange());
 +
 +    actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
 +    Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build();
 +
 +    Assert.assertEquals(expected, actual);
 +  }
 +
 +  @Test
 +  public void noDeadlock() throws Exception {
 +    final Connector conn = getConnector();
 +
 +    ReplicationTable.setOnline(conn);
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 +    conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 +
 +    String table1 = "table1", table2 = "table2", table3 = "table3";
 +    conn.tableOperations().create(table1);
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    conn.tableOperations().create(table2);
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    conn.tableOperations().create(table3);
 +    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +
 +    writeSomeData(conn, table1, 200, 500);
 +
 +    writeSomeData(conn, table2, 200, 500);
 +
 +    writeSomeData(conn, table3, 200, 500);
 +
 +    // Flush everything to try to make the replication records
 +    for (String table : Arrays.asList(table1, table2, table3)) {
 +      conn.tableOperations().flush(table, null, null, true);
 +    }
 +
 +    // Flush everything to try to make the replication records
 +    for (String table : Arrays.asList(table1, table2, table3)) {
 +      conn.tableOperations().flush(table, null, null, true);
 +    }
 +
 +    for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
 +      Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator());
 +    }
 +  }
 +
 +  @Test
 +  public void filesClosedAfterUnused() throws Exception {
 +    Connector conn = getConnector();
 +
 +    String table = "table";
 +    conn.tableOperations().create(table);
 +    String tableId = conn.tableOperations().tableIdMap().get(table);
 +
 +    Assert.assertNotNull(tableId);
 +
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    // just sleep
 +    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
 +
 +    // Write a mutation to make a log file
 +    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    Mutation m = new Mutation("one");
 +    m.put("", "", "");
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    // Write another to make sure the logger rolls itself?
 +    bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    m = new Mutation("three");
 +    m.put("", "", "");
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
 +    s.setRange(TabletsSection.getRange(tableId));
 +    Set<String> wals = new HashSet<>();
 +    for (Entry<Key,Value> entry : s) {
 +      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
 +      wals.add(new Path(logEntry.filename).toString());
 +    }
 +
 +    log.warn("Found wals {}", wals);
 +
 +    bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    m = new Mutation("three");
 +    byte[] bytes = new byte[1024 * 1024];
 +    m.put("1".getBytes(), new byte[0], bytes);
 +    m.put("2".getBytes(), new byte[0], bytes);
 +    m.put("3".getBytes(), new byte[0], bytes);
 +    m.put("4".getBytes(), new byte[0], bytes);
 +    m.put("5".getBytes(), new byte[0], bytes);
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    conn.tableOperations().flush(table, null, null, true);
 +
 +    while (!ReplicationTable.isOnline(conn)) {
 +      sleepUninterruptibly(2, TimeUnit.SECONDS);
 +    }
 +
 +    for (int i = 0; i < 10; i++) {
 +      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      s.fetchColumnFamily(LogColumnFamily.NAME);
 +      s.setRange(TabletsSection.getRange(tableId));
 +      for (Entry<Key,Value> entry : s) {
 +        log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
 +      }
 +
 +      try {
 +        s = ReplicationTable.getScanner(conn);
 +        StatusSection.limit(s);
 +        Text buff = new Text();
 +        boolean allReferencedLogsClosed = true;
 +        int recordsFound = 0;
 +        for (Entry<Key,Value> e : s) {
 +          recordsFound++;
 +          allReferencedLogsClosed = true;
 +          StatusSection.getFile(e.getKey(), buff);
 +          String file = buff.toString();
 +          if (wals.contains(file)) {
 +            Status stat = Status.parseFrom(e.getValue().get());
 +            if (!stat.getClosed()) {
 +              log.info("{} wasn't closed", file);
 +              allReferencedLogsClosed = false;
 +            }
 +          }
 +        }
 +
 +        if (recordsFound > 0 && allReferencedLogsClosed) {
 +          return;
 +        }
 +        Thread.sleep(2000);
 +      } catch (RuntimeException e) {
 +        Throwable cause = e.getCause();
 +        if (cause instanceof AccumuloSecurityException) {
 +          AccumuloSecurityException ase = (AccumuloSecurityException) cause;
 +          switch (ase.getSecurityErrorCode()) {
 +            case PERMISSION_DENIED:
 +              // We tried to read the replication table before the GRANT went through
 +              Thread.sleep(2000);
 +              break;
 +            default:
 +              throw e;
 +          }
 +        }
 +      }
 +    }
 +
 +    Assert.fail("We had a file that was referenced but didn't get closed");
 +  }
 +
 +  @Test
 +  public void singleTableWithSingleTarget() throws Exception {
 +    // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
 +    // against expected Status messages.
 +    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 +
 +    Connector conn = getConnector();
 +    String table1 = "table1";
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Create a table
 +    conn.tableOperations().create(table1);
 +
 +    int attempts = 10;
 +
 +    // Might think the table doesn't yet exist, retry
 +    while (attempts > 0) {
 +      try {
 +        // Enable replication on table1
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +        // Replicate table1 to cluster1 in the table with id of '4'
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
 +        // Sleep for 100 seconds before saying something is replicated
 +        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
 +        break;
 +      } catch (Exception e) {
 +        attempts--;
 +        if (attempts <= 0) {
 +          throw e;
 +        }
 +        sleepUninterruptibly(2, TimeUnit.SECONDS);
 +      }
 +    }
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 2000, 50);
 +
 +    // Make sure the replication table is online at this point
 +    boolean online = ReplicationTable.isOnline(conn);
 +    attempts = 10;
 +    do {
 +      if (!online) {
 +        sleepUninterruptibly(2, TimeUnit.SECONDS);
 +        online = ReplicationTable.isOnline(conn);
 +        attempts--;
 +      }
 +    } while (!online && attempts > 0);
 +    Assert.assertTrue("Replication table was never created", online);
 +
 +    // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
 +    for (int i = 0; i < 10 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
 +      sleepUninterruptibly(2, TimeUnit.SECONDS);
 +    }
 +
 +    Assert.assertTrue("Combiner was never set on replication table",
 +        conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
 +
 +    // Trigger the minor compaction, waiting for it to finish.
 +    // This should write the entry to metadata that the file has data
 +    conn.tableOperations().flush(table1, null, null, true);
 +
 +    // Make sure that we have one status element, should be a new file
 +    Scanner s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    Entry<Key,Value> entry = null;
 +    Status expectedStatus = StatusUtil.openWithUnknownLength();
 +    attempts = 10;
 +    // This record will move from new to new with infinite length because of the minc (flush)
 +    while (null == entry && attempts > 0) {
 +      try {
 +        entry = Iterables.getOnlyElement(s);
 +        Status actual = Status.parseFrom(entry.getValue().get());
 +        if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
 +          entry = null;
 +          // the master process didn't yet fire and write the new mutation, wait for it to do
 +          // so and try to read it again
 +          Thread.sleep(1000);
 +        }
 +      } catch (NoSuchElementException e) {
 +        entry = null;
 +        Thread.sleep(500);
 +      } catch (IllegalArgumentException e) {
 +        // saw this contain 2 elements once
 +        s = ReplicationTable.getScanner(conn);
 +        StatusSection.limit(s);
 +        for (Entry<Key,Value> content : s) {
 +          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +        }
 +        throw e;
 +      } finally {
 +        attempts--;
 +      }
 +    }
 +
 +    Assert.assertNotNull("Could not find expected entry in replication table", entry);
 +    Status actual = Status.parseFrom(entry.getValue().get());
 +    Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual),
 +        !actual.getClosed() && actual.getInfiniteEnd());
 +
 +    // Try a couple of times to watch for the work record to be created
 +    boolean notFound = true;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      WorkSection.limit(s);
 +      int elementsFound = Iterables.size(s);
 +      if (0 < elementsFound) {
 +        Assert.assertEquals(1, elementsFound);
 +        notFound = false;
 +      }
 +      Thread.sleep(500);
 +    }
 +
 +    // If we didn't find the work record, print the contents of the table
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +      }
 +      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
 +    }
 +
 +    // Write some more data so that we over-run the single WAL
 +    writeSomeData(conn, table1, 3000, 50);
 +
 +    log.info("Issued compaction for table");
 +    conn.tableOperations().compact(table1, null, null, true, true);
 +    log.info("Compaction completed");
 +
 +    // Master is creating entries in the replication table from the metadata table every second.
 +    // Compaction should trigger the record to be written to metadata. Wait a bit to ensure
 +    // that the master has time to work.
 +    Thread.sleep(5000);
 +
 +    s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    int numRecords = 0;
 +    for (Entry<Key,Value> e : s) {
 +      numRecords++;
 +      log.info("Found status record {}\t{}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
 +    }
 +
 +    Assert.assertEquals(2, numRecords);
 +
 +    // We should eventually see 2 work records, but allow for a delay: our scan might land between
 +    // the second status record and its work record (status1 -> work1 -> status2 -> (our scan) -> work2)
 +    notFound = true;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      WorkSection.limit(s);
 +      int elementsFound = Iterables.size(s);
 +      if (2 == elementsFound) {
 +        notFound = false;
 +      }
 +      Thread.sleep(500);
 +    }
 +
 +    // If we didn't find the work record, print the contents of the table
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +      }
 +      Assert.assertFalse("Did not find the work entries for the status entries", notFound);
 +    }
 +  }
 +
 +  @Test
 +  public void correctClusterNameInWorkEntry() throws Exception {
 +    Connector conn = getConnector();
 +    String table1 = "table1";
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Create a table
 +    conn.tableOperations().create(table1);
 +
 +    int attempts = 5;
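 +    // Retry in case the new table is not yet visible when setting the replication properties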
 +    while (attempts > 0) {
 +      try {
 +        // Enable replication on table1
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +        // Replicate table1 to cluster1 in the table with id of '4'
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
 +        attempts = 0;
 +      } catch (Exception e) {
 +        attempts--;
 +        if (attempts <= 0) {
 +          throw e;
 +        }
 +        sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +      }
 +    }
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 2000, 50);
 +    conn.tableOperations().flush(table1, null, null, true);
 +
 +    String tableId = conn.tableOperations().tableIdMap().get(table1);
 +    Assert.assertNotNull("Table ID was null", tableId);
 +
 +    // Make sure the replication table exists at this point
 +    boolean online = ReplicationTable.isOnline(conn);
 +    attempts = 5;
 +    do {
 +      if (!online) {
 +        sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +        online = ReplicationTable.isOnline(conn);
 +        attempts--;
 +      }
 +    } while (!online && attempts > 0);
 +    Assert.assertTrue("Replication table did not exist", online);
 +
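 +    // Wait for root to be granted READ on the replication table; the grant can lag behind the table coming online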
 +    for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) {
 +      Thread.sleep(1000);
 +    }
 +
 +    Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
 +
 +    boolean notFound = true;
 +    Scanner s;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      WorkSection.limit(s);
 +      try {
 +        Entry<Key,Value> e = Iterables.getOnlyElement(s);
 +        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
 +        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
 +        notFound = false;
 +      } catch (NoSuchElementException e) {
 +        // No work entry yet; fall through to the sleep below and retry
 +      } catch (IllegalArgumentException e) {
 +        s = ReplicationTable.getScanner(conn);
 +        for (Entry<Key,Value> content : s) {
 +          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +        }
 +        Assert.fail("Found more than one work section entry");
 +      }
 +
 +      Thread.sleep(500);
 +    }
 +
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +      }
 +      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
 +    }
 +  }
 +
 +  @Test
 +  public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
 +    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 +
 +    final Connector conn = getConnector();
 +
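 +    // Bring the replication table online, grant ourselves write access, and clear any pre-existing records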
 +    ReplicationTable.setOnline(conn);
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 +    conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 +
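 +    // Track, in a background thread, the WALs referenced in the metadata table while the test ingests data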
 +    final AtomicBoolean keepRunning = new AtomicBoolean(true);
 +    final Set<String> metadataWals = new HashSet<>();
 +
 +    Thread t = new Thread(new Runnable() {
 +      @Override
 +      public void run() {
 +        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
 +        // when that happens
 +        while (keepRunning.get()) {
 +          try {
 +            metadataWals.addAll(getLogs(conn).keySet());
 +          } catch (Exception e) {
 +            log.error("Metadata table doesn't exist");
 +          }
 +        }
 +      }
 +
 +    });
 +
 +    t.start();
 +
 +    String table1 = "table1", table2 = "table2", table3 = "table3";
 +
 +    try {
 +      conn.tableOperations().create(table1);
 +      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +      conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +          ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
 +
 +      // Write some data to table1
 +      writeSomeData(conn, table1, 200, 500);
 +
 +      conn.tableOperations().create(table2);
 +      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +
 +      writeSomeData(conn, table2, 200, 500);
 +
 +      conn.tableOperations().create(table3);
 +      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
 +      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +
 +      writeSomeData(conn, table3, 200, 500);
 +
 +      // Compact (and thereby flush) each table to try to create the replication records
 +      for (String table : Arrays.asList(table1, table2, table3)) {
 +        conn.tableOperations().compact(table, null, null, true, true);
 +      }
 +    } finally {
 +      keepRunning.set(false);
 +      t.join(5000);
 +      Assert.assertFalse(t.isAlive());
 +    }
 +
 +    // Kill the tserver(s) and restart them
 +    // to ensure that the WALs we previously observed all move to closed.
 +    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
 +    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
 +
 +    // Make sure we can read all the tables (recovery complete)
 +    for (String table : Arrays.asList(table1, table2, table3)) {
 +      Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator());
 +    }
 +
 +    // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses
 +    // in the metadata table, and then in the replication table
 +    Process gc = cluster.exec(SimpleGarbageCollector.class);
 +
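 +    // Wait for the GC to acquire its lock so we know the process is actually running before checking its effects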
 +    waitForGCLock(conn);
 +
 +    Thread.sleep(1000);
 +
 +    log.info("GC is up and should have had time to run at least once by now");
 +
 +    try {
 +      boolean allClosed = true;
 +
 +      // We should either find all closed records or no records
 +      // After they're closed, they are candidates for deletion
 +      for (int i = 0; i < 10; i++) {
 +        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
 +        Iterator<Entry<Key,Value>> iter = s.iterator();
 +
 +        long recordsFound = 0L;
 +        while (allClosed && iter.hasNext()) {
 +          Entry<Key,Value> entry = iter.next();
 +          String wal = entry.getKey().getRow().toString();
 +          if (metadataWals.contains(wal)) {
 +            Status status = Status.parseFrom(entry.getValue().get());
 +            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
 +            allClosed &= status.getClosed();
 +            recordsFound++;
 +          }
 +        }
 +
 +        log.info("Found {} records from the metadata table", recordsFound);
 +        if (allClosed) {
 +          break;
 +        }
 +
 +        sleepUninterruptibly(2, TimeUnit.SECONDS);
 +      }
 +
 +      if (!allClosed) {
 +        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
 +        for (Entry<Key,Value> entry : s) {
 +          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
 +        }
 +        Assert.fail("Expected all replication records in the metadata table to be closed");
 +      }
 +
 +      for (int i = 0; i < 10; i++) {
 +        allClosed = true;
 +
 +        Scanner s = ReplicationTable.getScanner(conn);
 +        Iterator<Entry<Key,Value>> iter = s.iterator();
 +
 +        long recordsFound = 0L;
 +        while (allClosed && iter.hasNext()) {
 +          Entry<Key,Value> entry = iter.next();
 +          String wal = entry.getKey().getRow().toString();
 +          if (metadataWals.contains(wal)) {
 +            Status status = Status.parseFrom(entry.getValue().get());
 +            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
 +            allClosed &= status.getClosed();
 +            recordsFound++;
 +          }
 +        }
 +
 +        log.info("Found {} records from the replication table", recordsFound);
 +        if (allClosed) {
 +          break;
 +        }
 +
 +        sleepUninterruptibly(3, TimeUnit.SECONDS);
 +      }
 +
 +      if (!allClosed) {
 +        Scanner s = ReplicationTable.getScanner(conn);
 +        StatusSection.limit(s);
 +        for (Entry<Key,Value> entry : s) {
 +          log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
 +        }
 +        Assert.fail("Expected all replication records in the replication table to be closed");
 +      }
 +
 +    } finally {
 +      gc.destroy();
 +      gc.waitFor();
 +    }
 +
 +  }
 +
 +  @Test
 +  public void replicatedStatusEntriesAreDeleted() throws Exception {
 +    // Just stop it now, we'll restart it after we restart the tserver
 +    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 +
 +    final Connector conn = getConnector();
 +    log.info("Got connector to MAC");
 +    String table1 = "table1";
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Create a table
 +    conn.tableOperations().create(table1);
 +
 +    int attempts = 5;
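 +    // Retry in case the new table is not yet visible when setting the replication properties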
 +    while (attempts > 0) {
 +      try {
 +        // Enable replication on table1
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +        // Replicate table1 to cluster1 in the table with id of '4'
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
 +        // Use the MockReplicaSystem impl, sleeping for 1 second before reporting data as replicated
 +        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "1000"));
 +        attempts = 0;
 +      } catch (Exception e) {
 +        attempts--;
 +        if (attempts <= 0) {
 +          throw e;
 +        }
 +        sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +      }
 +    }
 +
 +    String tableId = conn.tableOperations().tableIdMap().get(table1);
 +    Assert.assertNotNull("Could not determine table id for " + table1, tableId);
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 2000, 50);
 +    conn.tableOperations().flush(table1, null, null, true);
 +
 +    // Make sure the replication table exists at this point
 +    boolean online = ReplicationTable.isOnline(conn);
 +    attempts = 10;
 +    do {
 +      if (!online) {
 +        sleepUninterruptibly(1, TimeUnit.SECONDS);
 +        online = ReplicationTable.isOnline(conn);
 +        attempts--;
 +      }
 +    } while (!online && attempts > 0);
 +    Assert.assertTrue("Replication table did not exist", online);
 +
 +    // Grant ourselves the write permission for later
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 +
 +    log.info("Checking for replication entries in the metadata table's replication section");
 +    // Then we need to get those records over to the replication table
 +    Scanner s;
 +    Set<String> entries = new HashSet<>();
 +    for (int i = 0; i < 5; i++) {
 +      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      s.setRange(ReplicationSection.getRange());
 +      entries.clear();
 +      for (Entry<Key,Value> entry : s) {
 +        entries.add(entry.getKey().getRow().toString());
 +        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +      }
 +      if (!entries.isEmpty()) {
 +        log.info("Replication entries {}", entries);
 +        break;
 +      }
 +      Thread.sleep(1000);
 +    }
 +
 +    Assert.assertFalse("Did not find any replication entries in the metadata table", entries.isEmpty());
 +
 +    // Find the WorkSection record that will be created for that data we ingested
 +    boolean notFound = true;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      try {
 +        s = ReplicationTable.getScanner(conn);
 +        WorkSection.limit(s);
 +        Entry<Key,Value> e = Iterables.getOnlyElement(s);
 +        log.info("Found entry: " + e.getKey().toStringNoTruncate());
 +        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
 +        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
 +        notFound = false;
 +      } catch (NoSuchElementException e) {
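 +        // No work entry yet; fall through to the sleep below and retry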
 +
 +      } catch (IllegalArgumentException e) {
 +        // Somehow we got more than one element. Log what they were
 +        s = ReplicationTable.getScanner(conn);
 +        for (Entry<Key,Value> content : s) {
 +          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +        }
 +        Assert.fail("Found more than one work section entry");
 +      } catch (RuntimeException e) {
 +        // Catch a propagation issue, fail if it's not what we expect
 +        Throwable cause = e.getCause();
 +        if (cause instanceof AccumuloSecurityException) {
 +          AccumuloSecurityException sec = (AccumuloSecurityException) cause;
 +          switch (sec.getSecurityErrorCode()) {
 +            case PERMISSION_DENIED:
 +              // retry -- the grant didn't happen yet
 +              log.warn("Sleeping because permission was denied");
 +              break;
 +            default:
 +              throw e;
 +          }
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      Thread.sleep(2000);
 +    }
 +
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + ProtobufUtil.toString(Status.parseFrom(content.getValue().get())));
 +      }
 +      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
 +    }
 +
 +    /**
 +     * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
 +     * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table
 +     * anymore
 +     */
 +
 +    log.info("Killing tserver");
 +    // Kill the tserver(s) and restart them
 +    // to ensure that the WALs we previously observed all move to closed.
 +    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
 +
 +    log.info("Starting tserver");
 +    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
 +
 +    log.info("Waiting to read tables");
 +    sleepUninterruptibly(2 * 3, TimeUnit.SECONDS);
 +
 +    // Make sure we can read all the tables (recovery complete)
 +    for (String table : new String[] {MetadataTable.NAME, table1}) {
 +      Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator());
 +    }
 +
 +    log.info("Recovered metadata:");
 +    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    for (Entry<Key,Value> entry : s) {
 +      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +    }
 +
 +    cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
 +
 +    // Wait for a bit since the GC has to run (should be running after a one second delay)
 +    waitForGCLock(conn);
 +
 +    Thread.sleep(1000);
 +
 +    log.info("After GC");
 +    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    for (Entry<Key,Value> entry : s) {
 +      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +    }
 +
 +    // We expect no replication records in the metadata table after compaction. We poll because we
 +    // must wait for the StatusMaker's next iteration, which cleans up the dangling *closed* records
 +    // once the corresponding records exist in the replication table. The GC must close the file
 +    // (CloseWriteAheadLogReferences) before the record can be removed.
 +    log.info("Checking metadata table for replication entries");
 +    Set<String> remaining = new HashSet<>();
 +    for (int i = 0; i < 10; i++) {
 +      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      s.setRange(ReplicationSection.getRange());
 +      remaining.clear();
 +      for (Entry<Key,Value> e : s) {
 +        remaining.add(e.getKey().getRow().toString());
 +      }
 +      remaining.retainAll(entries);
 +      if (remaining.isEmpty()) {
 +        break;
 +      }
 +      log.info("remaining {}", remaining);
 +      Thread.sleep(2000);
 +      log.info("");
 +    }
 +
 +    Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty());
 +
 +    /**
 +     * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
 +     * deleted after replication occurs
 +     */
 +
 +    int recordsFound = 0;
 +    for (int i = 0; i < 30; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      recordsFound = 0;
 +      for (Entry<Key,Value> entry : s) {
 +        recordsFound++;
 +        log.info("{} {}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
 +      }
 +
 +      if (recordsFound <= 2) {
 +        break;
 +      } else {
 +        Thread.sleep(1000);
 +        log.info("");
 +      }
 +    }
 +
 +    Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/dc087b12/test/src/test/java/org/apache/accumulo/test/iterator/SummingCombinerTest.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/iterator/SummingCombinerTest.java
index 7bd06fe,0000000..cac2334
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/iterator/SummingCombinerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/iterator/SummingCombinerTest.java
@@@ -1,132 -1,0 +1,132 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to you under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.iterator;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
++import static java.util.Objects.requireNonNull;
 +
 +import java.util.List;
 +import java.util.Map.Entry;
- import java.util.Objects;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.Combiner;
 +import org.apache.accumulo.core.iterators.LongCombiner;
 +import org.apache.accumulo.core.iterators.user.SummingCombiner;
 +import org.apache.accumulo.iteratortest.IteratorTestCaseFinder;
 +import org.apache.accumulo.iteratortest.IteratorTestInput;
 +import org.apache.accumulo.iteratortest.IteratorTestOutput;
 +import org.apache.accumulo.iteratortest.junit4.BaseJUnit4IteratorTest;
 +import org.apache.accumulo.iteratortest.testcases.IteratorTestCase;
 +import org.junit.runners.Parameterized.Parameters;
 +
 +/**
 + * Iterator test harness tests for SummingCombiner
 + */
 +public class SummingCombinerTest extends BaseJUnit4IteratorTest {
 +
 +  @Parameters
 +  public static Object[][] parameters() {
 +    IteratorTestInput input = getIteratorInput();
 +    IteratorTestOutput output = getIteratorOutput();
 +    List<IteratorTestCase> tests = IteratorTestCaseFinder.findAllTestCases();
 +    return BaseJUnit4IteratorTest.createParameters(input, output, tests);
 +  }
 +
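 +  // OUTPUT_DATA is computed from INPUT_DATA, so INPUT_DATA must be declared first (static initializers run in declaration order)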
 +  private static final TreeMap<Key,Value> INPUT_DATA = createInputData();
 +  private static final TreeMap<Key,Value> OUTPUT_DATA = createOutputData();
 +
 +  private static TreeMap<Key,Value> createInputData() {
 +    TreeMap<Key,Value> data = new TreeMap<>();
 +
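 +    // Each numeric comment below is the expected sum for the group of cells that follows (same row, family, and qualifier)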
 +    // 3
 +    data.put(new Key("1", "a", "a", 1), new Value(bytes("1")));
 +    data.put(new Key("1", "a", "a", 5), new Value(bytes("1")));
 +    data.put(new Key("1", "a", "a", 10), new Value(bytes("1")));
 +    // 7
 +    data.put(new Key("1", "a", "b", 1), new Value(bytes("5")));
 +    data.put(new Key("1", "a", "b", 5), new Value(bytes("2")));
 +    // 0
 +    data.put(new Key("1", "a", "f", 1), new Value(bytes("0")));
 +    // -10
 +    data.put(new Key("1", "a", "g", 5), new Value(bytes("1")));
 +    data.put(new Key("1", "a", "g", 10), new Value(bytes("-11")));
 +    // -5
 +    data.put(new Key("1", "b", "d", 10), new Value(bytes("-5")));
 +    // MAX_VALUE
 +    data.put(new Key("1", "b", "e", 10), new Value(bytes(Long.toString(Long.MAX_VALUE))));
 +    // MIN_VALUE
 +    data.put(new Key("1", "d", "d", 10), new Value(bytes(Long.toString(Long.MIN_VALUE))));
 +    // 30
 +    data.put(new Key("2", "a", "a", 1), new Value(bytes("5")));
 +    data.put(new Key("2", "a", "a", 5), new Value(bytes("10")));
 +    data.put(new Key("2", "a", "a", 10), new Value(bytes("15")));
 +
 +    return data;
 +  }
 +
 +  private static final byte[] bytes(String value) {
-     return Objects.requireNonNull(value).getBytes(UTF_8);
++    return requireNonNull(value).getBytes(UTF_8);
 +  }
 +
 +  private static TreeMap<Key,Value> createOutputData() {
 +    TreeMap<Key,Value> data = new TreeMap<>();
 +
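 +    // Walk the input in sorted order, summing values for consecutive cells that share row, family,
 +    // qualifier, and visibility; whenever the key changes, record the running sum as the expected output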
 +    Key lastKey = null;
 +    long sum = 0;
 +    for (Entry<Key,Value> entry : INPUT_DATA.entrySet()) {
 +      if (null == lastKey) {
 +        lastKey = entry.getKey();
 +        sum += Long.parseLong(entry.getValue().toString());
 +      } else {
 +        if (0 != lastKey.compareTo(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
 +          // Different key, store the running sum.
 +          data.put(lastKey, new Value(Long.toString(sum).getBytes(UTF_8)));
 +          // Reset lastKey and the sum
 +          lastKey = entry.getKey();
 +          sum = 0;
 +        }
 +
 +        sum += Long.parseLong(entry.getValue().toString());
 +      }
 +    }
 +
 +    data.put(lastKey, new Value(Long.toString(sum).getBytes(UTF_8)));
 +
 +    return data;
 +  }
 +
 +  private static IteratorTestInput getIteratorInput() {
 +    IteratorSetting setting = new IteratorSetting(50, SummingCombiner.class);
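 +    // Values are string-encoded longs; combine across all columns so every cell participates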
 +    LongCombiner.setEncodingType(setting, LongCombiner.Type.STRING);
 +    Combiner.setCombineAllColumns(setting, true);
 +    return new IteratorTestInput(SummingCombiner.class, setting.getOptions(), new Range(), INPUT_DATA);
 +  }
 +
 +  private static IteratorTestOutput getIteratorOutput() {
 +    return new IteratorTestOutput(OUTPUT_DATA);
 +  }
 +
 +  public SummingCombinerTest(IteratorTestInput input, IteratorTestOutput expectedOutput, IteratorTestCase testCase) {
 +    super(input, expectedOutput, testCase);
 +  }
 +
 +}

