accumulo-commits mailing list archives

From e..@apache.org
Subject [28/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar, stop building test jar
Date Thu, 04 Jun 2015 18:53:09 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
new file mode 100644
index 0000000..0efb1aa
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.master.thrift.MasterState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletStateChangeIterator;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
+
+/**
+ * Test to ensure that the {@link TabletStateChangeIterator} properly skips over tablet information in the metadata table when there is no work to be done on
+ * the tablet (see ACCUMULO-3580).
+ */
+public class TabletStateChangeIteratorIT extends SharedMiniClusterBase {
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Test
+  public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
+    String[] tables = getUniqueNames(4);
+    final String t1 = tables[0];
+    final String t2 = tables[1];
+    final String t3 = tables[2];
+    final String cloned = tables[3];
+
+    // create some metadata
+    createTable(t1, true);
+    createTable(t2, false);
+    createTable(t3, true);
+
+    // examine a clone of the metadata table, so we can manipulate it
+    cloneMetadataTable(cloned);
+
+    assertEquals("No tables should need attention", 0, findTabletsNeedingAttention(cloned));
+
+    // test the assigned case (no location)
+    removeLocation(cloned, t3);
+    assertEquals("Should have one tablet without a loc", 1, findTabletsNeedingAttention(cloned));
+
+    // TODO test the case where the assignment is to a dead tserver
+    // TODO test the case where there are ongoing merges
+    // TODO test the bad tablet location state case (active split, inconsistent metadata)
+
+    // clean up
+    dropTables(t1, t2, t3);
+  }
+
+  private void removeLocation(String table, String tableNameToModify) throws TableNotFoundException, MutationsRejectedException {
+    String tableIdToModify = getConnector().tableOperations().tableIdMap().get(tableNameToModify);
+    BatchDeleter deleter = getConnector().createBatchDeleter(table, Authorizations.EMPTY, 1, new BatchWriterConfig());
+    deleter.setRanges(Collections.singleton(new KeyExtent(new Text(tableIdToModify), null, null).toMetadataRange()));
+    deleter.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+    deleter.delete();
+    deleter.close();
+  }
+
+  private int findTabletsNeedingAttention(String table) throws TableNotFoundException {
+    int results = 0;
+    Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY);
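+    // configureScanner installs the TabletStateChangeIterator on the scan, driven by the fake CurrentState defined below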
+    MetaDataTableScanner.configureScanner(scanner, new State());
+    scanner.updateScanIteratorOption("tabletChange", "debug", "1");
+    for (Entry<Key,Value> e : scanner) {
+      if (e != null)
+        results++;
+    }
+    return results;
+  }
+
+  private void createTable(String t, boolean online) throws AccumuloSecurityException, AccumuloException, TableNotFoundException, TableExistsException {
+    Connector conn = getConnector();
+    conn.tableOperations().create(t);
+    conn.tableOperations().online(t, true);
+    if (!online) {
+      conn.tableOperations().offline(t, true);
+    }
+  }
+
+  private void cloneMetadataTable(String cloned) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
+    getConnector().tableOperations().clone(MetadataTable.NAME, cloned, true, null, null);
+  }
+
+  private void dropTables(String... tables) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    for (String t : tables) {
+      getConnector().tableOperations().delete(t);
+    }
+  }
+
+  private final class State implements CurrentState {
+
+    @Override
+    public Set<TServerInstance> onlineTabletServers() {
+      HashSet<TServerInstance> tservers = new HashSet<TServerInstance>();
+      for (String tserver : getConnector().instanceOperations().getTabletServers()) {
+        try {
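+          // resolve the session id of the tserver's lock node in ZooKeeper to identify the live instance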
+          String zPath = ZooUtil.getRoot(getConnector().getInstance()) + Constants.ZTSERVERS + "/" + tserver;
+          long sessionId = ZooLock.getSessionId(new ZooCache(getCluster().getZooKeepers(), getConnector().getInstance().getZooKeepersSessionTimeOut()), zPath);
+          tservers.add(new TServerInstance(tserver, sessionId));
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return tservers;
+    }
+
+    @Override
+    public Set<String> onlineTables() {
+      HashSet<String> onlineTables = new HashSet<String>(getConnector().tableOperations().tableIdMap().values());
+      return Sets.filter(onlineTables, new Predicate<String>() {
+        @Override
+        public boolean apply(String tableId) {
+          return Tables.getTableState(getConnector().getInstance(), tableId) == TableState.ONLINE;
+        }
+      });
+    }
+
+    @Override
+    public Collection<MergeInfo> merges() {
+      return Collections.emptySet();
+    }
+
+    @Override
+    public Collection<KeyExtent> migrations() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public MasterState getMasterState() {
+      return MasterState.NORMAL;
+    }
+
+    @Override
+    public Set<TServerInstance> shutdownServers() {
+      return Collections.emptySet();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
new file mode 100644
index 0000000..ffadd22
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Test;
+
+/**
+ * Test that {@link BatchWriter} and {@link BatchScanner} honor their configured timeouts.
+ */
+public class TimeoutIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 75;
+  }
+
+  @Test
+  public void run() throws Exception {
+    Connector conn = getConnector();
+    String[] tableNames = getUniqueNames(2);
+    testBatchWriterTimeout(conn, tableNames[0]);
+    testBatchScannerTimeout(conn, tableNames[1]);
+  }
+
+  public void testBatchWriterTimeout(Connector conn, String tableName) throws Exception {
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().addConstraint(tableName, SlowConstraint.class.getName());
+
+    // give constraint time to propagate through zookeeper
+    UtilWaitThread.sleep(1000);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig().setTimeout(3, TimeUnit.SECONDS));
+
+    Mutation mut = new Mutation("r1");
+    mut.put("cf1", "cq1", "v1");
+
+    bw.addMutation(mut);
+    try {
+      bw.close();
+      fail("batch writer did not timeout");
+    } catch (MutationsRejectedException mre) {
+      if (mre.getCause() instanceof TimedOutException)
+        return;
+      throw mre;
+    }
+  }
+
+  public void testBatchScannerTimeout(Connector conn, String tableName) throws Exception {
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    Mutation m = new Mutation("r1");
+    m.put("cf1", "cq1", "v1");
+    m.put("cf1", "cq2", "v2");
+    m.put("cf1", "cq3", "v3");
+    m.put("cf1", "cq4", "v4");
+
+    bw.addMutation(m);
+    bw.close();
+
+    BatchScanner bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 2);
+    bs.setRanges(Collections.singletonList(new Range()));
+
+    // should not timeout
+    for (Entry<Key,Value> entry : bs) {
+      entry.getKey();
+    }
+
+    bs.setTimeout(5, TimeUnit.SECONDS);
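+    // SlowIterator sleeps 2s per entry, so scanning the four entries should exceed the 5s timeout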
+    IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
+    iterSetting.addOption("sleepTime", 2000 + "");
+    bs.addScanIterator(iterSetting);
+
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        entry.getKey();
+      }
+      fail("batch scanner did not time out");
+    } catch (TimedOutException toe) {
+      // expected: the batch scanner timed out
+    }
+    bs.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/VisibilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/VisibilityIT.java b/test/src/main/java/org/apache/accumulo/test/functional/VisibilityIT.java
new file mode 100644
index 0000000..3d6ad85
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/VisibilityIT.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.ByteArraySet;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class VisibilityIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  Authorizations origAuths = null;
+
+  @Before
+  public void emptyAuths() throws Exception {
+    Connector c = getConnector();
+    origAuths = c.securityOperations().getUserAuthorizations(getAdminPrincipal());
+  }
+
+  @After
+  public void resetAuths() throws Exception {
+    Connector c = getConnector();
+    if (null != origAuths) {
+      c.securityOperations().changeUserAuthorizations(getAdminPrincipal(), origAuths);
+    }
+  }
+
+  @Test
+  public void run() throws Exception {
+    Connector c = getConnector();
+    String[] tableNames = getUniqueNames(2);
+    String table = tableNames[0];
+    c.tableOperations().create(table);
+    String table2 = tableNames[1];
+    c.tableOperations().create(table2);
+    c.tableOperations().setProperty(table2, Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), "DEFLABEL");
+
+    insertData(c, table);
+    queryData(c, table);
+    deleteData(c, table);
+
+    insertDefaultData(c, table2);
+    queryDefaultData(c, table2);
+
+  }
+
+  private static SortedSet<String> nss(String... labels) {
+    TreeSet<String> ts = new TreeSet<String>();
+
+    for (String s : labels) {
+      ts.add(s);
+    }
+
+    return ts;
+  }
+
+  private void mput(Mutation m, String cf, String cq, String cv, String val) {
+    ColumnVisibility le = new ColumnVisibility(cv.getBytes(UTF_8));
+    m.put(new Text(cf), new Text(cq), le, new Value(val.getBytes(UTF_8)));
+  }
+
+  private void mputDelete(Mutation m, String cf, String cq, String cv) {
+    ColumnVisibility le = new ColumnVisibility(cv.getBytes(UTF_8));
+    m.putDelete(new Text(cf), new Text(cq), le);
+  }
+
+  private void insertData(Connector c, String tableName) throws Exception {
+
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m1 = new Mutation(new Text("row1"));
+
+    mput(m1, "cf1", "cq1", "", "v1");
+    mput(m1, "cf1", "cq1", "A", "v2");
+    mput(m1, "cf1", "cq1", "B", "v3");
+    mput(m1, "cf1", "cq1", "A&B", "v4");
+    mput(m1, "cf1", "cq1", "A&(L|M)", "v5");
+    mput(m1, "cf1", "cq1", "B&(L|M)", "v6");
+    mput(m1, "cf1", "cq1", "A&B&(L|M)", "v7");
+    mput(m1, "cf1", "cq1", "A&B&(L)", "v8");
+    mput(m1, "cf1", "cq1", "A&FOO", "v9");
+    mput(m1, "cf1", "cq1", "A&FOO&(L|M)", "v10");
+    mput(m1, "cf1", "cq1", "FOO", "v11");
+    mput(m1, "cf1", "cq1", "(A|B)&FOO&(L|M)", "v12");
+    mput(m1, "cf1", "cq1", "A&B&(L|M|FOO)", "v13");
+
+    bw.addMutation(m1);
+    bw.close();
+  }
+
+  private void deleteData(Connector c, String tableName) throws Exception {
+
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m1 = new Mutation(new Text("row1"));
+
+    mputDelete(m1, "cf1", "cq1", "");
+    mputDelete(m1, "cf1", "cq1", "A");
+    mputDelete(m1, "cf1", "cq1", "A&B");
+    mputDelete(m1, "cf1", "cq1", "B&(L|M)");
+    mputDelete(m1, "cf1", "cq1", "A&B&(L)");
+    mputDelete(m1, "cf1", "cq1", "A&FOO&(L|M)");
+    mputDelete(m1, "cf1", "cq1", "(A|B)&FOO&(L|M)");
+    mputDelete(m1, "cf1", "cq1", "FOO&A"); // should not delete anything
+
+    bw.addMutation(m1);
+    bw.close();
+
+    Map<Set<String>,Set<String>> expected = new HashMap<Set<String>,Set<String>>();
+
+    expected.put(nss("A", "L"), nss("v5"));
+    expected.put(nss("A", "M"), nss("v5"));
+    expected.put(nss("B"), nss("v3"));
+    expected.put(nss("Z"), nss());
+    expected.put(nss("A", "B", "L"), nss("v7", "v13"));
+    expected.put(nss("A", "B", "M"), nss("v7", "v13"));
+    expected.put(nss("A", "B", "FOO"), nss("v13"));
+    expected.put(nss("FOO"), nss("v11"));
+    expected.put(nss("A", "FOO"), nss("v9"));
+
+    queryData(c, tableName, nss("A", "B", "FOO", "L", "M", "Z"), nss("A", "B", "FOO", "L", "M", "Z"), expected);
+  }
+
+  private void insertDefaultData(Connector c, String tableName) throws Exception {
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m1 = new Mutation(new Text("row1"));
+
+    mput(m1, "cf1", "cq1", "BASE", "v1");
+    mput(m1, "cf1", "cq2", "DEFLABEL", "v2");
+    mput(m1, "cf1", "cq3", "", "v3");
+
+    bw.addMutation(m1);
+    bw.close();
+  }
+
+  private static void uniqueCombos(List<Set<String>> all, Set<String> prefix, Set<String> suffix) {
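+    // recursively adds every set consisting of prefix plus some subset of suffix, each combination exactly once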
+
+    all.add(prefix);
+
+    TreeSet<String> ss = new TreeSet<String>(suffix);
+
+    for (String s : suffix) {
+      TreeSet<String> ps = new TreeSet<String>(prefix);
+      ps.add(s);
+      ss.remove(s);
+
+      uniqueCombos(all, ps, ss);
+    }
+  }
+
+  private void queryData(Connector c, String tableName) throws Exception {
+    Map<Set<String>,Set<String>> expected = new HashMap<Set<String>,Set<String>>();
+    expected.put(nss(), nss("v1"));
+    expected.put(nss("A"), nss("v2"));
+    expected.put(nss("A", "L"), nss("v5"));
+    expected.put(nss("A", "M"), nss("v5"));
+    expected.put(nss("B"), nss("v3"));
+    expected.put(nss("B", "L"), nss("v6"));
+    expected.put(nss("B", "M"), nss("v6"));
+    expected.put(nss("Z"), nss());
+    expected.put(nss("A", "B"), nss("v4"));
+    expected.put(nss("A", "B", "L"), nss("v7", "v8", "v13"));
+    expected.put(nss("A", "B", "M"), nss("v7", "v13"));
+    expected.put(nss("A", "B", "FOO"), nss("v13"));
+    expected.put(nss("FOO"), nss("v11"));
+    expected.put(nss("A", "FOO"), nss("v9"));
+    expected.put(nss("A", "FOO", "L"), nss("v10", "v12"));
+    expected.put(nss("A", "FOO", "M"), nss("v10", "v12"));
+    expected.put(nss("B", "FOO", "L"), nss("v12"));
+    expected.put(nss("B", "FOO", "M"), nss("v12"));
+
+    queryData(c, tableName, nss("A", "B", "FOO", "L", "M", "Z"), nss("A", "B", "FOO", "L", "M", "Z"), expected);
+    queryData(c, tableName, nss("A", "B", "FOO", "L", "M", "Z"), nss("A", "B", "L", "M", "Z"), expected);
+    queryData(c, tableName, nss("A", "B", "FOO", "L", "M", "Z"), nss("A", "Z"), expected);
+    queryData(c, tableName, nss("A", "B", "FOO", "L", "M", "Z"), nss("Z"), expected);
+    queryData(c, tableName, nss("A", "B", "FOO", "L", "M", "Z"), nss(), expected);
+  }
+
+  private void queryData(Connector c, String tableName, Set<String> allAuths, Set<String> userAuths, Map<Set<String>,Set<String>> expected) throws Exception {
+
+    c.securityOperations().changeUserAuthorizations(getAdminPrincipal(), new Authorizations(nbas(userAuths)));
+
+    ArrayList<Set<String>> combos = new ArrayList<Set<String>>();
+    uniqueCombos(combos, nss(), allAuths);
+
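+    // for each auth combination, the expected values are the union over expected label sets covered by the (user-filtered) combo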
+    for (Set<String> set1 : combos) {
+      Set<String> e = new TreeSet<String>();
+      for (Set<String> set2 : combos) {
+
+        set2 = new HashSet<String>(set2);
+        set2.retainAll(userAuths);
+
+        if (set1.containsAll(set2) && expected.containsKey(set2)) {
+          e.addAll(expected.get(set2));
+        }
+      }
+
+      set1.retainAll(userAuths);
+      verify(c, tableName, set1, e);
+    }
+
+  }
+
+  private void queryDefaultData(Connector c, String tableName) throws Exception {
+    Scanner scanner;
+
+    // should return no records
+    c.securityOperations().changeUserAuthorizations(getAdminPrincipal(), new Authorizations("BASE", "DEFLABEL"));
+    scanner = c.createScanner(tableName, new Authorizations());
+    verifyDefault(scanner, 0);
+
+    // should return one record
+    scanner = c.createScanner(tableName, new Authorizations("BASE"));
+    verifyDefault(scanner, 1);
+
+    // should return all three records
+    scanner = c.createScanner(tableName, new Authorizations("BASE", "DEFLABEL"));
+    verifyDefault(scanner, 3);
+  }
+
+  private void verifyDefault(Scanner scanner, int expectedCount) throws Exception {
+    int actual = Iterators.size(scanner.iterator());
+    if (actual != expectedCount)
+      throw new Exception("actual count " + actual + " != expected count " + expectedCount);
+  }
+
+  private void verify(Connector c, String tableName, Set<String> auths, Set<String> expectedValues) throws Exception {
+    ByteArraySet bas = nbas(auths);
+
+    try {
+      verify(c, tableName, bas, expectedValues.toArray(new String[0]));
+    } catch (Exception e) {
+      throw new Exception("Verification failed auths=" + auths + " exp=" + expectedValues, e);
+    }
+  }
+
+  private ByteArraySet nbas(Set<String> auths) {
+    ByteArraySet bas = new ByteArraySet();
+    for (String auth : auths) {
+      bas.add(auth.getBytes(UTF_8));
+    }
+    return bas;
+  }
+
+  private void verify(Connector c, String tableName, ByteArraySet nss, String... expected) throws Exception {
+    Scanner scanner = c.createScanner(tableName, new Authorizations(nss));
+    verify(scanner.iterator(), expected);
+
+    BatchScanner bs = c.createBatchScanner(tableName, new Authorizations(nss), 3);
+    bs.setRanges(Collections.singleton(new Range()));
+    verify(bs.iterator(), expected);
+    bs.close();
+  }
+
+  private void verify(Iterator<Entry<Key,Value>> iter, String... expected) throws Exception {
+    HashSet<String> valuesSeen = new HashSet<String>();
+
+    while (iter.hasNext()) {
+      Entry<Key,Value> entry = iter.next();
+      if (valuesSeen.contains(entry.getValue().toString())) {
+        throw new Exception("Value seen twice");
+      }
+      valuesSeen.add(entry.getValue().toString());
+    }
+
+    for (String ev : expected) {
+      if (!valuesSeen.remove(ev)) {
+        throw new Exception("Did not see expected value " + ev);
+      }
+    }
+
+    if (valuesSeen.size() != 0) {
+      throw new Exception("Saw more values than expected " + valuesSeen);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
new file mode 100644
index 0000000..34d1c6d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
+import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
+import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
+import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
+import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
+import static org.apache.accumulo.core.security.Authorizations.EMPTY;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class WALSunnyDayIT extends ConfigurableMacBase {
+
+  private static final Text CF = new Text(new byte[0]);
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(GC_CYCLE_DELAY, "1s");
+    cfg.setProperty(GC_CYCLE_START, "0s");
+    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
+    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "3s");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  int countTrue(Collection<Boolean> bools) {
+    int result = 0;
+    for (Boolean b : bools) {
+      if (b.booleanValue())
+        result++;
+    }
+    return result;
+  }
+
+  @Test
+  public void test() throws Exception {
+    MiniAccumuloClusterImpl mac = getCluster();
+    MiniAccumuloClusterControl control = mac.getClusterControl();
+    control.stop(GARBAGE_COLLECTOR);
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    writeSomeData(c, tableName, 1, 1);
+
+    // wal markers are added lazily
+    Map<String,Boolean> wals = getWals(c);
+    assertEquals(wals.toString(), 2, wals.size());
+    for (Boolean b : wals.values()) {
+      assertTrue("logs should be in use", b.booleanValue());
+    }
+
+    // roll log, get a new next
+    writeSomeData(c, tableName, 1000, 50);
+    Map<String,Boolean> walsAfterRoll = getWals(c);
+    assertEquals("should have 3 WALs after roll", 3, walsAfterRoll.size());
+    assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
+    assertEquals("all WALs should be in use", 3, countTrue(walsAfterRoll.values()));
+
+    // flush the tables
+    for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
+      c.tableOperations().flush(table, null, null, true);
+    }
+    UtilWaitThread.sleep(1000);
+    // rolled WAL is no longer in use, but needs to be GC'd
+    Map<String,Boolean> walsAfterflush = getWals(c);
+    assertEquals(walsAfterflush.toString(), 3, walsAfterflush.size());
+    assertEquals("inUse should be 2", 2, countTrue(walsAfterflush.values()));
+
+    // let the GC run for a little bit
+    control.start(GARBAGE_COLLECTOR);
+    UtilWaitThread.sleep(5 * 1000);
+    // make sure the unused WAL goes away
+    Map<String,Boolean> walsAfterGC = getWals(c);
+    assertEquals(walsAfterGC.toString(), 2, walsAfterGC.size());
+    control.stop(GARBAGE_COLLECTOR);
+    // restart the tserver, but don't run recovery on all tablets
+    control.stop(TABLET_SERVER);
+    // this delays recovery on the normal tables
+    assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
+    control.start(TABLET_SERVER);
+
+    // wait for the metadata table to go back online
+    getRecoveryMarkers(c);
+    // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
+    UtilWaitThread.sleep(5 * 1000);
+    Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
+    // log.debug("markers " + markers);
+    assertEquals("one tablet should have markers", 1, markers.keySet().size());
+    assertEquals("tableId of the keyExtent should be 1", markers.keySet().iterator().next().getTableId(), new Text("1"));
+
+    // put some data in the WAL
+    assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+    verifySomeData(c, tableName, 1000 * 50 + 1);
+    writeSomeData(c, tableName, 100, 100);
+
+    Map<String,Boolean> walsAfterRestart = getWals(c);
+    // log.debug("wals after " + walsAfterRestart);
+    assertEquals("used WALs after restart should be 4", 4, countTrue(walsAfterRestart.values()));
+    control.start(GARBAGE_COLLECTOR);
+    UtilWaitThread.sleep(5 * 1000);
+    Map<String,Boolean> walsAfterRestartAndGC = getWals(c);
+    assertEquals("wals left should be 2", 2, walsAfterRestartAndGC.size());
+    assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
+  }
+
+  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
+    Scanner scan = c.createScanner(tableName, EMPTY);
+    int result = Iterators.size(scan.iterator());
+    scan.close();
+    Assert.assertEquals(expected, result);
+  }
+
+  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
+    Random rand = new Random();
+    BatchWriter bw = conn.createBatchWriter(tableName, null);
+    byte[] rowData = new byte[10];
+    byte[] cq = new byte[10];
+    byte[] value = new byte[10];
+
+    for (int r = 0; r < row; r++) {
+      rand.nextBytes(rowData);
+      Mutation m = new Mutation(rowData);
+      for (int c = 0; c < col; c++) {
+        rand.nextBytes(cq);
+        rand.nextBytes(value);
+        m.put(CF, new Text(cq), new Value(value));
+      }
+      bw.addMutation(m);
+      if (r % 100 == 0) {
+        bw.flush();
+      }
+    }
+    bw.close();
+  }
+
+  private Map<String,Boolean> getWals(Connector c) throws Exception {
+    Map<String,Boolean> result = new HashMap<>();
+    Instance i = c.getInstance();
+    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
+    WalStateManager wals = new WalStateManager(c.getInstance(), zk);
+    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      // WALs are in use if they are not unreferenced
+      result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);
+    }
+    return result;
+  }
+
+  private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws Exception {
+    Map<KeyExtent,List<String>> result = new HashMap<>();
+    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
+    root.setRange(TabletsSection.getRange());
+    root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
+
+    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
+    meta.setRange(TabletsSection.getRange());
+    meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
+
+    List<String> logs = new ArrayList<>();
+    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
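+    // within each tablet's metadata row the "log" family sorts before the prev-row column, so logs accumulate until the prev-row entry closes out that tablet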
+    while (both.hasNext()) {
+      Entry<Key,Value> entry = both.next();
+      Key key = entry.getKey();
+      if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
+        logs.add(key.getColumnQualifier().toString());
+      }
+      if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
+        KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
+        result.put(extent, logs);
+        logs = new ArrayList<String>();
+      }
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
new file mode 100644
index 0000000..07d197d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.Socket;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Range;
+import com.google.common.net.HostAndPort;
+
+// ACCUMULO-2757 - make sure we don't create too many ZooKeeper watchers
+public class WatchTheWatchCountIT extends ConfigurableMacBase {
+  private static final Logger log = LoggerFactory.getLogger(WatchTheWatchCountIT.class);
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(3);
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String[] tableNames = getUniqueNames(3);
+    for (String tableName : tableNames) {
+      c.tableOperations().create(tableName);
+    }
+    c.tableOperations().list();
+    String zooKeepers = c.getInstance().getZooKeepers();
+    final Range<Long> expectedWatcherRange = Range.open(475L, 700L);
+    long total = 0;
+    final HostAndPort hostAndPort = HostAndPort.fromString(zooKeepers);
+    for (int i = 0; i < 5; i++) {
+      Socket socket = new Socket(hostAndPort.getHostText(), hostAndPort.getPort());
+      try {
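+        // "wchs" is ZooKeeper's four-letter admin command summarizing watch counts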
+        socket.getOutputStream().write("wchs\n".getBytes(), 0, 5);
+        byte[] buffer = new byte[1024];
+        int n = socket.getInputStream().read(buffer);
+        String response = new String(buffer, 0, n);
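+        // the reply's last line looks like "Total watches:NNN"; take the count after the colon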
+        total = Long.parseLong(response.split(":")[1].trim());
+        log.info("Total: {}", total);
+        if (expectedWatcherRange.contains(total)) {
+          break;
+        }
+        log.debug("Expected number of watchers to be contained in {}, but actually was {}. Sleeping and retrying", expectedWatcherRange, total);
+        Thread.sleep(5000);
+      } finally {
+        socket.close();
+      }
+    }
+
+    assertTrue("Expected number of watchers to be contained in " + expectedWatcherRange + ", but actually was " + total, expectedWatcherRange.contains(total));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
new file mode 100644
index 0000000..d877969
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+public class WriteAheadLogIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+    cfg.setProperty(Property.GC_CYCLE_START, "1");
+    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "4s");
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "750K");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    opts.setTableName(tableName);
+
+    ClientConfiguration clientConfig = cluster.getClientConfig();
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      opts.updateKerberosCredentials(clientConfig);
+      vopts.updateKerberosCredentials(clientConfig);
+    } else {
+      opts.setPrincipal(getAdminPrincipal());
+      vopts.setPrincipal(getAdminPrincipal());
+    }
+
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+    vopts.setTableName(tableName);
+    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
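+    // bounce all tservers; the verify below must succeed from write-ahead log recovery alone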
+    getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+    getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java
new file mode 100644
index 0000000..45b671c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WriteLotsIT.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.junit.Test;
+
+public class WriteLotsIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 90;
+  }
+
+  @Test
+  public void writeLots() throws Exception {
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    final AtomicReference<Exception> ref = new AtomicReference<Exception>();
+    List<Thread> threads = new ArrayList<Thread>();
+    final ClientConfiguration clientConfig = getCluster().getClientConfig();
+    for (int i = 0; i < 10; i++) {
+      final int index = i;
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
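+            // each of the ten writer threads ingests a disjoint 10k-row slice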
+            TestIngest.Opts opts = new TestIngest.Opts();
+            opts.startRow = index * 10000;
+            opts.rows = 10000;
+            opts.setTableName(tableName);
+            if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+              opts.updateKerberosCredentials(clientConfig);
+            } else {
+              opts.setPrincipal(getAdminPrincipal());
+            }
+            TestIngest.ingest(c, opts, new BatchWriterOpts());
+          } catch (Exception ex) {
+            ref.set(ex);
+          }
+        }
+      };
+      t.start();
+      threads.add(t);
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    if (ref.get() != null) {
+      throw ref.get();
+    }
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = 10000 * 10;
+    vopts.setTableName(tableName);
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      vopts.updateKerberosCredentials(clientConfig);
+    } else {
+      vopts.setPrincipal(getAdminPrincipal());
+    }
+    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java
new file mode 100644
index 0000000..a531ee0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZooCacheIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ZooCacheIT extends ConfigurableMacBase {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  private static final String pathName = "/zcTest-42";
+  private static File testDir;
+
+  @BeforeClass
+  public static void createTestDirectory() {
+    testDir = new File(createTestDir(ZooCacheIT.class.getName()), pathName);
+    FileUtils.deleteQuietly(testDir);
+    assertTrue(testDir.mkdir());
+  }
+
+  @Test
+  public void test() throws Exception {
+    assertEquals(0, exec(CacheTestClean.class, pathName, testDir.getAbsolutePath()).waitFor());
+    final AtomicReference<Exception> ref = new AtomicReference<Exception>();
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < 3; i++) {
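+      // start three readers that exercise ZooCache while the writer below mutates znodes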
+      Thread reader = new Thread() {
+        @Override
+        public void run() {
+          try {
+            CacheTestReader.main(new String[] {pathName, testDir.getAbsolutePath(), getConnector().getInstance().getZooKeepers()});
+          } catch (Exception ex) {
+            ref.set(ex);
+          }
+        }
+      };
+      reader.start();
+      threads.add(reader);
+    }
+    assertEquals(0, exec(CacheTestWriter.class, pathName, testDir.getAbsolutePath(), "3", "50").waitFor());
+    for (Thread t : threads) {
+      t.join();
+      if (ref.get() != null)
+        throw ref.get();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
new file mode 100644
index 0000000..19f90fe
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class ZookeeperRestartIT extends ConfigurableMacBase {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    Map<String,String> siteConfig = new HashMap<String,String>();
+    siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    c.tableOperations().create("test_ingest");
+    BatchWriter bw = c.createBatchWriter("test_ingest", null);
+    Mutation m = new Mutation("row");
+    m.put("cf", "cq", "value");
+    bw.addMutation(m);
+    bw.close();
+
+    // kill zookeeper
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.ZOOKEEPER))
+      cluster.killProcess(ServerType.ZOOKEEPER, proc);
+
+    // give the servers time to react
+    UtilWaitThread.sleep(1000);
+
+    // start zookeeper back up
+    cluster.start();
+
+    // use the tservers
+    Scanner s = c.createScanner("test_ingest", Authorizations.EMPTY);
+    Iterator<Entry<Key,Value>> i = s.iterator();
+    assertTrue(i.hasNext());
+    assertEquals("row", i.next().getKey().getRow().toString());
+    assertFalse(i.hasNext());
+    // use the master
+    c.tableOperations().delete("test_ingest");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java b/test/src/main/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
new file mode 100644
index 0000000..a0d355e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.test.continuous.ContinuousIngest;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class RollWALPerformanceIT extends ConfigurableMacBase {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_WAL_REPLICATION, "1");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "10M");
+    cfg.setProperty(Property.TABLE_MINC_LOGS_MAX, "100");
+    cfg.setProperty(Property.GC_FILE_ARCHIVE, "false");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.useMiniDFS(true);
+  }
+
+  private long ingest() throws Exception {
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+
+    log.info("Creating the table");
+    c.tableOperations().create(tableName);
+
+    log.info("Splitting the table");
+    final long SPLIT_COUNT = 100;
+    final long distance = Long.MAX_VALUE / SPLIT_COUNT;
+    final SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < SPLIT_COUNT; i++) {
+      splits.add(new Text(String.format("%016x", i * distance)));
+    }
+    c.tableOperations().addSplits(tableName, splits);
+
+    log.info("Waiting for balance");
+    c.instanceOperations().waitForBalance();
+
+    final Instance inst = c.getInstance();
+
+    log.info("Starting ingest");
+    final long start = System.currentTimeMillis();
+    final String args[] = {"-i", inst.getInstanceName(), "-z", inst.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD, "--batchThreads", "2", "--table",
+        tableName, "--num", Long.toString(1000 * 1000), // 1M 100 byte entries
+    };
+
+    ContinuousIngest.main(args);
+    final long result = System.currentTimeMillis() - start;
+    log.debug(String.format("Finished in %,d ms", result));
+    log.debug("Dropping table");
+    c.tableOperations().delete(tableName);
+    return result;
+  }
+
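+  // average wall-clock time over three complete ingest runs, in milliseconds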
+  private long getAverage() throws Exception {
+    final int REPEAT = 3;
+    long totalTime = 0;
+    for (int i = 0; i < REPEAT; i++) {
+      totalTime += ingest();
+    }
+    return totalTime / REPEAT;
+  }
+
+  private void testWalPerformanceOnce() throws Exception {
+    // get time with a small WAL, which will cause many WAL roll-overs
+    long avg1 = getAverage();
+    // use a bigger WAL max size to eliminate WAL roll-overs
+    Connector c = getConnector();
+    c.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1G");
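+    // flush the system tables so the tserver restart below should have little or no WAL recovery to replay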
+    c.tableOperations().flush(MetadataTable.NAME, null, null, true);
+    c.tableOperations().flush(RootTable.NAME, null, null, true);
+    for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) {
+      getCluster().killProcess(ServerType.TABLET_SERVER, tserver);
+    }
+    getCluster().start();
+    long avg2 = getAverage();
+    log.info(String.format("Average run time with small WAL %,d with large WAL %,d", avg1, avg2));
+    assertTrue(avg1 > avg2);
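+    // express the small-WAL time as a percentage of the large-WAL time; the roll-over overhead must stay under 25%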
+    double percent = (100. * avg1) / avg2;
+    log.info(String.format("Percent of large log: %.2f%%", percent));
+    assertTrue(percent < 125.);
+  }
+
+  @Test(timeout = 20 * 60 * 1000)
+  public void testWalPerformance() throws Exception {
+    testWalPerformanceOnce();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
new file mode 100644
index 0000000..236522a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.performance.metadata;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+// ACCUMULO-3327
+public class FastBulkImportIT extends ConfigurableMacBase {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
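+    // extra bulk threads speed the import; the huge majc ratio and file max keep compactions from skewing the timing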
+    cfg.setNumTservers(3);
+    cfg.setProperty(Property.TSERV_BULK_ASSIGNMENT_THREADS, "5");
+    cfg.setProperty(Property.TSERV_BULK_PROCESS_THREADS, "5");
+    cfg.setProperty(Property.TABLE_MAJC_RATIO, "9999");
+    cfg.setProperty(Property.TABLE_FILE_MAX, "9999");
+  }
+
+  @Test
+  public void test() throws Exception {
+    log.info("Creating table");
+    final String tableName = getUniqueNames(1)[0];
+    final Connector c = getConnector();
+    c.tableOperations().create(tableName);
+    log.info("Adding splits");
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < 0xfff; i += 7) {
+      splits.add(new Text(Integer.toHexString(i)));
+    }
+    c.tableOperations().addSplits(tableName, splits);
+
+    log.info("Creating lots of bulk import files");
+    FileSystem fs = getCluster().getFileSystem();
+    Path basePath = getCluster().getTemporaryPath();
+    CachedConfiguration.setInstance(fs.getConf());
+
+    Path base = new Path(basePath, "testBulkFail_" + tableName);
+    fs.delete(base, true);
+    fs.mkdirs(base);
+    Path bulkFailures = new Path(base, "failures");
+    Path files = new Path(base, "files");
+    fs.mkdirs(bulkFailures);
+    fs.mkdirs(files);
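+    // write 100 identical RFiles, each spanning keys 0x100 through 0xffd, to exercise bulk assignment across the tablets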
+    for (int i = 0; i < 100; i++) {
+      FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+          AccumuloConfiguration.getDefaultConfiguration());
+      writer.startDefaultLocalityGroup();
+      for (int j = 0x100; j < 0xfff; j += 3) {
+        writer.append(new Key(Integer.toHexString(j)), new Value(new byte[0]));
+      }
+      writer.close();
+    }
+    log.info("Waiting for balance");
+    c.instanceOperations().waitForBalance();
+
+    log.info("Bulk importing files");
+    long now = System.currentTimeMillis();
+    c.tableOperations().importDirectory(tableName, files.toString(), bulkFailures.toString(), true);
+    double diffSeconds = (System.currentTimeMillis() - now) / 1000.;
+    log.info(String.format("Import took %.2f seconds", diffSeconds));
+    assertTrue(diffSeconds < 30);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java b/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
new file mode 100644
index 0000000..745326e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.proxy;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.proxy.Proxy;
+import org.apache.accumulo.proxy.thrift.AccumuloProxy.Client;
+import org.apache.accumulo.proxy.thrift.Column;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.Condition;
+import org.apache.accumulo.proxy.thrift.ConditionalStatus;
+import org.apache.accumulo.proxy.thrift.ConditionalUpdates;
+import org.apache.accumulo.proxy.thrift.ConditionalWriterOptions;
+import org.apache.accumulo.proxy.thrift.Durability;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.server.TServer;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+import com.google.common.net.HostAndPort;
+
+public class ProxyDurabilityIT extends ConfigurableMacBase {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
+    cfg.setNumTservers(1);
+  }
+
+  private static ByteBuffer bytes(String value) {
+    return ByteBuffer.wrap(value.getBytes(UTF_8)); // avoid the platform default charset
+  }
+
+  @Test
+  public void testDurability() throws Exception {
+    Connector c = getConnector();
+    Properties props = new Properties();
+    // point the client at an empty configuration file so locally installed configs with custom properties cannot interfere
+    File emptyFile = Files.createTempFile(null, null).toFile();
+    emptyFile.deleteOnExit();
+    props.put("instance", c.getInstance().getInstanceName());
+    props.put("zookeepers", c.getInstance().getZooKeepers());
+    props.put("tokenClass", PasswordToken.class.getName());
+    props.put("clientConfigurationFile", emptyFile.toString());
+
+    TJSONProtocol.Factory protocol = new TJSONProtocol.Factory();
+
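+    // stand up a Thrift JSON-protocol proxy on a random free port and wait until it is serving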
+    int proxyPort = PortUtils.getRandomFreePort();
+    final TServer proxyServer = Proxy.createProxyServer(HostAndPort.fromParts("localhost", proxyPort), protocol, props).server;
+    while (!proxyServer.isServing())
+      UtilWaitThread.sleep(100);
+    Client client = new TestProxyClient("localhost", proxyPort, protocol).proxy();
+    Map<String,String> properties = new TreeMap<>();
+    properties.put("password", ROOT_PASSWORD);
+    ByteBuffer login = client.login("root", properties);
+
+    String tableName = getUniqueNames(1)[0];
+    client.createTable(login, tableName, true, TimeType.MILLIS);
+    assertTrue(c.tableOperations().exists(tableName));
+
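+    // Durability.NONE skips the write-ahead log entirely, so killing the tserver loses the update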
+    WriterOptions options = new WriterOptions();
+    options.setDurability(Durability.NONE);
+    String writer = client.createWriter(login, tableName, options);
+    Map<ByteBuffer,List<ColumnUpdate>> cells = new TreeMap<>();
+    ColumnUpdate column = new ColumnUpdate(bytes("cf"), bytes("cq"));
+    column.setValue("value".getBytes());
+    cells.put(bytes("row"), Collections.singletonList(column));
+    client.update(writer, cells);
+    client.closeWriter(writer);
+    assertEquals(1, count(tableName));
+    restartTServer();
+    assertEquals(0, count(tableName));
+
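+    // Durability.SYNC syncs each update to the write-ahead log, so it survives the tserver restart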
+    ConditionalWriterOptions cfg = new ConditionalWriterOptions();
+    cfg.setDurability(Durability.SYNC);
+    String cwriter = client.createConditionalWriter(login, tableName, cfg);
+    ConditionalUpdates updates = new ConditionalUpdates();
+    updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes(""))));
+    updates.addToUpdates(column);
+    Map<ByteBuffer,ConditionalStatus> status = client.updateRowsConditionally(cwriter, Collections.singletonMap(bytes("row"), updates));
+    assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row")));
+    assertEquals(1, count(tableName));
+    restartTServer();
+    assertEquals(1, count(tableName));
+
+    proxyServer.stop();
+  }
+
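+  // hard-kill every tablet server process, then bring the cluster back up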
+  private void restartTServer() throws Exception {
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+    cluster.start();
+  }
+
+  private int count(String tableName) throws Exception {
+    return Iterators.size(getConnector().createScanner(tableName, Authorizations.EMPTY).iterator());
+  }
+
+}

