accumulo-commits mailing list archives

From e..@apache.org
Subject [30/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar, stop building test jar
Date Thu, 04 Jun 2015 18:53:11 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
new file mode 100644
index 0000000..4ef2958
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.cluster.ClusterControl;
+import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.PrintInfo;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.MonitorUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestMultiTableIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Iterators;
+
+public class ReadWriteIT extends AccumuloClusterHarness {
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class);
+
+  static final int ROWS = 200000;
+  static final int COLS = 1;
+  static final String COLF = "colf";
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 6 * 60;
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void invalidInstanceName() throws Exception {
+    final Connector conn = getConnector();
+    new ZooKeeperInstance("fake_instance_name", conn.getInstance().getZooKeepers());
+  }
+
+  @Test
+  public void sunnyDay() throws Exception {
+    // Start Accumulo, create a table, insert some data, and verify we can read it out.
+    // Shut down cleanly.
+    log.debug("Starting Monitor");
+    cluster.getClusterControl().startAllServers(ServerType.MONITOR);
+    Connector connector = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName);
+    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS, COLS, 50, 0, tableName);
+    String monitorLocation = null;
+    while (null == monitorLocation) {
+      monitorLocation = MonitorUtil.getLocation(getConnector().getInstance());
+      if (null == monitorLocation) {
+        log.debug("Could not fetch monitor HTTP address from zookeeper");
+        Thread.sleep(2000);
+      }
+    }
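+    // The monitor should be serving a non-trivial home page once its location is registered.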
+    URL url = new URL("http://" + monitorLocation);
+    log.debug("Fetching web page " + url);
+    String result = FunctionalTestUtils.readAll(url.openStream());
+    assertTrue(result.length() > 100);
+    log.debug("Stopping accumulo cluster");
+    ClusterControl control = cluster.getClusterControl();
+    control.adminStopAll();
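+    // Wait for the master to release its ZooKeeper lock, confirming shutdown completed, before stopping the remaining servers.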
+    ZooReader zreader = new ZooReader(connector.getInstance().getZooKeepers(), connector.getInstance().getZooKeepersSessionTimeOut());
+    ZooCache zcache = new ZooCache(zreader, null);
+    byte[] masterLockData;
+    do {
+      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(connector.getInstance()) + Constants.ZMASTER_LOCK, null);
+      if (null != masterLockData) {
+        log.info("Master lock is still held");
+        Thread.sleep(1000);
+      }
+    } while (null != masterLockData);
+
+    control.stopAllServers(ServerType.GARBAGE_COLLECTOR);
+    control.stopAllServers(ServerType.MONITOR);
+    control.stopAllServers(ServerType.TRACER);
+    log.debug("success!");
+    // Restarting everything
+    cluster.start();
+  }
+
+  public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName)
+      throws Exception {
+    ingest(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName);
+  }
+
+  public static void ingest(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf,
+      String tableName) throws Exception {
+    TestIngest.Opts opts = new TestIngest.Opts();
+    opts.rows = rows;
+    opts.cols = cols;
+    opts.dataSize = width;
+    opts.startRow = offset;
+    opts.columnFamily = colf;
+    opts.createTable = true;
+    opts.setTableName(tableName);
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      opts.updateKerberosCredentials(clientConfig);
+    } else {
+      opts.setPrincipal(principal);
+    }
+
+    TestIngest.ingest(connector, opts, new BatchWriterOpts());
+  }
+
+  public static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String tableName)
+      throws Exception {
+    verify(connector, clientConfig, principal, rows, cols, width, offset, COLF, tableName);
+  }
+
+  private static void verify(Connector connector, ClientConfiguration clientConfig, String principal, int rows, int cols, int width, int offset, String colf,
+      String tableName) throws Exception {
+    ScannerOpts scannerOpts = new ScannerOpts();
+    VerifyIngest.Opts opts = new VerifyIngest.Opts();
+    opts.rows = rows;
+    opts.cols = cols;
+    opts.dataSize = width;
+    opts.startRow = offset;
+    opts.columnFamily = colf;
+    opts.setTableName(tableName);
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      opts.updateKerberosCredentials(clientConfig);
+    } else {
+      opts.setPrincipal(principal);
+    }
+
+    VerifyIngest.verifyIngest(connector, opts, scannerOpts);
+  }
+
+  public static String[] args(String... args) {
+    return args;
+  }
+
+  @Test
+  public void multiTableTest() throws Exception {
+    // Write to multiple tables
+    final String instance = cluster.getInstanceName();
+    final String keepers = cluster.getZooKeepers();
+    final ClusterControl control = cluster.getClusterControl();
+    final String prefix = getClass().getSimpleName() + "_" + testName.getMethodName();
+    ExecutorService svc = Executors.newFixedThreadPool(2);
+    Future<Integer> p1 = svc.submit(new Callable<Integer>() {
+      @Override
+      public Integer call() {
+        try {
+          ClientConfiguration clientConf = cluster.getClientConfig();
+          // Invocation is different for SASL. We're only logged in via this process's memory (not via a credentials cache on disk),
+          // so the keytab has to be passed along.
+          if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+            String principal = getAdminPrincipal();
+            AuthenticationToken token = getAdminToken();
+            assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken);
+            KerberosToken kt = (KerberosToken) token;
+            assertNotNull("Expected keytab in token", kt.getKeytab());
+            return control.exec(
+                TestMultiTableIngest.class,
+                args("--count", Integer.toString(ROWS), "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab().getAbsolutePath(),
+                    "-u", principal));
+          }
+
+          return control.exec(
+              TestMultiTableIngest.class,
+              args("--count", Integer.toString(ROWS), "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String(
+                  ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix));
+        } catch (IOException e) {
+          log.error("Error running MultiTableIngest", e);
+          return -1;
+        }
+      }
+    });
+    Future<Integer> p2 = svc.submit(new Callable<Integer>() {
+      @Override
+      public Integer call() {
+        try {
+          ClientConfiguration clientConf = cluster.getClientConfig();
+          // Invocation is different for SASL. We're only logged in via this process's memory (not via a credentials cache on disk),
+          // so the keytab has to be passed along.
+          if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+            String principal = getAdminPrincipal();
+            AuthenticationToken token = getAdminToken();
+            assertTrue("Expected KerberosToken, but was " + token.getClass(), token instanceof KerberosToken);
+            KerberosToken kt = (KerberosToken) token;
+            assertNotNull("Expected keytab in token", kt.getKeytab());
+            return control.exec(
+                TestMultiTableIngest.class,
+                args("--count", Integer.toString(ROWS), "--readonly", "-i", instance, "-z", keepers, "--tablePrefix", prefix, "--keytab", kt.getKeytab()
+                    .getAbsolutePath(), "-u", principal));
+          }
+
+          return control.exec(
+              TestMultiTableIngest.class,
+              args("--count", Integer.toString(ROWS), "--readonly", "-u", getAdminPrincipal(), "-i", instance, "-z", keepers, "-p", new String(
+                  ((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8), "--tablePrefix", prefix));
+        } catch (IOException e) {
+          log.error("Error running MultiTableIngest", e);
+          return -1;
+        }
+      }
+    });
+    svc.shutdown();
+    while (!svc.isTerminated()) {
+      svc.awaitTermination(15, TimeUnit.SECONDS);
+    }
+    assertEquals(0, p1.get().intValue());
+    assertEquals(0, p2.get().intValue());
+  }
+
+  @Test
+  public void largeTest() throws Exception {
+    // write a few large values
+    Connector connector = getConnector();
+    String table = getUniqueNames(1)[0];
+    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table);
+    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2, 1, 500000, 0, table);
+  }
+
+  @Test
+  public void interleaved() throws Exception {
+    // read and write concurrently
+    final Connector connector = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    interleaveTest(connector, tableName);
+  }
+
+  static void interleaveTest(final Connector connector, final String tableName) throws Exception {
+    final AtomicBoolean fail = new AtomicBoolean(false);
+    final int CHUNKSIZE = ROWS / 10;
+    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, 0, tableName);
+    int i;
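+    // For each chunk, verify the previously ingested chunk in a background thread while the next chunk is ingested.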
+    for (i = 0; i < ROWS; i += CHUNKSIZE) {
+      final int start = i;
+      Thread verify = new Thread() {
+        @Override
+        public void run() {
+          try {
+            verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, start, tableName);
+          } catch (Exception ex) {
+            fail.set(true);
+          }
+        }
+      };
+      verify.start();
+      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i + CHUNKSIZE, tableName);
+      verify.join();
+      assertFalse(fail.get());
+    }
+    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), CHUNKSIZE, 1, 50, i, tableName);
+  }
+
+  public static Text t(String s) {
+    return new Text(s);
+  }
+
+  public static Mutation m(String row, String cf, String cq, String value) {
+    Mutation m = new Mutation(t(row));
+    m.put(t(cf), t(cq), new Value(value.getBytes()));
+    return m;
+  }
+
+  @Test
+  public void localityGroupPerf() throws Exception {
+    // verify that locality groups can make look-ups faster
+    final Connector connector = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    connector.tableOperations().setProperty(tableName, "table.group.g1", "colf");
+    connector.tableOperations().setProperty(tableName, "table.groups.enabled", "g1");
+    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
+    connector.tableOperations().compact(tableName, null, null, true, true);
+    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+    bw.addMutation(m("zzzzzzzzzzz", "colf2", "cq", "value"));
+    bw.close();
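+    // Time a scan over the bulk family 'colf' (in group g1), then over 'colf2' (outside the group);
+    // locality groups let the second scan skip the g1 data, so it is asserted to be faster.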
+    long now = System.currentTimeMillis();
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(new Text("colf"));
+    Iterators.size(scanner.iterator());
+    long diff = System.currentTimeMillis() - now;
+    now = System.currentTimeMillis();
+    scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    scanner.fetchColumnFamily(new Text("colf2"));
+    Iterators.size(scanner.iterator());
+    long diff2 = System.currentTimeMillis() - now;
+    assertTrue(diff2 < diff);
+  }
+
+  @Test
+  public void sunnyLG() throws Exception {
+    // create a locality group, write to it and ensure it exists in the RFiles that result
+    final Connector connector = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>();
+    groups.put("g1", Collections.singleton(t("colf")));
+    connector.tableOperations().setLocalityGroups(tableName, groups);
+    ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
+    verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName);
+    connector.tableOperations().flush(tableName, null, null, true);
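+    // Find this table's file entries in the metadata table and run PrintInfo on each RFile to confirm the g1 locality group was written.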
+    BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1);
+    String tableId = connector.tableOperations().tableIdMap().get(tableName);
+    bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<"))));
+    bscanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    boolean foundFile = false;
+    for (Entry<Key,Value> entry : bscanner) {
+      foundFile = true;
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      PrintStream newOut = new PrintStream(baos);
+      PrintStream oldOut = System.out;
+      try {
+        System.setOut(newOut);
+        List<String> args = new ArrayList<>();
+        args.add(entry.getKey().getColumnQualifier().toString());
+        if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+          args.add("--config");
+          StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster;
+          String hadoopConfDir = sac.getHadoopConfDir();
+          args.add(new Path(hadoopConfDir, "core-site.xml").toString());
+          args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString());
+        }
+        log.info("Invoking PrintInfo with " + args);
+        PrintInfo.main(args.toArray(new String[args.size()]));
+        newOut.flush();
+        String stdout = baos.toString();
+        assertTrue(stdout.contains("Locality group         : g1"));
+        assertTrue(stdout.contains("families      : [colf]"));
+      } finally {
+        newOut.close();
+        System.setOut(oldOut);
+      }
+    }
+    bscanner.close();
+    assertTrue(foundFile);
+  }
+
+  @Test
+  public void localityGroupChange() throws Exception {
+    // Make changes to locality groups and ensure nothing is lost
+    final Connector connector = getConnector();
+    String table = getUniqueNames(1)[0];
+    TableOperations to = connector.tableOperations();
+    to.create(table);
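+    // Each entry describes locality groups as "group:colf1,colf2;group2:..."; null yields an empty mapping (no locality groups).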
+    String[] config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf,xyz;lg2:c1,c2"};
+    int i = 0;
+    for (String cfg : config) {
+      to.setLocalityGroups(table, getGroups(cfg));
+      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * (i + 1), 1, 50, ROWS * i, table);
+      to.flush(table, null, null, true);
+      verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 0, 1, 50, ROWS * (i + 1), table);
+      i++;
+    }
+    to.delete(table);
+    to.create(table);
+    config = new String[] {"lg1:colf", null, "lg1:colf,xyz", "lg1:colf;lg2:colf",};
+    i = 1;
+    for (String cfg : config) {
+      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table);
+      ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table);
+      to.setLocalityGroups(table, getGroups(cfg));
+      to.flush(table, null, null, true);
+      verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, table);
+      verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), ROWS * i, 1, 50, 0, "xyz", table);
+      i++;
+    }
+  }
+
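+  // Parse a config string like "lg1:colf,xyz;lg2:c1,c2" into a map of locality group name to column families; null yields an empty map.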
+  private Map<String,Set<Text>> getGroups(String cfg) {
+    Map<String,Set<Text>> groups = new TreeMap<String,Set<Text>>();
+    if (cfg != null) {
+      for (String group : cfg.split(";")) {
+        String[] parts = group.split(":");
+        Set<Text> cols = new HashSet<Text>();
+        for (String col : parts[1].split(",")) {
+          cols.add(t(col));
+        }
+        groups.put(parts[0], cols);
+      }
+    }
+    return groups;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
new file mode 100644
index 0000000..0408aa0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.CreateEmpty;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * XXX As a part of verifying lossy recovery via inserting an empty rfile, this test deletes the files backing test table tablets. This requires write access
+ * to the backing files of the test Accumulo mini cluster.
+ *
+ * This test should read the file location from the test harness, and that file should be on the local filesystem. If you want to take a paranoid approach,
+ * just make sure the test user doesn't have write access to the HDFS files of any co-located live Accumulo instance or to any important local filesystem files.
+ */
+public class RecoveryWithEmptyRFileIT extends ConfigurableMacBase {
+  private static final Logger log = LoggerFactory.getLogger(RecoveryWithEmptyRFileIT.class);
+
+  static final int ROWS = 200000;
+  static final int COLS = 1;
+  static final String COLF = "colf";
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.useMiniDFS(true);
+  }
+
+  @Test
+  public void replaceMissingRFile() throws Exception {
+    log.info("Ingest some data, verify it was stored properly, replace an underlying rfile with an empty one and verify we can scan.");
+    Connector connector = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    ReadWriteIT.ingest(connector, cluster.getClientConfig(), "root", ROWS, COLS, 50, 0, tableName);
+    ReadWriteIT.verify(connector, cluster.getClientConfig(), "root", ROWS, COLS, 50, 0, tableName);
+
+    connector.tableOperations().flush(tableName, null, null, true);
+    connector.tableOperations().offline(tableName, true);
+
+    log.debug("Replacing rfile(s) with empty");
+    Scanner meta = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    String tableId = connector.tableOperations().tableIdMap().get(tableName);
+    meta.setRange(new Range(new Text(tableId + ";"), new Text(tableId + "<")));
+    meta.fetchColumnFamily(DataFileColumnFamily.NAME);
+    boolean foundFile = false;
+    for (Entry<Key,Value> entry : meta) {
+      foundFile = true;
+      Path rfile = new Path(entry.getKey().getColumnQualifier().toString());
+      log.debug("Removing rfile '" + rfile + "'");
+      cluster.getFileSystem().delete(rfile, false);
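+      // CreateEmpty writes a valid, empty RFile at the deleted path so the tablet's file reference still resolves.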
+      Process info = cluster.exec(CreateEmpty.class, rfile.toString());
+      assertEquals(0, info.waitFor());
+    }
+    meta.close();
+    assertTrue(foundFile);
+
+    log.trace("invalidate cached file handles by issuing a compaction");
+    connector.tableOperations().online(tableName, true);
+    connector.tableOperations().compact(tableName, null, null, false, true);
+
+    log.debug("make sure we can still scan");
+    Scanner scan = connector.createScanner(tableName, Authorizations.EMPTY);
+    scan.setRange(new Range());
+    long cells = 0L;
+    for (Entry<Key,Value> entry : scan) {
+      if (entry != null)
+        cells++;
+    }
+    scan.close();
+    assertEquals(0L, cells);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java
new file mode 100644
index 0000000..a8c5bca
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.functional;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.master.balancer.RegexGroupBalancer;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+
+public class RegexGroupBalanceIT extends ConfigurableMacBase {
+
+  @Override
+  public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {
+    cfg.setNumTservers(4);
+  }
+
+  @Test(timeout = 120000)
+  public void testBalancing() throws Exception {
+    Connector conn = getConnector();
+    String tablename = getUniqueNames(1)[0];
+    conn.tableOperations().create(tablename);
+
+    SortedSet<Text> splits = new TreeSet<>();
+    splits.add(new Text("01a"));
+    splits.add(new Text("01m"));
+    splits.add(new Text("01z"));
+
+    splits.add(new Text("02a"));
+    splits.add(new Text("02f"));
+    splits.add(new Text("02r"));
+    splits.add(new Text("02z"));
+
+    splits.add(new Text("03a"));
+    splits.add(new Text("03f"));
+    splits.add(new Text("03m"));
+    splits.add(new Text("03r"));
+
+    conn.tableOperations().setProperty(tablename, RegexGroupBalancer.REGEX_PROPERTY, "(\\d\\d).*");
+    conn.tableOperations().setProperty(tablename, RegexGroupBalancer.DEFAUT_GROUP_PROPERTY, "03");
+    conn.tableOperations().setProperty(tablename, RegexGroupBalancer.WAIT_TIME_PROPERTY, "50ms");
+    conn.tableOperations().setProperty(tablename, Property.TABLE_LOAD_BALANCER.getKey(), RegexGroupBalancer.class.getName());
+
+    conn.tableOperations().addSplits(tablename, splits);
+
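+    // Poll until the balancer has spread each group's tablets across the tservers within the expected bounds.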
+    while (true) {
+      Thread.sleep(250);
+
+      Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename);
+
+      boolean allGood = true;
+      allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 3);
+      allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4);
+      allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4);
+      allGood &= checkTabletsPerTserver(groupLocationCounts, 3, 3, 4);
+
+      if (allGood) {
+        break;
+      }
+    }
+
+    splits.clear();
+    splits.add(new Text("01b"));
+    splits.add(new Text("01f"));
+    splits.add(new Text("01l"));
+    splits.add(new Text("01r"));
+    conn.tableOperations().addSplits(tablename, splits);
+
+    while (true) {
+      Thread.sleep(250);
+
+      Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename);
+
+      boolean allGood = true;
+      allGood &= checkGroup(groupLocationCounts, "01", 1, 2, 4);
+      allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4);
+      allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4);
+      allGood &= checkTabletsPerTserver(groupLocationCounts, 4, 4, 4);
+
+      if (allGood) {
+        break;
+      }
+    }
+
+    // merge group 01 down to one tablet
+    conn.tableOperations().merge(tablename, null, new Text("01z"));
+
+    while (true) {
+      Thread.sleep(250);
+
+      Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename);
+
+      boolean allGood = true;
+      allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 1);
+      allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4);
+      allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4);
+      allGood &= checkTabletsPerTserver(groupLocationCounts, 2, 3, 4);
+
+      if (allGood) {
+        break;
+      }
+    }
+  }
+
+  private boolean checkTabletsPerTserver(Table<String,String,MutableInt> groupLocationCounts, int minTabletPerTserver, int maxTabletsPerTserver,
+      int totalTservers) {
+    // check that each tserver has between min and max tablets
+    for (Map<String,MutableInt> groups : groupLocationCounts.columnMap().values()) {
+      int sum = 0;
+      for (MutableInt mi : groups.values()) {
+        sum += mi.intValue();
+      }
+
+      if (sum < minTabletPerTserver || sum > maxTabletsPerTserver) {
+        return false;
+      }
+    }
+
+    return groupLocationCounts.columnKeySet().size() == totalTservers;
+  }
+
+  private boolean checkGroup(Table<String,String,MutableInt> groupLocationCounts, String group, int min, int max, int tservers) {
+    Collection<MutableInt> counts = groupLocationCounts.row(group).values();
+    if (counts.size() == 0) {
+      return min == 0 && max == 0 && tservers == 0;
+    }
+    return min == Collections.min(counts).intValue() && max == Collections.max(counts).intValue() && counts.size() == tservers;
+  }
+
+  private Table<String,String,MutableInt> getCounts(Connector conn, String tablename) throws TableNotFoundException {
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+    String tableId = conn.tableOperations().tableIdMap().get(tablename);
+    s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+
+    Table<String,String,MutableInt> groupLocationCounts = HashBasedTable.create();
+
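+    // Count tablets per (group, tserver); the group is the two-digit row prefix, and the default tablet (row ending in '<') is counted in group "03".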
+    for (Entry<Key,Value> entry : s) {
+      String group = entry.getKey().getRow().toString();
+      if (group.endsWith("<")) {
+        group = "03";
+      } else {
+        group = group.substring(tableId.length() + 1).substring(0, 2);
+      }
+      String loc = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()).toString();
+
+      MutableInt count = groupLocationCounts.get(group, loc);
+      if (count == null) {
+        count = new MutableInt(0);
+        groupLocationCounts.put(group, loc, count);
+      }
+
+      count.increment();
+    }
+    return groupLocationCounts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
new file mode 100644
index 0000000..0c22196
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.junit.Test;
+
+public class RenameIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Test
+  public void renameTest() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String name1 = tableNames[0];
+    String name2 = tableNames[1];
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    TestIngest.Opts opts = new TestIngest.Opts();
+    opts.createTable = true;
+    opts.setTableName(name1);
+
+    final ClientConfiguration clientConfig = cluster.getClientConfig();
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      opts.updateKerberosCredentials(clientConfig);
+    } else {
+      opts.setPrincipal(getAdminPrincipal());
+    }
+
+    Connector c = getConnector();
+    TestIngest.ingest(c, opts, bwOpts);
+    c.tableOperations().rename(name1, name2);
+    TestIngest.ingest(c, opts, bwOpts);
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      vopts.updateKerberosCredentials(clientConfig);
+    } else {
+      vopts.setPrincipal(getAdminPrincipal());
+    }
+
+    vopts.setTableName(name2);
+    VerifyIngest.verifyIngest(c, vopts, scanOpts);
+    c.tableOperations().delete(name1);
+    c.tableOperations().rename(name2, name1);
+    vopts.setTableName(name1);
+    VerifyIngest.verifyIngest(c, vopts, scanOpts);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
new file mode 100644
index 0000000..39e9bed
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.cluster.ClusterControl;
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+public class RestartIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(RestartIT.class);
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  private static final ScannerOpts SOPTS = new ScannerOpts();
+  private static final VerifyIngest.Opts VOPTS = new VerifyIngest.Opts();
+  private static final TestIngest.Opts OPTS = new TestIngest.Opts();
+  private static final BatchWriterOpts BWOPTS = new BatchWriterOpts();
+  static {
+    OPTS.rows = VOPTS.rows = 10 * 1000;
+  }
+
+  private ExecutorService svc;
+
+  @Before
+  public void setup() throws Exception {
+    svc = Executors.newFixedThreadPool(1);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (null == svc) {
+      return;
+    }
+
+    if (!svc.isShutdown()) {
+      svc.shutdown();
+    }
+
+    while (!svc.awaitTermination(10, TimeUnit.SECONDS)) {
+      log.info("Waiting for threadpool to terminate");
+    }
+  }
+
+  @Test
+  public void restartMaster() throws Exception {
+    Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    OPTS.setTableName(tableName);
+    VOPTS.setTableName(tableName);
+    c.tableOperations().create(tableName);
+    final AuthenticationToken token = getAdminToken();
+    final ClusterControl control = getCluster().getClusterControl();
+
+    final String[] args;
+    if (token instanceof PasswordToken) {
+      byte[] password = ((PasswordToken) token).getPassword();
+      args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z",
+          cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName};
+      OPTS.setPrincipal(getAdminPrincipal());
+      VOPTS.setPrincipal(getAdminPrincipal());
+    } else if (token instanceof KerberosToken) {
+      ClusterUser rootUser = getAdminUser();
+      args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z",
+          cluster.getZooKeepers(), "--rows", "" + OPTS.rows, "--table", tableName};
+      ClientConfiguration clientConfig = cluster.getClientConfig();
+      OPTS.updateKerberosCredentials(clientConfig);
+      VOPTS.updateKerberosCredentials(clientConfig);
+    } else {
+      throw new RuntimeException("Unknown token");
+    }
+
+    Future<Integer> ret = svc.submit(new Callable<Integer>() {
+      @Override
+      public Integer call() {
+        try {
+          return control.exec(TestIngest.class, args);
+        } catch (IOException e) {
+          log.error("Error running TestIngest", e);
+          return -1;
+        }
+      }
+    });
+
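+    // Bounce the master while ingest is running; the ingest client should ride through the restart.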
+    control.stopAllServers(ServerType.MASTER);
+    control.startAllServers(ServerType.MASTER);
+    assertEquals(0, ret.get().intValue());
+    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
+  }
+
+  @Test
+  public void restartMasterRecovery() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    OPTS.setTableName(tableName);
+    VOPTS.setTableName(tableName);
+    ClientConfiguration clientConfig = cluster.getClientConfig();
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      OPTS.updateKerberosCredentials(clientConfig);
+      VOPTS.updateKerberosCredentials(clientConfig);
+    } else {
+      OPTS.setPrincipal(getAdminPrincipal());
+      VOPTS.setPrincipal(getAdminPrincipal());
+    }
+    TestIngest.ingest(c, OPTS, BWOPTS);
+    ClusterControl control = getCluster().getClusterControl();
+
+    // TODO implement a kill all too?
+    // cluster.stop() would also stop ZooKeeper
+    control.stopAllServers(ServerType.MASTER);
+    control.stopAllServers(ServerType.TRACER);
+    control.stopAllServers(ServerType.TABLET_SERVER);
+    control.stopAllServers(ServerType.GARBAGE_COLLECTOR);
+    control.stopAllServers(ServerType.MONITOR);
+
+    ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut());
+    ZooCache zcache = new ZooCache(zreader, null);
+    byte[] masterLockData;
+    do {
+      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null);
+      if (null != masterLockData) {
+        log.info("Master lock is still held");
+        Thread.sleep(1000);
+      }
+    } while (null != masterLockData);
+
+    cluster.start();
+    UtilWaitThread.sleep(5);
+    control.stopAllServers(ServerType.MASTER);
+
+    masterLockData = new byte[0];
+    do {
+      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null);
+      if (null != masterLockData) {
+        log.info("Master lock is still held");
+        Thread.sleep(1000);
+      }
+    } while (null != masterLockData);
+    cluster.start();
+    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
+  }
+
+  @Test
+  public void restartMasterSplit() throws Exception {
+    Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    final AuthenticationToken token = getAdminToken();
+    final ClusterControl control = getCluster().getClusterControl();
+    VOPTS.setTableName(tableName);
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+
+    final String[] args;
+    if (token instanceof PasswordToken) {
+      byte[] password = ((PasswordToken) token).getPassword();
+      args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z",
+          cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName};
+      OPTS.setPrincipal(getAdminPrincipal());
+      VOPTS.setPrincipal(getAdminPrincipal());
+    } else if (token instanceof KerberosToken) {
+      ClusterUser rootUser = getAdminUser();
+      args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z",
+          cluster.getZooKeepers(), "--rows", Integer.toString(VOPTS.rows), "--table", tableName};
+      ClientConfiguration clientConfig = cluster.getClientConfig();
+      OPTS.updateKerberosCredentials(clientConfig);
+      VOPTS.updateKerberosCredentials(clientConfig);
+    } else {
+      throw new RuntimeException("Unknown token");
+    }
+
+    Future<Integer> ret = svc.submit(new Callable<Integer>() {
+      @Override
+      public Integer call() {
+        try {
+          return control.exec(TestIngest.class, args);
+        } catch (Exception e) {
+          log.error("Error running TestIngest", e);
+          return -1;
+        }
+      }
+    });
+
+    control.stopAllServers(ServerType.MASTER);
+
+    ZooReader zreader = new ZooReader(c.getInstance().getZooKeepers(), c.getInstance().getZooKeepersSessionTimeOut());
+    ZooCache zcache = new ZooCache(zreader, null);
+    byte[] masterLockData;
+    do {
+      masterLockData = ZooLock.getLockData(zcache, ZooUtil.getRoot(c.getInstance()) + Constants.ZMASTER_LOCK, null);
+      if (null != masterLockData) {
+        log.info("Master lock is still held");
+        Thread.sleep(1000);
+      }
+    } while (null != masterLockData);
+
+    cluster.start();
+    assertEquals(0, ret.get().intValue());
+    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
+  }
+
+  @Test
+  public void killedTabletServer() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    OPTS.setTableName(tableName);
+    VOPTS.setTableName(tableName);
+    ClientConfiguration clientConfig = cluster.getClientConfig();
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      OPTS.updateKerberosCredentials(clientConfig);
+      VOPTS.updateKerberosCredentials(clientConfig);
+    } else {
+      OPTS.setPrincipal(getAdminPrincipal());
+      VOPTS.setPrincipal(getAdminPrincipal());
+    }
+    TestIngest.ingest(c, OPTS, BWOPTS);
+    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
+    cluster.getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+    cluster.start();
+    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
+  }
+
+  @Test
+  public void killedTabletServer2() throws Exception {
+    final Connector c = getConnector();
+    final String[] names = getUniqueNames(2);
+    final String tableName = names[0];
+    final ClusterControl control = getCluster().getClusterControl();
+    c.tableOperations().create(tableName);
+    // The original test started and then stopped a GC. It's not clear why: the GC was
+    // already running by default, and it would have nothing to do after only creating a table.
+    control.stopAllServers(ServerType.TABLET_SERVER);
+
+    cluster.start();
+    c.tableOperations().create(names[1]);
+  }
+
+  @Test
+  public void killedTabletServerDuringShutdown() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    OPTS.setTableName(tableName);
+    ClientConfiguration clientConfig = cluster.getClientConfig();
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      OPTS.updateKerberosCredentials(clientConfig);
+    } else {
+      OPTS.setPrincipal(getAdminPrincipal());
+    }
+    TestIngest.ingest(c, OPTS, BWOPTS);
+    try {
+      getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getCluster().getClusterControl().adminStopAll();
+    } finally {
+      getCluster().start();
+    }
+  }
+
+  @Test
+  public void shutdownDuringCompactingSplitting() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    VOPTS.setTableName(tableName);
+    ClientConfiguration clientConfig = cluster.getClientConfig();
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      OPTS.updateKerberosCredentials(clientConfig);
+      VOPTS.updateKerberosCredentials(clientConfig);
+    } else {
+      OPTS.setPrincipal(getAdminPrincipal());
+      VOPTS.setPrincipal(getAdminPrincipal());
+    }
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+    String splitThreshold = null;
+    for (Entry<String,String> entry : c.tableOperations().getProperties(tableName)) {
+      if (entry.getKey().equals(Property.TABLE_SPLIT_THRESHOLD.getKey())) {
+        splitThreshold = entry.getValue();
+        break;
+      }
+    }
+    Assert.assertNotNull(splitThreshold);
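+    // Capture the table's split threshold; the finally block uses it to reset the metadata table's threshold on standalone clusters.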
+    try {
+      c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), "20K");
+      TestIngest.Opts opts = new TestIngest.Opts();
+      opts.setTableName(tableName);
+      if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+        opts.updateKerberosCredentials(clientConfig);
+      } else {
+        opts.setPrincipal(getAdminPrincipal());
+      }
+      TestIngest.ingest(c, opts, BWOPTS);
+      c.tableOperations().flush(tableName, null, null, false);
+      VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
+      getCluster().stop();
+    } finally {
+      if (getClusterType() == ClusterType.STANDALONE) {
+        getCluster().start();
+        c.tableOperations().setProperty(MetadataTable.NAME, Property.TABLE_SPLIT_THRESHOLD.getKey(), splitThreshold);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
new file mode 100644
index 0000000..abfd5d8
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.cluster.ClusterControl;
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+public class RestartStressIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(RestartStressIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    Map<String,String> opts = cfg.getSiteConfig();
+    opts.put(Property.TSERV_MAXMEM.getKey(), "100K");
+    opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms");
+    opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M");
+    opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s");
+    opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s");
+    cfg.setSiteConfig(opts);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  private ExecutorService svc;
+
+  @Before
+  public void setup() throws Exception {
+    svc = Executors.newFixedThreadPool(1);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (null == svc) {
+      return;
+    }
+
+    if (!svc.isShutdown()) {
+      svc.shutdown();
+    }
+
+    while (!svc.awaitTermination(10, TimeUnit.SECONDS)) {
+      log.info("Waiting for threadpool to terminate");
+    }
+  }
+
+  private static final VerifyIngest.Opts VOPTS;
+  static {
+    VOPTS = new VerifyIngest.Opts();
+    VOPTS.rows = 10 * 1000;
+  }
+  private static final ScannerOpts SOPTS = new ScannerOpts();
+
+  @Test
+  public void test() throws Exception {
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    final AuthenticationToken token = getAdminToken();
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "500K");
+    final ClusterControl control = getCluster().getClusterControl();
+    final String[] args;
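+    // Choose TestIngest credentials based on the admin token type: a password or a Kerberos keytab.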
+    if (token instanceof PasswordToken) {
+      byte[] password = ((PasswordToken) token).getPassword();
+      args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z",
+          cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName};
+    } else if (token instanceof KerberosToken) {
+      ClusterUser rootUser = getAdminUser();
+      args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z",
+          cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName};
+    } else {
+      throw new RuntimeException("Unrecognized token");
+    }
+
+    Future<Integer> retCode = svc.submit(new Callable<Integer>() {
+      @Override
+      public Integer call() {
+        try {
+          return control.exec(TestIngest.class, args);
+        } catch (Exception e) {
+          log.error("Error running TestIngest", e);
+          return -1;
+        }
+      }
+    });
+
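+    // Restart all tablet servers twice while the ingest job runs, forcing write-ahead log recovery under load.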
+    for (int i = 0; i < 2; i++) {
+      UtilWaitThread.sleep(10 * 1000);
+      control.stopAllServers(ServerType.TABLET_SERVER);
+      control.startAllServers(ServerType.TABLET_SERVER);
+    }
+    assertEquals(0, retCode.get().intValue());
+    VOPTS.setTableName(tableName);
+
+    if (token instanceof PasswordToken) {
+      VOPTS.setPrincipal(getAdminPrincipal());
+    } else if (token instanceof KerberosToken) {
+      VOPTS.updateKerberosCredentials(cluster.getClientConfig());
+    } else {
+      throw new RuntimeException("Unrecognized token");
+    }
+
+    VerifyIngest.verifyIngest(c, VOPTS, SOPTS);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/RowDeleteIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RowDeleteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/RowDeleteIT.java
new file mode 100644
index 0000000..75c66bd
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RowDeleteIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.test.functional.FunctionalTestUtils.checkRFiles;
+import static org.apache.accumulo.test.functional.FunctionalTestUtils.nm;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.user.RowDeletingIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class RowDeleteIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void run() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    Map<String,Set<Text>> groups = new HashMap<String,Set<Text>>();
+    groups.put("lg1", Collections.singleton(new Text("foo")));
+    groups.put("dg", Collections.<Text> emptySet());
+    c.tableOperations().setLocalityGroups(tableName, groups);
+    IteratorSetting setting = new IteratorSetting(30, RowDeletingIterator.class);
+    c.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.majc));
+    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100");
+
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+    bw.addMutation(nm("r1", "foo", "cf1", "v1"));
+    bw.addMutation(nm("r1", "bar", "cf1", "v2"));
+
+    bw.flush();
+    c.tableOperations().flush(tableName, null, null, true);
+
+    checkRFiles(c, tableName, 1, 1, 1, 1);
+
+    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    int count = Iterators.size(scanner.iterator());
+    assertEquals("count == " + count, 2, count);
+
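+    // Write the RowDeletingIterator delete marker for row r1; the iterator is attached at majc scope, so the row is only removed once a full major compaction runs.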
+    bw.addMutation(nm("r1", "", "", RowDeletingIterator.DELETE_ROW_VALUE));
+
+    bw.flush();
+    c.tableOperations().flush(tableName, null, null, true);
+
+    checkRFiles(c, tableName, 1, 1, 2, 2);
+
+    scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    count = Iterators.size(scanner.iterator());
+    assertEquals("count == " + count, 3, count);
+
+    c.tableOperations().compact(tableName, null, null, false, true);
+
+    checkRFiles(c, tableName, 1, 1, 0, 0);
+
+    scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    count = Iterators.size(scanner.iterator());
+    assertEquals("count == " + count, 0, count);
+    bw.close();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
new file mode 100644
index 0000000..863ac78
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for ACCUMULO-2641, which adds a scan id to the thrift protocol so that {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
+ * returns a unique scan id.
+ * <p>
+ * The test uses the MiniCluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions. The test exercises multiple
+ * tablet servers with splits and multiple ranges to force the scans to occur across multiple tablet servers for completeness.
+ * <p>
+ * This patch modified thrift; the TraceRepoDeserializationTest seems to fail unless the following is added back into
+ * org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated:
+ * <p>
+ * {@code private static final long serialVersionUID = -4659975753252858243l;}
+ */
+public class ScanIdIT extends AccumuloClusterHarness {
+
+  private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
+
+  private static final int NUM_SCANNERS = 8;
+
+  private static final int NUM_DATA_ROWS = 100;
+
+  private static final Random random = new Random();
+
+  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
+
+  private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
+
+  private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>();
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  /**
+   * @throws Exception
+   *           any exception is a test failure.
+   */
+  @Test
+  public void testScanId() throws Exception {
+
+    final String tableName = getUniqueNames(1)[0];
+    Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+
+    addSplits(conn, tableName);
+
+    log.info("Splits added");
+
+    generateSampleData(conn, tableName);
+
+    log.info("Generated data for {}", tableName);
+
+    attachSlowIterator(conn, tableName);
+
+    CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
+
+    for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+      ScannerThread st = new ScannerThread(conn, scannerIndex, tableName, latch);
+      pool.submit(st);
+    }
+
+    // wait for scanners to report a result.
+    while (testInProgress.get()) {
+
+      if (resultsByWorker.size() < NUM_SCANNERS) {
+        log.trace("Results reported {}", resultsByWorker.size());
+        UtilWaitThread.sleep(750);
+      } else {
+        // each worker has reported at least one result.
+        testInProgress.set(false);
+
+        log.debug("Final result count {}", resultsByWorker.size());
+
+        // delay to allow scanners to react to end of test and cleanly close.
+        UtilWaitThread.sleep(1000);
+      }
+
+    }
+
+    // all scanners have reported at least one result, so check for unique scan ids.
+    Set<Long> scanIds = new HashSet<Long>();
+
+    List<String> tservers = conn.instanceOperations().getTabletServers();
+
+    log.debug("tablet servers {}", tservers.toString());
+
+    for (String tserver : tservers) {
+
+      List<ActiveScan> activeScans = null;
+      for (int i = 0; i < 10; i++) {
+        try {
+          activeScans = conn.instanceOperations().getActiveScans(tserver);
+          break;
+        } catch (AccumuloException e) {
+          if (e.getCause() instanceof TableNotFoundException) {
+            log.debug("Got TableNotFoundException, will retry");
+            Thread.sleep(200);
+            continue;
+          }
+          throw e;
+        }
+      }
+
+      assertNotNull("Repeatedly got an exception trying to get active scans", activeScans);
+
+      log.debug("TServer {} has {} active scans", tserver, activeScans.size());
+
+      for (ActiveScan scan : activeScans) {
+        log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
+        scanIds.add(scan.getScanid());
+      }
+    }
+
+    assertTrue("Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size(), NUM_SCANNERS <= scanIds.size());
+
+  }
+
+  /**
+   * Runs a scanner in a separate thread to allow multiple scanners to execute in parallel.
+   * <p>
+   * The thread's run method terminates when the testInProgress flag is set to false.
+   */
+  private static class ScannerThread implements Runnable {
+
+    private final Connector connector;
+    private Scanner scanner = null;
+    private final int workerIndex;
+    private final String tablename;
+    private final CountDownLatch latch;
+
+    public ScannerThread(final Connector connector, final int workerIndex, final String tablename, final CountDownLatch latch) {
+      this.connector = connector;
+      this.workerIndex = workerIndex;
+      this.tablename = tablename;
+      this.latch = latch;
+    }
+
+    /**
+     * Execute the scan across the sample data and put scan results into the result map until the testInProgress flag is set to false.
+     */
+    @Override
+    public void run() {
+
+      latch.countDown();
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        log.error("Thread interrupted with id {}", workerIndex);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      log.debug("Creating scanner in worker thread {}", workerIndex);
+
+      try {
+
+        scanner = connector.createScanner(tablename, new Authorizations());
+
+        // Never start readahead
+        scanner.setReadaheadThreshold(Long.MAX_VALUE);
+        scanner.setBatchSize(1);
+
+        // create different ranges to try to hit more than one tablet.
+        scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
+
+      } catch (TableNotFoundException e) {
+        throw new IllegalStateException("Initialization failure. Could not create scanner", e);
+      }
+
+      scanner.fetchColumnFamily(new Text("fam1"));
+
+      for (Map.Entry<Key,Value> entry : scanner) {
+
+        // exit when success condition is met.
+        if (!testInProgress.get()) {
+          scanner.clearScanIterators();
+          scanner.close();
+
+          return;
+        }
+
+        Text row = entry.getKey().getRow();
+
+        log.debug("worker {}, row {}", workerIndex, row.toString());
+
+        if (entry.getValue() != null) {
+
+          Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
+
+          // value should always be increasing
+          if (prevValue != null) {
+
+            log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue()));
+
+            assertTrue(prevValue.compareTo(entry.getValue()) > 0);
+          }
+        } else {
+          log.info("Scanner returned null");
+          fail("Scanner returned unexpected null value");
+        }
+
+      }
+
+      log.debug("Scanner ran out of data. (info only, not an error) ");
+
+    }
+  }
+
+  /**
+   * Create splits on the table and force migration by taking the table offline and then bringing it back online for the test.
+   *
+   * @param conn
+   *          Accumulo connector to the test cluster or MAC instance.
+   */
+  private void addSplits(final Connector conn, final String tableName) {
+
+    SortedSet<Text> splits = createSplits();
+
+    try {
+
+      conn.tableOperations().addSplits(tableName, splits);
+
+      conn.tableOperations().offline(tableName, true);
+
+      UtilWaitThread.sleep(2000);
+      conn.tableOperations().online(tableName, true);
+
+      for (Text split : conn.tableOperations().listSplits(tableName)) {
+        log.trace("Split {}", split);
+      }
+
+    } catch (AccumuloSecurityException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    } catch (AccumuloException e) {
+      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
+    }
+
+  }
+
+  /**
+   * Create splits to distribute data across multiple tservers.
+   *
+   * @return splits in a sorted set for addSplits.
+   */
+  private SortedSet<Text> createSplits() {
+
+    SortedSet<Text> splits = new TreeSet<Text>();
+
+    for (int split = 0; split < 10; split++) {
+      splits.add(new Text(Integer.toString(split)));
+    }
+
+    return splits;
+  }
+
+  /**
+   * Generate some sample data using random row ids to distribute the data across the splits.
+   * <p>
+   * The primary goal is to determine that each scanner is assigned a unique scan id. The test also checks that the count value for fam1 increases if a scanner
+   * reads multiple values, but that is a secondary consideration included for completeness.
+   *
+   * @param connector
+   *          Accumulo connector to the test cluster or MAC instance.
+   */
+  private void generateSampleData(Connector connector, final String tablename) {
+
+    try {
+
+      BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
+
+      ColumnVisibility vis = new ColumnVisibility("public");
+
+      for (int i = 0; i < NUM_DATA_ROWS; i++) {
+
+        Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
+
+        Mutation m = new Mutation(rowId);
+        m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
+        m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8)));
+        m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8)));
+
+        log.trace("Added row {}", rowId);
+
+        bw.addMutation(m);
+      }
+
+      bw.close();
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+    } catch (MutationsRejectedException ex) {
+      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
+    }
+  }
+
+  /**
+   * Attach the test slow iterator so that we have time to read the scan id without creating a large dataset. Uses fairly large sleep and seek-sleep times because
+   * we are not concerned with how much data is read and we do not read all of the data; the test stops once each scanner reports a scan id.
+   *
+   * @param connector
+   *          Accumulo connector to the test cluster or MAC instance.
+   */
+  private void attachSlowIterator(Connector connector, final String tablename) {
+    try {
+
+      IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
+      slowIter.addOption("sleepTime", "200");
+      slowIter.addOption("seekSleepTime", "200");
+
+      connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
+
+    } catch (AccumuloException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    } catch (AccumuloSecurityException ex) {
+      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java
new file mode 100644
index 0000000..3453303
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class ScanIteratorIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 30;
+  }
+
+  @Test
+  public void run() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(tableName);
+
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+    for (int i = 0; i < 1000; i++) {
+      Mutation m = new Mutation(new Text(String.format("%06d", i)));
+      m.put(new Text("cf1"), new Text("cq1"), new Value(Integer.toString(1000 - i).getBytes(UTF_8)));
+      m.put(new Text("cf1"), new Text("cq2"), new Value(Integer.toString(i - 1000).getBytes(UTF_8)));
+
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    Scanner scanner = c.createScanner(tableName, new Authorizations());
+
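+    // The dropMod iterator (mod=2, drop=0) is expected to filter out even-numbered rows, leaving only odd rows 1 through 999 (see verify below).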
+    setupIter(scanner);
+    verify(scanner, 1, 999);
+
+    BatchScanner bscanner = c.createBatchScanner(tableName, new Authorizations(), 3);
+    bscanner.setRanges(Collections.singleton(new Range((Key) null, null)));
+
+    setupIter(bscanner);
+    verify(bscanner, 1, 999);
+
+    ArrayList<Range> ranges = new ArrayList<Range>();
+    ranges.add(new Range(new Text(String.format("%06d", 1))));
+    ranges.add(new Range(new Text(String.format("%06d", 6)), new Text(String.format("%06d", 16))));
+    ranges.add(new Range(new Text(String.format("%06d", 20))));
+    ranges.add(new Range(new Text(String.format("%06d", 23))));
+    ranges.add(new Range(new Text(String.format("%06d", 56)), new Text(String.format("%06d", 61))));
+    ranges.add(new Range(new Text(String.format("%06d", 501)), new Text(String.format("%06d", 504))));
+    ranges.add(new Range(new Text(String.format("%06d", 998)), new Text(String.format("%06d", 1000))));
+
+    HashSet<Integer> got = new HashSet<Integer>();
+    HashSet<Integer> expected = new HashSet<Integer>();
+    for (int i : new int[] {1, 7, 9, 11, 13, 15, 23, 57, 59, 61, 501, 503, 999}) {
+      expected.add(i);
+    }
+
+    bscanner.setRanges(ranges);
+
+    for (Entry<Key,Value> entry : bscanner) {
+      got.add(Integer.parseInt(entry.getKey().getRow().toString()));
+    }
+
+    System.out.println("got : " + got);
+
+    if (!got.equals(expected)) {
+      throw new Exception(got + " != " + expected);
+    }
+
+    bscanner.close();
+
+  }
+
+  private void verify(Iterable<Entry<Key,Value>> scanner, int start, int finish) throws Exception {
+
+    int expected = start;
+    for (Entry<Key,Value> entry : scanner) {
+      if (Integer.parseInt(entry.getKey().getRow().toString()) != expected) {
+        throw new Exception("Saw unexpexted " + entry.getKey().getRow() + " " + expected);
+      }
+
+      if (entry.getKey().getColumnQualifier().toString().equals("cq2")) {
+        expected += 2;
+      }
+    }
+
+    if (expected != finish + 2) {
+      throw new Exception("Ended at " + expected + " not " + (finish + 2));
+    }
+  }
+
+  private void setupIter(ScannerBase scanner) throws Exception {
+    IteratorSetting dropMod = new IteratorSetting(50, "dropMod", "org.apache.accumulo.test.functional.DropModIter");
+    dropMod.addOption("mod", "2");
+    dropMod.addOption("drop", "0");
+    scanner.addScanIterator(dropMod);
+  }
+
+}

