accumulo-commits mailing list archives

From: ctubb...@apache.org
Subject: [19/24] accumulo git commit: Merge branch '1.7' into 1.8
Date: Tue, 25 Jul 2017 23:03:07 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
index a83b0e2,0000000..8455a40
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
@@@ -1,382 -1,0 +1,382 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.DiskUsage;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.PartialKey;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.test.functional.BadIterator;
 +import org.apache.accumulo.test.functional.FunctionalTestUtils;
 +import org.apache.hadoop.io.Text;
 +import org.apache.thrift.TException;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Sets;
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class TableOperationsIT extends AccumuloClusterHarness {
 +
 +  static TabletClientService.Client client;
 +
 +  private Connector connector;
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 30;
 +  }
 +
 +  @Before
 +  public void setup() throws Exception {
 +    connector = getConnector();
 +  }
 +
 +  @After
 +  public void checkForDanglingFateLocks() {
 +    FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
 +  }
 +
 +  @Test
 +  public void getDiskUsageErrors() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException {
 +    String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +    List<DiskUsage> diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
 +    assertEquals(1, diskUsage.size());
 +    assertEquals(0, (long) diskUsage.get(0).getUsage());
 +    assertEquals(tableName, diskUsage.get(0).getTables().iterator().next());
 +
 +    connector.securityOperations().revokeTablePermission(getAdminPrincipal(), tableName, TablePermission.READ);
 +    try {
 +      connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
 +      fail("Expected AccumuloSecurityException");
 +    } catch (AccumuloSecurityException e) {}
 +
 +    connector.tableOperations().delete(tableName);
 +    try {
 +      connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
 +      fail("Expected TableNotFoundException");
 +    } catch (TableNotFoundException e) {}
 +  }
 +
 +  @Test
 +  public void getDiskUsage() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException {
 +    final String[] names = getUniqueNames(2);
 +    String tableName = names[0];
 +    connector.tableOperations().create(tableName);
 +
 +    // verify 0 disk usage
 +    List<DiskUsage> diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
 +    assertEquals(1, diskUsages.size());
 +    assertEquals(1, diskUsages.get(0).getTables().size());
 +    assertEquals(Long.valueOf(0), diskUsages.get(0).getUsage());
 +    assertEquals(tableName, diskUsages.get(0).getTables().first());
 +
 +    // add some data
 +    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
 +    Mutation m = new Mutation("a");
 +    m.put("b", "c", new Value("abcde".getBytes()));
 +    bw.addMutation(m);
 +    bw.flush();
 +    bw.close();
 +
 +    connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true);
 +
 +    // verify we have usage
 +    diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
 +    assertEquals(1, diskUsages.size());
 +    assertEquals(1, diskUsages.get(0).getTables().size());
 +    assertTrue(diskUsages.get(0).getUsage() > 0);
 +    assertEquals(tableName, diskUsages.get(0).getTables().first());
 +
 +    String newTable = names[1];
 +
 +    // clone table
 +    connector.tableOperations().clone(tableName, newTable, false, null, null);
 +
 +    // verify tables are exactly the same
 +    Set<String> tables = new HashSet<>();
 +    tables.add(tableName);
 +    tables.add(newTable);
 +    diskUsages = connector.tableOperations().getDiskUsage(tables);
 +    assertEquals(1, diskUsages.size());
 +    assertEquals(2, diskUsages.get(0).getTables().size());
 +    assertTrue(diskUsages.get(0).getUsage() > 0);
 +
 +    connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true);
 +    connector.tableOperations().compact(newTable, new Text("A"), new Text("z"), true, true);
 +
 +    // verify tables have differences
 +    diskUsages = connector.tableOperations().getDiskUsage(tables);
 +    assertEquals(2, diskUsages.size());
 +    assertEquals(1, diskUsages.get(0).getTables().size());
 +    assertEquals(1, diskUsages.get(1).getTables().size());
 +    assertTrue(diskUsages.get(0).getUsage() > 0);
 +    assertTrue(diskUsages.get(1).getUsage() > 0);
 +
 +    connector.tableOperations().delete(tableName);
 +  }
 +
 +  @Test
 +  public void createTable() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +    Iterable<Map.Entry<String,String>> itrProps = connector.tableOperations().getProperties(tableName);
 +    Map<String,String> props = propsToMap(itrProps);
 +    assertEquals(DefaultKeySizeConstraint.class.getName(), props.get(Property.TABLE_CONSTRAINT_PREFIX.toString() + "1"));
 +    connector.tableOperations().delete(tableName);
 +  }
 +
 +  @Test
 +  public void createMergeClonedTable() throws Exception {
 +    String[] names = getUniqueNames(2);
 +    String originalTable = names[0];
 +    TableOperations tops = connector.tableOperations();
 +
 +    TreeSet<Text> splits = Sets.newTreeSet(Arrays.asList(new Text("a"), new Text("b"), new Text("c"), new Text("d")));
 +
 +    tops.create(originalTable);
 +    tops.addSplits(originalTable, splits);
 +
 +    BatchWriter bw = connector.createBatchWriter(originalTable, new BatchWriterConfig());
 +    for (Text row : splits) {
 +      Mutation m = new Mutation(row);
 +      for (int i = 0; i < 10; i++) {
 +        for (int j = 0; j < 10; j++) {
 +          m.put(Integer.toString(i), Integer.toString(j), Integer.toString(i + j));
 +        }
 +      }
 +
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    String clonedTable = names[1];
 +
 +    tops.clone(originalTable, clonedTable, true, null, null);
 +    tops.merge(clonedTable, null, new Text("b"));
 +
 +    Map<String,Integer> rowCounts = new HashMap<>();
 +    Scanner s = connector.createScanner(clonedTable, new Authorizations());
 +    for (Entry<Key,Value> entry : s) {
 +      final Key key = entry.getKey();
 +      String row = key.getRow().toString();
 +      String cf = key.getColumnFamily().toString(), cq = key.getColumnQualifier().toString();
 +      String value = entry.getValue().toString();
 +
 +      if (rowCounts.containsKey(row)) {
 +        rowCounts.put(row, rowCounts.get(row) + 1);
 +      } else {
 +        rowCounts.put(row, 1);
 +      }
 +
 +      Assert.assertEquals(Integer.parseInt(cf) + Integer.parseInt(cq), Integer.parseInt(value));
 +    }
 +
 +    Collection<Text> clonedSplits = tops.listSplits(clonedTable);
 +    Set<Text> expectedSplits = Sets.newHashSet(new Text("b"), new Text("c"), new Text("d"));
 +    for (Text clonedSplit : clonedSplits) {
 +      Assert.assertTrue("Encountered unexpected split on the cloned table: " + clonedSplit, expectedSplits.remove(clonedSplit));
 +    }
 +
 +    Assert.assertTrue("Did not find all expected splits on the cloned table: " + expectedSplits, expectedSplits.isEmpty());
 +  }
 +
 +  private Map<String,String> propsToMap(Iterable<Map.Entry<String,String>> props) {
 +    Map<String,String> map = new HashMap<>();
 +    for (Map.Entry<String,String> prop : props) {
 +      map.put(prop.getKey(), prop.getValue());
 +    }
 +    return map;
 +  }
 +
 +  @Test
 +  public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +
 +    List<IteratorSetting> list = new ArrayList<>();
 +    list.add(new IteratorSetting(15, HardListIterator.class));
 +    connector.tableOperations().compact(tableName, null, null, list, true, true);
 +
 +    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
 +    for (Map.Entry<Key,Value> entry : scanner)
 +      actual.put(entry.getKey(), entry.getValue());
 +    assertEquals(HardListIterator.allEntriesToInject, actual);
 +    connector.tableOperations().delete(tableName);
 +  }
 +
 +  /** Compare only the row, column family and column qualifier. */
 +  static class KeyRowColFColQComparator implements Comparator<Key> {
 +    @Override
 +    public int compare(Key k1, Key k2) {
 +      return k1.compareTo(k2, PartialKey.ROW_COLFAM_COLQUAL);
 +    }
 +  }
 +
 +  static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator();
 +
 +  @Test
 +  public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException,
 +      TableNotFoundException {
 +    String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +    SortedSet<Text> splitset = new TreeSet<>();
 +    splitset.add(new Text("f"));
 +    connector.tableOperations().addSplits(tableName, splitset);
 +
 +    List<IteratorSetting> list = new ArrayList<>();
 +    list.add(new IteratorSetting(15, HardListIterator.class));
 +    connector.tableOperations().compact(tableName, null, null, list, true, true);
 +
 +    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
 +    for (Map.Entry<Key,Value> entry : scanner)
 +      actual.put(entry.getKey(), entry.getValue());
 +    assertEquals(HardListIterator.allEntriesToInject, actual);
 +    connector.tableOperations().delete(tableName);
 +  }
 +
 +  @Test
 +  public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
 +      TableNotFoundException {
 +    String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +    SortedSet<Text> splitset = new TreeSet<>();
 +    splitset.add(new Text("f"));
 +    connector.tableOperations().addSplits(tableName, splitset);
 +
 +    List<IteratorSetting> list = new ArrayList<>();
 +    list.add(new IteratorSetting(15, HardListIterator.class));
 +    connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
 +    connector.tableOperations().cancelCompaction(tableName);
 +    // depending on timing, compaction will finish or be canceled
 +
 +    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
 +    for (Map.Entry<Key,Value> entry : scanner)
 +      actual.put(entry.getKey(), entry.getValue());
 +    switch (actual.size()) {
 +      case 3:
 +        // Compaction cancel didn't happen in time
 +        assertEquals(HardListIterator.allEntriesToInject, actual);
 +        break;
 +      case 2:
 +        // Compacted the first tablet (-inf, f)
 +        assertEquals(HardListIterator.allEntriesToInject.headMap(new Key("f")), actual);
 +        break;
 +      case 1:
 +        // Compacted the second tablet [f, +inf)
 +        assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key("f")), actual);
 +        break;
 +      case 0:
 +        // Cancelled the compaction before it ran. No generated entries.
 +        break;
 +      default:
 +        Assert.fail("Unexpected number of entries");
 +        break;
 +    }
 +    connector.tableOperations().delete(tableName);
 +  }
 +
 +  @Test
 +  public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException,
 +      TableNotFoundException {
 +    String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +    Text splitRow = new Text("f");
 +    SortedSet<Text> splitset = new TreeSet<>();
 +    splitset.add(splitRow);
 +    connector.tableOperations().addSplits(tableName, splitset);
 +
 +    List<IteratorSetting> list = new ArrayList<>();
 +    list.add(new IteratorSetting(15, HardListIterator.class));
 +    // compact the second tablet, not the first
 +    connector.tableOperations().compact(tableName, splitRow, null, list, true, true);
 +
 +    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +    Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
 +    for (Map.Entry<Key,Value> entry : scanner)
 +      actual.put(entry.getKey(), entry.getValue());
 +    // only expect the entries in the second tablet
 +    assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key(splitRow)), actual);
 +    connector.tableOperations().delete(tableName);
 +  }
 +
 +  /** Test recovery from bad majc iterator via compaction cancel. */
 +  @Test
 +  public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
 +      TableNotFoundException {
 +    String tableName = getUniqueNames(1)[0];
 +    connector.tableOperations().create(tableName);
 +
 +    List<IteratorSetting> list = new ArrayList<>();
 +    list.add(new IteratorSetting(15, BadIterator.class));
 +    connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
 +    sleepUninterruptibly(2, TimeUnit.SECONDS); // start compaction
 +    connector.tableOperations().cancelCompaction(tableName);
 +
 +    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +    Map<Key,Value> actual = new TreeMap<>();
 +    for (Map.Entry<Key,Value> entry : scanner)
 +      actual.put(entry.getKey(), entry.getValue());
 +    assertTrue("Should be empty. Actual is " + actual, actual.isEmpty());
 +    connector.tableOperations().delete(tableName);
 +  }
 +
 +}
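
Every hunk in this merge makes the same change: the Guava static import of
sleepUninterruptibly moves from the regular import block into the sorted
static-import block at the top of the file. For reference, a minimal sketch of
the idiom these tests standardize on (illustrative only, not part of the diff):

    import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;

    import java.util.concurrent.TimeUnit;

    public class SleepExample {
      public static void main(String[] args) {
        // Unlike Thread.sleep, sleepUninterruptibly never throws
        // InterruptedException: if the thread is interrupted mid-sleep, it
        // keeps sleeping for the remaining time and re-asserts the thread's
        // interrupt status before returning.
        sleepUninterruptibly(1, TimeUnit.SECONDS);
        System.out.println("slept a full second; any interrupt was deferred");
      }
    }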

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
index 33c1798,0000000..bad1a55
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@@ -1,77 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.TreeSet;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +// ACCUMULO-2480
 +public class TabletServerGivesUpIT extends ConfigurableMacBase {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.useMiniDFS(true);
 +    cfg.setNumTservers(1);
 +    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES, "15");
 +    cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s");
 +  }
 +
 +  @Test(timeout = 30 * 1000)
 +  public void test() throws Exception {
 +    final Connector conn = this.getConnector();
 +    // Yes, there's a tabletserver
 +    assertEquals(1, conn.instanceOperations().getTabletServers().size());
 +    final String tableName = getUniqueNames(1)[0];
 +    conn.tableOperations().create(tableName);
 +    // Kill dfs
 +    cluster.getMiniDfs().shutdown();
 +    // ask the tserver to do something
 +    final AtomicReference<Exception> ex = new AtomicReference<>();
 +    Thread splitter = new Thread() {
 +      @Override
 +      public void run() {
 +        try {
 +          TreeSet<Text> splits = new TreeSet<>();
 +          splits.add(new Text("X"));
 +          conn.tableOperations().addSplits(tableName, splits);
 +        } catch (Exception e) {
 +          ex.set(e);
 +        }
 +      }
 +    };
 +    splitter.start();
 +    // wait for the tserver to give up on writing to the WAL
 +    while (conn.instanceOperations().getTabletServers().size() == 1) {
 +      sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    }
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
index be800ad,0000000..ea3f680
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
@@@ -1,131 -1,0 +1,131 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.Random;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.minicluster.MemoryUnit;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.AccumuloServerContext;
 +import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.hadoop.conf.Configuration;
 +import org.junit.Test;
 +
 +import com.google.common.net.HostAndPort;
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +// see ACCUMULO-1950
 +public class TotalQueuedIT extends ConfigurableMacBase {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setNumTservers(1);
 +    cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE);
 +    cfg.useMiniDFS();
 +  }
 +
 +  int SMALL_QUEUE_SIZE = 100000;
 +  int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10;
 +  static final long N = 1000000;
 +
 +  @Test(timeout = 4 * 60 * 1000)
 +  public void test() throws Exception {
 +    Random random = new Random();
 +    Connector c = getConnector();
 +    c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE);
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999");
 +    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999");
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    // get an idea of how fast the syncs occur
 +    byte[] row = new byte[250];
 +    BatchWriterConfig cfg = new BatchWriterConfig();
 +    cfg.setMaxWriteThreads(10);
 +    cfg.setMaxLatency(1, TimeUnit.SECONDS);
 +    cfg.setMaxMemory(1024 * 1024);
 +    long realSyncs = getSyncs();
 +    BatchWriter bw = c.createBatchWriter(tableName, cfg);
 +    long now = System.currentTimeMillis();
 +    long bytesSent = 0;
 +    for (int i = 0; i < N; i++) {
 +      random.nextBytes(row);
 +      Mutation m = new Mutation(row);
 +      m.put("", "", "");
 +      bw.addMutation(m);
 +      bytesSent += m.estimatedMemoryUsed();
 +    }
 +    bw.close();
 +    long diff = System.currentTimeMillis() - now;
 +    double secs = diff / 1000.;
 +    double syncs = (double) bytesSent / SMALL_QUEUE_SIZE;
 +    double syncsPerSec = syncs / secs;
 +    System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec));
 +    long update = getSyncs();
 +    System.out.println("Syncs " + (update - realSyncs));
 +    realSyncs = update;
 +
 +    // Now with a much bigger total queue
 +    c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE);
 +    c.tableOperations().flush(tableName, null, null, true);
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    bw = c.createBatchWriter(tableName, cfg);
 +    now = System.currentTimeMillis();
 +    bytesSent = 0;
 +    for (int i = 0; i < N; i++) {
 +      random.nextBytes(row);
 +      Mutation m = new Mutation(row);
 +      m.put("", "", "");
 +      bw.addMutation(m);
 +      bytesSent += m.estimatedMemoryUsed();
 +    }
 +    bw.close();
 +    diff = System.currentTimeMillis() - now;
 +    secs = diff / 1000.;
 +    syncs = (double) bytesSent / LARGE_QUEUE_SIZE;
 +    syncsPerSec = syncs / secs;
 +    System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec));
 +    update = getSyncs();
 +    System.out.println("Syncs " + (update - realSyncs));
 +    assertTrue(update - realSyncs < realSyncs);
 +  }
 +
 +  private long getSyncs() throws Exception {
 +    Connector c = getConnector();
 +    ServerConfigurationFactory confFactory = new ServerConfigurationFactory(c.getInstance());
 +    AccumuloServerContext context = new AccumuloServerContext(confFactory);
 +    for (String address : c.instanceOperations().getTabletServers()) {
 +      TabletClientService.Client client = ThriftUtil.getTServerClient(HostAndPort.fromString(address), context);
 +      TabletServerStatus status = client.getTabletServerStatus(null, context.rpcCreds());
 +      return status.syncs;
 +    }
 +    return 0;
 +  }
 +
 +}
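
The timing assertions above approximate one write-ahead-log sync per fill of
the shared mutation queue. The estimate, reduced to a hypothetical helper
(names are illustrative, not part of the diff):

    // Approximation used by TotalQueuedIT: each time the tserver's shared
    // mutation queue fills (TSERV_TOTAL_MUTATION_QUEUE_MAX bytes), the
    // write-ahead log is synced once.
    static double estimatedSyncsPerSecond(long bytesSent, long queueSizeBytes, double elapsedSeconds) {
      double syncs = (double) bytesSent / queueSizeBytes; // cast first to avoid integer division
      return syncs / elapsedSeconds;
    }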

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
index 15609f6,0000000..2c4d970
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
@@@ -1,130 -1,0 +1,129 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.trace.DistributedTrace;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.accumulo.tracer.TraceDump;
 +import org.apache.accumulo.tracer.TraceDump.Printer;
 +import org.apache.accumulo.tracer.TraceServer;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +/**
 + * Verifies that a trace can still be found after the trace table is taken offline and brought back online.
 + */
 +public class TracerRecoversAfterOfflineTableIT extends ConfigurableMacBase {
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
 +    cfg.setNumTservers(1);
 +  }
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Process tracer = null;
 +    Connector conn = getConnector();
 +    if (!conn.tableOperations().exists("trace")) {
 +      MiniAccumuloClusterImpl mac = cluster;
 +      tracer = mac.exec(TraceServer.class);
 +      while (!conn.tableOperations().exists("trace")) {
 +        sleepUninterruptibly(1, TimeUnit.SECONDS);
 +      }
 +      sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    }
 +
 +    log.info("Taking table offline");
 +    conn.tableOperations().offline("trace", true);
 +
 +    String tableName = getUniqueNames(1)[0];
 +    conn.tableOperations().create(tableName);
 +
 +    log.info("Start a distributed trace span");
 +
 +    DistributedTrace.enable("localhost", "testTrace", getClientConfig());
 +    Span root = Trace.on("traceTest");
 +    BatchWriter bw = conn.createBatchWriter(tableName, null);
 +    Mutation m = new Mutation("m");
 +    m.put("a", "b", "c");
 +    bw.addMutation(m);
 +    bw.close();
 +    root.stop();
 +
 +    log.info("Bringing trace table back online");
 +    conn.tableOperations().online("trace", true);
 +
 +    log.info("Trace table is online, should be able to find trace");
 +
 +    final Scanner scanner = conn.createScanner("trace", Authorizations.EMPTY);
 +    scanner.setRange(new Range(new Text(Long.toHexString(root.traceId()))));
 +    while (true) {
 +      final StringBuilder finalBuffer = new StringBuilder();
 +      int traceCount = TraceDump.printTrace(scanner, new Printer() {
 +        @Override
 +        public void print(final String line) {
 +          try {
 +            finalBuffer.append(line).append("\n");
 +          } catch (Exception ex) {
 +            throw new RuntimeException(ex);
 +          }
 +        }
 +      });
 +      String traceOutput = finalBuffer.toString();
 +      log.info("Trace output:" + traceOutput);
 +      if (traceCount > 0) {
 +        int lastPos = 0;
 +        for (String part : "traceTest,close,binMutations".split(",")) {
 +          log.info("Looking in trace output for '" + part + "'");
 +          int pos = traceOutput.indexOf(part);
 +          assertTrue("Did not find '" + part + "' in output", pos > 0);
 +          assertTrue("'" + part + "' occurred earlier than the previous element unexpectedly", pos > lastPos);
 +          lastPos = pos;
 +        }
 +        break;
 +      } else {
 +        log.info("Ignoring trace output as traceCount not greater than zero: " + traceCount);
 +        Thread.sleep(1000);
 +      }
 +    }
 +    if (tracer != null) {
 +      tracer.destroy();
 +    }
 +  }
 +
 +}
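
The span lifecycle this test drives, reduced to its core (a sketch using the
same imports as the test; clientConf stands in for a real ClientConfiguration):

    DistributedTrace.enable("localhost", "myService", clientConf);
    Span root = Trace.on("myOperation");
    try {
      // traced work goes here: batch writes and scans carry the trace
      // context along to the servers
    } finally {
      root.stop(); // close the span so the tracer can persist it to the trace table
    }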

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java
index 4d86dd3,0000000..07a7c40
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/YieldScannersIT.java
@@@ -1,161 -1,0 +1,161 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
++import java.util.Collections;
++import java.util.Iterator;
++import java.util.Map;
++
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.functional.YieldingIterator;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Assert;
 +import org.junit.Test;
- 
- import java.util.Collections;
- import java.util.Iterator;
- import java.util.Map;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +// ACCUMULO-4643
 +public class YieldScannersIT extends AccumuloClusterHarness {
 +  Logger log = LoggerFactory.getLogger(YieldScannersIT.class);
 +  private static final char START_ROW = 'a';
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setNumTservers(1);
 +  }
 +
 +  @Test
 +  public void testScan() throws Exception {
 +    // make a table
 +    final String tableName = getUniqueNames(1)[0];
 +    final Connector conn = getConnector();
 +    conn.tableOperations().create(tableName);
 +    final BatchWriter writer = conn.createBatchWriter(tableName, new BatchWriterConfig());
 +    for (int i = 0; i < 10; i++) {
 +      byte[] row = new byte[] {(byte) (START_ROW + i)};
 +      Mutation m = new Mutation(new Text(row));
 +      m.put(new Text(), new Text(), new Value());
 +      writer.addMutation(m);
 +    }
 +    writer.flush();
 +    writer.close();
 +
 +    log.info("Creating scanner");
 +    // make a scanner for a table with 10 keys
 +    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
 +    final IteratorSetting cfg = new IteratorSetting(100, YieldingIterator.class);
 +    scanner.addScanIterator(cfg);
 +
 +    log.info("iterating");
 +    Iterator<Map.Entry<Key,Value>> it = scanner.iterator();
 +    int keyCount = 0;
 +    int yieldNextCount = 0;
 +    int yieldSeekCount = 0;
 +    while (it.hasNext()) {
 +      Map.Entry<Key,Value> next = it.next();
 +      log.info(Integer.toString(keyCount) + ": Got key " + next.getKey() + " with value " + next.getValue());
 +
 +      // verify we got the expected key
 +      char expected = (char) (START_ROW + keyCount);
 +      Assert.assertEquals("Unexpected row", Character.toString(expected), next.getKey().getRow().toString());
 +
 +      // determine whether we yielded on a next and seek
 +      if ((keyCount & 1) != 0) {
 +        yieldNextCount++;
 +        yieldSeekCount++;
 +      }
 +      String[] value = StringUtils.split(next.getValue().toString(), ',');
 +      Assert.assertEquals("Unexpected yield next count", Integer.toString(yieldNextCount), value[0]);
 +      Assert.assertEquals("Unexpected yield seek count", Integer.toString(yieldSeekCount), value[1]);
 +      Assert.assertEquals("Unexpected rebuild count", Integer.toString(yieldNextCount + yieldSeekCount), value[2]);
 +
 +      keyCount++;
 +    }
 +    Assert.assertEquals("Did not get the expected number of results", 10, keyCount);
 +  }
 +
 +  @Test
 +  public void testBatchScan() throws Exception {
 +    // make a table
 +    final String tableName = getUniqueNames(1)[0];
 +    final Connector conn = getConnector();
 +    conn.tableOperations().create(tableName);
 +    final BatchWriter writer = conn.createBatchWriter(tableName, new BatchWriterConfig());
 +    for (int i = 0; i < 10; i++) {
 +      byte[] row = new byte[] {(byte) (START_ROW + i)};
 +      Mutation m = new Mutation(new Text(row));
 +      m.put(new Text(), new Text(), new Value());
 +      writer.addMutation(m);
 +    }
 +    writer.flush();
 +    writer.close();
 +
 +    log.info("Creating batch scanner");
 +    // make a scanner for a table with 10 keys
 +    final BatchScanner scanner = conn.createBatchScanner(tableName, Authorizations.EMPTY, 1);
 +    final IteratorSetting cfg = new IteratorSetting(100, YieldingIterator.class);
 +    scanner.addScanIterator(cfg);
 +    scanner.setRanges(Collections.singleton(new Range()));
 +
 +    log.info("iterating");
 +    Iterator<Map.Entry<Key,Value>> it = scanner.iterator();
 +    int keyCount = 0;
 +    int yieldNextCount = 0;
 +    int yieldSeekCount = 0;
 +    while (it.hasNext()) {
 +      Map.Entry<Key,Value> next = it.next();
 +      log.info(Integer.toString(keyCount) + ": Got key " + next.getKey() + " with value " + next.getValue());
 +
 +      // verify we got the expected key
 +      char expected = (char) (START_ROW + keyCount);
 +      Assert.assertEquals("Unexpected row", Character.toString(expected), next.getKey().getRow().toString());
 +
 +      // determine whether we yielded on a next and seek
 +      if ((keyCount & 1) != 0) {
 +        yieldNextCount++;
 +        yieldSeekCount++;
 +      }
 +      String[] value = StringUtils.split(next.getValue().toString(), ',');
 +      Assert.assertEquals("Unexpected yield next count", Integer.toString(yieldNextCount), value[0]);
 +      Assert.assertEquals("Unexpected yield seek count", Integer.toString(yieldSeekCount), value[1]);
 +      Assert.assertEquals("Unexpected rebuild count", Integer.toString(yieldNextCount + yieldSeekCount), value[2]);
 +
 +      keyCount++;
 +    }
 +    Assert.assertEquals("Did not get the expected number of results", 10, keyCount);
 +  }
 +
 +}
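
Both tests decode counters that the test-internal YieldingIterator packs into
each Value. That decoding step, isolated (field order inferred from the
asserts above; value is one Value from the scan):

    // Payload format: "yieldNextCount,yieldSeekCount,rebuildCount"
    String[] counters = StringUtils.split(value.toString(), ',');
    int yieldNexts = Integer.parseInt(counters[0]);
    int yieldSeeks = Integer.parseInt(counters[1]);
    int rebuilds = Integer.parseInt(counters[2]);
    // the asserts expect one iterator rebuild per yield of either kind:
    // rebuilds == yieldNexts + yieldSeeks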

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
index e08be10,50595d7..73fe806
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousBatchWalker.java
@@@ -16,6 -16,6 +16,8 @@@
   */
  package org.apache.accumulo.test.continuous;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.util.ArrayList;
  import java.util.HashSet;
  import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
index 63709df,a77de3d..fa53a64
--- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
+++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousScanner.java
@@@ -16,6 -16,6 +16,7 @@@
   */
  package org.apache.accumulo.test.continuous;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.nio.charset.StandardCharsets.UTF_8;
  
  import java.util.Iterator;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
index 0558c7f,0000000..7f9ac6d
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
@@@ -1,144 -1,0 +1,143 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.TreeSet;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +public class AddSplitIT extends AccumuloClusterHarness {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  @Test
 +  public void addSplitTest() throws Exception {
 +
 +    String tableName = getUniqueNames(1)[0];
 +    Connector c = getConnector();
 +    c.tableOperations().create(tableName);
 +
 +    insertData(tableName, 1L);
 +
 +    TreeSet<Text> splits = new TreeSet<>();
 +    splits.add(new Text(String.format("%09d", 333)));
 +    splits.add(new Text(String.format("%09d", 666)));
 +
 +    c.tableOperations().addSplits(tableName, splits);
 +
 +    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +
 +    Collection<Text> actualSplits = c.tableOperations().listSplits(tableName);
 +
 +    if (!splits.equals(new TreeSet<>(actualSplits))) {
 +      throw new Exception(splits + " != " + actualSplits);
 +    }
 +
 +    verifyData(tableName, 1L);
 +    insertData(tableName, 2L);
 +
 +    // did not clear splits on purpose, it should ignore existing split points
 +    // and still create the three additional split points
 +
 +    splits.add(new Text(String.format("%09d", 200)));
 +    splits.add(new Text(String.format("%09d", 500)));
 +    splits.add(new Text(String.format("%09d", 800)));
 +
 +    c.tableOperations().addSplits(tableName, splits);
 +
 +    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +
 +    actualSplits = c.tableOperations().listSplits(tableName);
 +
 +    if (!splits.equals(new TreeSet<>(actualSplits))) {
 +      throw new Exception(splits + " != " + actualSplits);
 +    }
 +
 +    verifyData(tableName, 2L);
 +  }
 +
 +  private void verifyData(String tableName, long ts) throws Exception {
 +    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
 +
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +    for (int i = 0; i < 10000; i++) {
 +      if (!iter.hasNext()) {
 +        throw new Exception("row " + i + " not found");
 +      }
 +
 +      Entry<Key,Value> entry = iter.next();
 +
 +      String row = String.format("%09d", i);
 +
 +      if (!entry.getKey().getRow().equals(new Text(row))) {
 +        throw new Exception("unexpected row " + entry.getKey() + " " + i);
 +      }
 +
 +      if (entry.getKey().getTimestamp() != ts) {
 +        throw new Exception("unexpected ts " + entry.getKey() + " " + ts);
 +      }
 +
 +      if (Integer.parseInt(entry.getValue().toString()) != i) {
 +        throw new Exception("unexpected value " + entry + " " + i);
 +      }
 +    }
 +
 +    if (iter.hasNext()) {
 +      throw new Exception("found more than expected " + iter.next());
 +    }
 +
 +  }
 +
 +  private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException {
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, null);
 +
 +    for (int i = 0; i < 10000; i++) {
 +      String row = String.format("%09d", i);
 +
 +      Mutation m = new Mutation(new Text(row));
 +      m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(UTF_8)));
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +  }
 +
 +}
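
The add-then-verify pattern above, condensed into a hypothetical helper
(assuming an existing table; addSplits ignores split points that are already
present, which is why re-adding the old set plus new points is safe):

    static void addAndCheckSplits(Connector c, String table, SortedSet<Text> splits) throws Exception {
      c.tableOperations().addSplits(table, splits);
      TreeSet<Text> actual = new TreeSet<>(c.tableOperations().listSplits(table));
      if (!actual.containsAll(splits))
        throw new IllegalStateException(splits + " not all present in " + actual);
    }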

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
index c730f9b,0000000..64fc905
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
@@@ -1,108 -1,0 +1,108 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.EnumSet;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class BadIteratorMincIT extends AccumuloClusterHarness {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    IteratorSetting is = new IteratorSetting(30, BadIterator.class);
 +    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    Mutation m = new Mutation(new Text("r1"));
 +    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
 +
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    c.tableOperations().flush(tableName, null, null, false);
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
 +    // minc should fail, so there should be no files
 +    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0);
 +
 +    // try to scan table
 +    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
 +    int count = Iterators.size(scanner.iterator());
 +    assertEquals("Did not see expected # entries " + count, 1, count);
 +
 +    // remove the bad iterator
 +    c.tableOperations().removeIterator(tableName, BadIterator.class.getSimpleName(), EnumSet.of(IteratorScope.minc));
 +
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +
 +    // minc should complete
 +    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1);
 +
 +    count = Iterators.size(scanner.iterator());
 +
 +    if (count != 1)
 +      throw new Exception("Did not see expected # entries " + count);
 +
 +    // now try putting bad iterator back and deleting the table
 +    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
 +    bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +    m = new Mutation(new Text("r2"));
 +    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    // make sure property is given time to propagate
 +    sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +
 +    c.tableOperations().flush(tableName, null, null, false);
 +
 +    // make sure the flush has time to start
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
 +    // this should not hang
 +    c.tableOperations().delete(tableName);
 +  }
 +
 +}
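
The recovery in this test hinges on detaching the iterator by name for just
the minor-compaction scope. That attach/remove pairing, isolated (SomeIterator
is a placeholder; c and table come from the surrounding test):

    IteratorSetting is = new IteratorSetting(30, SomeIterator.class);
    // attach for minor compactions only; scans and major compactions are unaffected
    c.tableOperations().attachIterator(table, is, EnumSet.of(IteratorScope.minc));
    // this IteratorSetting constructor defaults the iterator's name to the
    // class's simple name, which is the name removeIterator must be given
    c.tableOperations().removeIterator(table, is.getName(), EnumSet.of(IteratorScope.minc));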

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
index 48ce3fe,0000000..528f486
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
@@@ -1,131 -1,0 +1,130 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +public class BatchScanSplitIT extends AccumuloClusterHarness {
 +  private static final Logger log = LoggerFactory.getLogger(BatchScanSplitIT.class);
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +
 +    int numRows = 1 << 18;
 +
 +    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    for (int i = 0; i < numRows; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%09x", i)));
 +      m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes(UTF_8)));
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    getConnector().tableOperations().flush(tableName, null, null, true);
 +
 +    getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "4K");
 +
 +    Collection<Text> splits = getConnector().tableOperations().listSplits(tableName);
 +    while (splits.size() < 2) {
 +      sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
 +      splits = getConnector().tableOperations().listSplits(tableName);
 +    }
 +
 +    System.out.println("splits : " + splits);
 +
 +    Random random = new Random(19011230);
 +    HashMap<Text,Value> expected = new HashMap<>();
 +    ArrayList<Range> ranges = new ArrayList<>();
 +    for (int i = 0; i < 100; i++) {
 +      int r = random.nextInt(numRows);
 +      Text row = new Text(String.format("%09x", r));
 +      expected.put(row, new Value(String.format("%016x", numRows - r).getBytes(UTF_8)));
 +      ranges.add(new Range(row));
 +    }
 +
 +    HashMap<Text,Value> found = new HashMap<>();
 +
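 +    // batch scan the same 100 random rows twenty times while splits continue; every pass must match the expected values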
 +    for (int i = 0; i < 20; i++) {
 +      BatchScanner bs = getConnector().createBatchScanner(tableName, Authorizations.EMPTY, 4);
 +
 +      found.clear();
 +
 +      long t1 = System.currentTimeMillis();
 +
 +      bs.setRanges(ranges);
 +
 +      for (Entry<Key,Value> entry : bs) {
 +        found.put(entry.getKey().getRow(), entry.getValue());
 +      }
 +      bs.close();
 +
 +      long t2 = System.currentTimeMillis();
 +
 +      log.info(String.format("rate : %06.2f%n", ranges.size() / ((t2 - t1) / 1000.0)));
 +
 +      if (!found.equals(expected))
 +        throw new Exception("Found and expected differ " + found + " " + expected);
 +    }
 +
 +    splits = getConnector().tableOperations().listSplits(tableName);
 +    log.info("splits : " + splits);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
index f243562,0000000..21539b2
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@@ -1,144 -1,0 +1,143 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.cli.ClientOpts.Password;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +/**
 + * This test verifies that when a lot of files are bulk imported into a table with one tablet, and the table then splits, not all map files go to the children tablets.
 + */
 +public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1s");
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  private String majcDelay;
 +
 +  @Before
 +  public void alterConfig() throws Exception {
 +    Connector conn = getConnector();
 +    majcDelay = conn.instanceOperations().getSystemConfiguration().get(Property.TSERV_MAJC_DELAY.getKey());
 +    if (!"1s".equals(majcDelay)) {
 +      conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1s");
 +      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 +      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
 +    }
 +  }
 +
 +  @After
 +  public void resetConfig() throws Exception {
 +    if (null != majcDelay) {
 +      Connector conn = getConnector();
 +      conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
 +      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 +      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
 +    }
 +  }
 +
 +  static final int ROWS = 100000;
 +  static final int SPLITS = 99;
 +
 +  @Test
 +  public void testBulkSplitOptimization() throws Exception {
 +    final Connector c = getConnector();
 +    final String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
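 +    // raise the compaction and split thresholds so the bulk-imported files stay in place until splits are triggered explicitly below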
 +    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
 +    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
 +    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
 +    FileSystem fs = cluster.getFileSystem();
 +    Path testDir = new Path(getUsableDir(), "testmf");
 +    FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
 +    FileStatus[] stats = fs.listStatus(testDir);
 +
 +    System.out.println("Number of generated files: " + stats.length);
 +    FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString());
 +    FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
 +    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
 +
 +    // initiate splits
 +    getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");
 +
 +    sleepUninterruptibly(2, TimeUnit.SECONDS);
 +
 +    // wait until the split count is over the threshold -- should end up around 78 splits
 +    while (getConnector().tableOperations().listSplits(tableName).size() < 75) {
 +      sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +    }
 +
 +    FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
 +    VerifyIngest.Opts opts = new VerifyIngest.Opts();
 +    opts.timestamp = 1;
 +    opts.dataSize = 50;
 +    opts.random = 56;
 +    opts.rows = 100000;
 +    opts.startRow = 0;
 +    opts.cols = 1;
 +    opts.setTableName(tableName);
 +
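 +    // VerifyIngest needs client credentials; support both password and Kerberos tokens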
 +    AuthenticationToken adminToken = getAdminToken();
 +    if (adminToken instanceof PasswordToken) {
 +      PasswordToken token = (PasswordToken) adminToken;
 +      opts.setPassword(new Password(new String(token.getPassword(), UTF_8)));
 +      opts.setPrincipal(getAdminPrincipal());
 +    } else if (adminToken instanceof KerberosToken) {
 +      ClientConfiguration clientConf = cluster.getClientConfig();
 +      opts.updateKerberosCredentials(clientConf);
 +    } else {
 +      Assert.fail("Unknown token type");
 +    }
 +
 +    VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
 +
 +    // ensure each tablet does not have all map files, should be ~2.5 files per tablet
 +    FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
index 0988795,5abae1e..0703694
--- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java
@@@ -16,6 -16,6 +16,7 @@@
   */
  package org.apache.accumulo.test.functional;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.nio.charset.StandardCharsets.UTF_8;
  
  import java.io.File;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
index 50a0b0e,84e55a5..9f1dc67
--- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestWriter.java
@@@ -16,6 -16,6 +16,7 @@@
   */
  package org.apache.accumulo.test.functional;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
  import static java.nio.charset.StandardCharsets.UTF_8;
  
  import java.io.File;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
index c553c14,0000000..9c2b71f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
@@@ -1,124 -1,0 +1,123 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.Iterator;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.Combiner;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
- import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
++import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.hamcrest.CoreMatchers;
 +import org.junit.Assume;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +@Category(MiniClusterOnlyTests.class)
 +public class ClassLoaderIT extends AccumuloClusterHarness {
 +
 +  private static final long ZOOKEEPER_PROPAGATION_TIME = 10 * 1000;
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  private String rootPath;
 +
 +  @Before
 +  public void checkCluster() {
 +    Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI));
 +    MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
 +    rootPath = mac.getConfig().getDir().getAbsolutePath();
 +  }
 +
 +  private static void copyStreamToFileSystem(FileSystem fs, String jarName, Path path) throws IOException {
 +    byte[] buffer = new byte[10 * 1024];
 +    try (FSDataOutputStream dest = fs.create(path); InputStream stream = ClassLoaderIT.class.getResourceAsStream(jarName)) {
 +      while (true) {
 +        int n = stream.read(buffer, 0, buffer.length);
 +        if (n <= 0) {
 +          break;
 +        }
 +        dest.write(buffer, 0, n);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +    Mutation m = new Mutation("row1");
 +    m.put("cf", "col1", "Test");
 +    bw.addMutation(m);
 +    bw.close();
 +    scanCheck(c, tableName, "Test");
 +    FileSystem fs = getCluster().getFileSystem();
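 +    // place a jar containing TestCombiner in lib/ext so the cluster classloader can load it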
 +    Path jarPath = new Path(rootPath + "/lib/ext/Test.jar");
 +    copyStreamToFileSystem(fs, "/TestCombinerX.jar", jarPath);
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    IteratorSetting is = new IteratorSetting(10, "TestCombiner", "org.apache.accumulo.test.functional.TestCombiner");
 +    Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("cf")));
 +    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.scan));
 +    sleepUninterruptibly(ZOOKEEPER_PROPAGATION_TIME, TimeUnit.MILLISECONDS);
 +    scanCheck(c, tableName, "TestX");
 +    fs.delete(jarPath, true);
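 +    // swap in a different implementation of the same class; the classloader should pick up the new jar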
 +    copyStreamToFileSystem(fs, "/TestCombinerY.jar", jarPath);
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    scanCheck(c, tableName, "TestY");
 +    fs.delete(jarPath, true);
 +  }
 +
 +  private void scanCheck(Connector c, String tableName, String expected) throws Exception {
 +    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
 +    Iterator<Entry<Key,Value>> iterator = scanner.iterator();
 +    assertTrue(iterator.hasNext());
 +    Entry<Key,Value> next = iterator.next();
 +    assertFalse(iterator.hasNext());
 +    assertEquals(expected, next.getValue().toString());
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
index d462b53,0000000..929bb61
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrencyIT.java
@@@ -1,159 -1,0 +1,159 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.EnumSet;
 +import java.util.Map;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class ConcurrencyIT extends AccumuloClusterHarness {
 +
 +  static class ScanTask extends Thread {
 +
 +    int count = 0;
 +    Scanner scanner;
 +
 +    ScanTask(Connector conn, String tableName, long time) throws Exception {
 +      scanner = conn.createScanner(tableName, Authorizations.EMPTY);
 +      IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
 +      SlowIterator.setSleepTime(slow, time);
 +      scanner.addScanIterator(slow);
 +    }
 +
 +    @Override
 +    public void run() {
 +      count = Iterators.size(scanner.iterator());
 +    }
 +
 +  }
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    Map<String,String> siteConfig = cfg.getSiteConfig();
 +    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
 +    cfg.setSiteConfig(siteConfig);
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  // @formatter:off
 +  // Below is a diagram of the operations in this test over time.
 +  //
 +  // Scan 0 |------------------------------|
 +  // Scan 1 |----------|
 +  // Minc 1  |-----|
 +  // Scan 2   |----------|
 +  // Scan 3               |---------------|
 +  // Minc 2                |-----|
 +  // Majc 1                       |-----|
 +  // @formatter:on
 +  @Test
 +  public void run() throws Exception {
 +    Connector c = getConnector();
 +    runTest(c, getUniqueNames(1)[0]);
 +  }
 +
 +  static void runTest(Connector c, String tableName) throws Exception {
 +    c.tableOperations().create(tableName);
 +    IteratorSetting is = new IteratorSetting(10, SlowIterator.class);
 +    SlowIterator.setSleepTime(is, 50);
 +    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc, IteratorScope.majc));
 +    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0");
 +
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +    for (int i = 0; i < 50; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%06d", i)));
 +      m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8)));
 +      bw.addMutation(m);
 +    }
 +    bw.flush();
 +
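 +    // kick off Scan 0 and Scan 1 from the diagram above against the in-memory data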
 +    ScanTask st0 = new ScanTask(c, tableName, 300);
 +    st0.start();
 +
 +    ScanTask st1 = new ScanTask(c, tableName, 100);
 +    st1.start();
 +
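 +    // Minc 1: blocking flush while Scan 0 and Scan 1 are still running (see diagram)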
 +    sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
 +    c.tableOperations().flush(tableName, null, null, true);
 +
 +    for (int i = 0; i < 50; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%06d", i)));
 +      m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8)));
 +      bw.addMutation(m);
 +    }
 +
 +    bw.flush();
 +
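 +    // Scan 2 reads the minor-compacted file plus the newly written in-memory data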
 +    ScanTask st2 = new ScanTask(c, tableName, 100);
 +    st2.start();
 +
 +    st1.join();
 +    st2.join();
 +    if (st1.count != 50)
 +      throw new Exception("Thread 1 did not see 50, saw " + st1.count);
 +
 +    if (st2.count != 50)
 +      throw new Exception("Thread 2 did not see 50, saw " + st2.count);
 +
 +    ScanTask st3 = new ScanTask(c, tableName, 150);
 +    st3.start();
 +
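 +    // Minc 2: non-blocking flush; with TABLE_MAJC_RATIO at 1.0 this should also trigger Majc 1 (see diagram)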
 +    sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
 +    c.tableOperations().flush(tableName, null, null, false);
 +
 +    st3.join();
 +    if (st3.count != 50)
 +      throw new Exception("Thread 3 did not see 50, saw " + st3.count);
 +
 +    st0.join();
 +    if (st0.count != 50)
 +      throw new Exception("Thread 0 did not see 50, saw " + st0.count);
 +
 +    bw.close();
 +  }
 +}

