accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [09/24] accumulo git commit: Merge branch '1.7' into 1.8
Date Tue, 25 Jul 2017 23:02:57 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index 748423e,0000000..46bf3fc
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@@ -1,390 -1,0 +1,389 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +import java.util.EnumSet;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.ActiveScan;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorUtil;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +/**
 + * Integration test for ACCUMULO-2641, which adds a scan id to the thrift protocol so that {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()}
 + * returns a unique scan id.
 + *
 + * <p>
 + * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions. The test uses splits and
 + * multiple ranges to force the scans to occur across multiple tablet servers for completeness.
 + *
 + * <p>
 + * This patch modified thrift; the TraceRepoDeserializationTest test seems to fail unless the following is added:
 + *
 + * <p>
 + * private static final long serialVersionUID = -4659975753252858243l;
 + *
 + * <p>
 + * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated.
 + */
 +public class ScanIdIT extends AccumuloClusterHarness {
 +
 +  private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
 +
 +  private static final int NUM_SCANNERS = 8;
 +
 +  private static final int NUM_DATA_ROWS = 100;
 +
 +  private static final Random random = new Random();
 +
 +  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
 +
 +  private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
 +
 +  private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<>();
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  /**
 +   * @throws Exception
 +   *           any exception is a test failure.
 +   */
 +  @Test
 +  public void testScanId() throws Exception {
 +
 +    final String tableName = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +    conn.tableOperations().create(tableName);
 +
 +    addSplits(conn, tableName);
 +
 +    log.info("Splits added");
 +
 +    generateSampleData(conn, tableName);
 +
 +    log.info("Generated data for {}", tableName);
 +
 +    attachSlowIterator(conn, tableName);
 +
 +    CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
 +
 +    for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
 +      ScannerThread st = new ScannerThread(conn, scannerIndex, tableName, latch);
 +      pool.submit(st);
 +    }
 +
 +    // wait for scanners to report a result.
 +    while (testInProgress.get()) {
 +
 +      if (resultsByWorker.size() < NUM_SCANNERS) {
 +        log.trace("Results reported {}", resultsByWorker.size());
 +        sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
 +      } else {
 +        // each worker has reported at least one result.
 +        testInProgress.set(false);
 +
 +        log.debug("Final result count {}", resultsByWorker.size());
 +
 +        // delay to allow scanners to react to end of test and cleanly close.
 +        sleepUninterruptibly(1, TimeUnit.SECONDS);
 +      }
 +
 +    }
 +
 +    // all scanners have reported at least 1 result, so check for unique scan ids.
 +    Set<Long> scanIds = new HashSet<>();
 +
 +    List<String> tservers = conn.instanceOperations().getTabletServers();
 +
 +    log.debug("tablet servers {}", tservers.toString());
 +
 +    for (String tserver : tservers) {
 +
 +      List<ActiveScan> activeScans = null;
 +      for (int i = 0; i < 10; i++) {
 +        try {
 +          activeScans = conn.instanceOperations().getActiveScans(tserver);
 +          break;
 +        } catch (AccumuloException e) {
 +          if (e.getCause() instanceof TableNotFoundException) {
 +            log.debug("Got TableNotFoundException, will retry");
 +            Thread.sleep(200);
 +            continue;
 +          }
 +          throw e;
 +        }
 +      }
 +
 +      assertNotNull("Repeatedly got exception trying to get active scans", activeScans);
 +
 +      log.debug("TServer {} has {} active scans", tserver, activeScans.size());
 +
 +      for (ActiveScan scan : activeScans) {
 +        log.debug("Tserver {} scan id {}", tserver, scan.getScanid());
 +        scanIds.add(scan.getScanid());
 +      }
 +    }
 +
 +    assertTrue("Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size(), NUM_SCANNERS <= scanIds.size());
 +
 +  }
 +
 +  /**
 +   * Runs scanner in separate thread to allow multiple scanners to execute in parallel.
 +   * <p>
 +   * The thread run method is terminated when the testInProgress flag is set to false.
 +   */
 +  private static class ScannerThread implements Runnable {
 +
 +    private final Connector connector;
 +    private Scanner scanner = null;
 +    private final int workerIndex;
 +    private final String tablename;
 +    private final CountDownLatch latch;
 +
 +    public ScannerThread(final Connector connector, final int workerIndex, final String tablename, final CountDownLatch latch) {
 +      this.connector = connector;
 +      this.workerIndex = workerIndex;
 +      this.tablename = tablename;
 +      this.latch = latch;
 +    }
 +
 +    /**
 +     * Execute the scan across the sample data and put scan results into the result map until the testInProgress flag is set to false.
 +     */
 +    @Override
 +    public void run() {
 +
 +      latch.countDown();
 +      try {
 +        latch.await();
 +      } catch (InterruptedException e) {
 +        log.error("Thread interrupted with id {}", workerIndex);
 +        Thread.currentThread().interrupt();
 +        return;
 +      }
 +
 +      log.debug("Creating scanner in worker thread {}", workerIndex);
 +
 +      try {
 +
 +        scanner = connector.createScanner(tablename, new Authorizations());
 +
 +        // Never start readahead
 +        scanner.setReadaheadThreshold(Long.MAX_VALUE);
 +        scanner.setBatchSize(1);
 +
 +        // create different ranges to try to hit more than one tablet.
 +        scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
 +
 +      } catch (TableNotFoundException e) {
 +        throw new IllegalStateException("Initialization failure. Could not create scanner", e);
 +      }
 +
 +      scanner.fetchColumnFamily(new Text("fam1"));
 +
 +      for (Map.Entry<Key,Value> entry : scanner) {
 +
 +        // exit when success condition is met.
 +        if (!testInProgress.get()) {
 +          scanner.clearScanIterators();
 +          scanner.close();
 +
 +          return;
 +        }
 +
 +        Text row = entry.getKey().getRow();
 +
 +        log.debug("worker {}, row {}", workerIndex, row.toString());
 +
 +        if (entry.getValue() != null) {
 +
 +          Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
 +
 +          // value should always be increasing
 +          if (prevValue != null) {
 +
 +            log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue()));
 +
 +            assertTrue(prevValue.compareTo(entry.getValue()) > 0);
 +          }
 +        } else {
 +          log.info("Scanner returned null");
 +          fail("Scanner returned unexpected null value");
 +        }
 +
 +      }
 +
 +      log.debug("Scanner ran out of data. (info only, not an error) ");
 +
 +    }
 +  }
 +
 +  /**
 +   * Create splits on the table and force migration by taking the table offline and then bringing it back online for the test.
 +   *
 +   * @param conn
 +   *          Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void addSplits(final Connector conn, final String tableName) {
 +
 +    SortedSet<Text> splits = createSplits();
 +
 +    try {
 +
 +      conn.tableOperations().addSplits(tableName, splits);
 +
 +      conn.tableOperations().offline(tableName, true);
 +
 +      sleepUninterruptibly(2, TimeUnit.SECONDS);
 +      conn.tableOperations().online(tableName, true);
 +
 +      for (Text split : conn.tableOperations().listSplits(tableName)) {
 +        log.trace("Split {}", split);
 +      }
 +
 +    } catch (AccumuloSecurityException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
 +    } catch (TableNotFoundException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
 +    } catch (AccumuloException e) {
 +      throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e);
 +    }
 +
 +  }
 +
 +  /**
 +   * Create splits to distribute data across multiple tservers.
 +   *
 +   * @return splits in sorted set for addSplits.
 +   */
 +  private SortedSet<Text> createSplits() {
 +
 +    SortedSet<Text> splits = new TreeSet<>();
 +
 +    for (int split = 0; split < 10; split++) {
 +      splits.add(new Text(Integer.toString(split)));
 +    }
 +
 +    return splits;
 +  }
 +
 +  /**
 +   * Generate some sample data using random row ids to distribute data across the splits.
 +   * <p>
 +   * The primary goal is to determine that each scanner is assigned a unique scan id. This test does check that the count value for fam1 increases if a scanner
 +   * reads multiple values, but this is a secondary consideration for this test, included for completeness.
 +   *
 +   * @param connector
 +   *          Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void generateSampleData(Connector connector, final String tablename) {
 +
 +    try {
 +
 +      BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig());
 +
 +      ColumnVisibility vis = new ColumnVisibility("public");
 +
 +      for (int i = 0; i < NUM_DATA_ROWS; i++) {
 +
 +        Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i)));
 +
 +        Mutation m = new Mutation(rowId);
 +        m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8)));
 +        m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8)));
 +        m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8)));
 +
 +        log.trace("Added row {}", rowId);
 +
 +        bw.addMutation(m);
 +      }
 +
 +      bw.close();
 +    } catch (TableNotFoundException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
 +    } catch (MutationsRejectedException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not create test data", ex);
 +    }
 +  }
 +
 +  /**
 +   * Attach the test slow iterator so that we have time to read the scan id without creating a large dataset. Uses fairly large sleep and delay times because
 +   * we are not concerned with how much data is read and we do not read all of the data - the test stops once each scanner reports a scan id.
 +   *
 +   * @param connector
 +   *          Accumulo connector to test cluster or MAC instance.
 +   */
 +  private void attachSlowIterator(Connector connector, final String tablename) {
 +    try {
 +
 +      IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
 +      slowIter.addOption("sleepTime", "200");
 +      slowIter.addOption("seekSleepTime", "200");
 +
 +      connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));
 +
 +    } catch (AccumuloException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
 +    } catch (TableNotFoundException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
 +    } catch (AccumuloSecurityException ex) {
 +      throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex);
 +    }
 +  }
 +
 +}
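
The test above reads the scan ids through the public admin API. As a standalone illustration of the same pattern (a sketch only: "ScanIdProbe" is a hypothetical class, "conn" is an assumed already-authenticated Connector, and error handling is elided), any monitoring client could collect the unique ids that ACCUMULO-2641 exposes:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.admin.ActiveScan;

    public class ScanIdProbe {
      /** Collect the scan ids currently active across all tablet servers. */
      public static Set<Long> collectScanIds(Connector conn) throws Exception {
        Set<Long> scanIds = new HashSet<>();
        for (String tserver : conn.instanceOperations().getTabletServers()) {
          for (ActiveScan scan : conn.instanceOperations().getActiveScans(tserver)) {
            scanIds.add(scan.getScanid()); // unique id added by ACCUMULO-2641
          }
        }
        return scanIds;
      }
    }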

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java
index a274dec,0000000..662cf75
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIteratorIT.java
@@@ -1,251 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertEquals;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.ScannerBase;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
- import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.core.security.TablePermission;
++import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.test.functional.AuthsIterator;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class ScanIteratorIT extends AccumuloClusterHarness {
 +  private static final Logger log = LoggerFactory.getLogger(ScanIteratorIT.class);
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  private Connector connector;
 +  private String tableName;
 +  private String user;
 +  private boolean saslEnabled;
 +
 +  @Before
 +  public void setup() throws Exception {
 +    connector = getConnector();
 +    tableName = getUniqueNames(1)[0];
 +
 +    connector.tableOperations().create(tableName);
 +    ClientConfiguration clientConfig = cluster.getClientConfig();
 +    ClusterUser clusterUser = getUser(0);
 +    user = clusterUser.getPrincipal();
 +    PasswordToken userToken;
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      userToken = null;
 +      saslEnabled = true;
 +    } else {
 +      userToken = new PasswordToken(clusterUser.getPassword());
 +      saslEnabled = false;
 +    }
 +    if (connector.securityOperations().listLocalUsers().contains(user)) {
 +      log.info("Dropping {}", user);
 +      connector.securityOperations().dropLocalUser(user);
 +    }
 +    connector.securityOperations().createLocalUser(user, userToken);
 +    connector.securityOperations().grantTablePermission(user, tableName, TablePermission.READ);
 +    connector.securityOperations().grantTablePermission(user, tableName, TablePermission.WRITE);
 +    connector.securityOperations().changeUserAuthorizations(user, AuthsIterator.AUTHS);
 +  }
 +
 +  @After
 +  public void tearDown() throws Exception {
 +    if (null != user) {
 +      if (saslEnabled) {
 +        ClusterUser rootUser = getAdminUser();
 +        UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath());
 +      }
 +      connector.securityOperations().dropLocalUser(user);
 +    }
 +  }
 +
 +  @Test
 +  public void run() throws Exception {
 +    String tableName = getUniqueNames(1)[0];
 +    Connector c = getConnector();
 +
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    for (int i = 0; i < 1000; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%06d", i)));
 +      m.put(new Text("cf1"), new Text("cq1"), new Value(Integer.toString(1000 - i).getBytes(UTF_8)));
 +      m.put(new Text("cf1"), new Text("cq2"), new Value(Integer.toString(i - 1000).getBytes(UTF_8)));
 +
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    Scanner scanner = c.createScanner(tableName, new Authorizations());
 +
 +    setupIter(scanner);
 +    verify(scanner, 1, 999);
 +
 +    BatchScanner bscanner = c.createBatchScanner(tableName, new Authorizations(), 3);
 +    bscanner.setRanges(Collections.singleton(new Range((Key) null, null)));
 +
 +    setupIter(bscanner);
 +    verify(bscanner, 1, 999);
 +
 +    ArrayList<Range> ranges = new ArrayList<>();
 +    ranges.add(new Range(new Text(String.format("%06d", 1))));
 +    ranges.add(new Range(new Text(String.format("%06d", 6)), new Text(String.format("%06d", 16))));
 +    ranges.add(new Range(new Text(String.format("%06d", 20))));
 +    ranges.add(new Range(new Text(String.format("%06d", 23))));
 +    ranges.add(new Range(new Text(String.format("%06d", 56)), new Text(String.format("%06d", 61))));
 +    ranges.add(new Range(new Text(String.format("%06d", 501)), new Text(String.format("%06d", 504))));
 +    ranges.add(new Range(new Text(String.format("%06d", 998)), new Text(String.format("%06d", 1000))));
 +
 +    HashSet<Integer> got = new HashSet<>();
 +    HashSet<Integer> expected = new HashSet<>();
 +    for (int i : new int[] {1, 7, 9, 11, 13, 15, 23, 57, 59, 61, 501, 503, 999}) {
 +      expected.add(i);
 +    }
 +
 +    bscanner.setRanges(ranges);
 +
 +    for (Entry<Key,Value> entry : bscanner) {
 +      got.add(Integer.parseInt(entry.getKey().getRow().toString()));
 +    }
 +
 +    System.out.println("got : " + got);
 +
 +    if (!got.equals(expected)) {
 +      throw new Exception(got + " != " + expected);
 +    }
 +
 +    bscanner.close();
 +
 +  }
 +
 +  private void verify(Iterable<Entry<Key,Value>> scanner, int start, int finish) throws Exception {
 +
 +    int expected = start;
 +    for (Entry<Key,Value> entry : scanner) {
 +      if (Integer.parseInt(entry.getKey().getRow().toString()) != expected) {
 +        throw new Exception("Saw unexpexted " + entry.getKey().getRow() + " " + expected);
 +      }
 +
 +      if (entry.getKey().getColumnQualifier().toString().equals("cq2")) {
 +        expected += 2;
 +      }
 +    }
 +
 +    if (expected != finish + 2) {
 +      throw new Exception("Ended at " + expected + " not " + (finish + 2));
 +    }
 +  }
 +
 +  private void setupIter(ScannerBase scanner) throws Exception {
 +    IteratorSetting dropMod = new IteratorSetting(50, "dropMod", "org.apache.accumulo.test.functional.DropModIter");
 +    dropMod.addOption("mod", "2");
 +    dropMod.addOption("drop", "0");
 +    scanner.addScanIterator(dropMod);
 +  }
 +
 +  @Test
 +  public void testAuthsPresentInIteratorEnvironment() throws Exception {
 +    runTest(AuthsIterator.AUTHS, false);
 +  }
 +
 +  @Test
 +  public void testAuthsNotPresentInIteratorEnvironment() throws Exception {
 +    runTest(new Authorizations("B"), true);
 +  }
 +
 +  @Test
 +  public void testEmptyAuthsInIteratorEnvironment() throws Exception {
 +    runTest(Authorizations.EMPTY, true);
 +  }
 +
 +  private void runTest(ScannerBase scanner, Authorizations auths, boolean shouldFail) throws AccumuloSecurityException, AccumuloException,
 +      TableNotFoundException {
 +    int count = 0;
 +    for (Map.Entry<Key,Value> entry : scanner) {
 +      assertEquals(shouldFail ? AuthsIterator.FAIL : AuthsIterator.SUCCESS, entry.getKey().getRow().toString());
 +      count++;
 +    }
 +
 +    assertEquals(1, count);
 +  }
 +
 +  private void runTest(Authorizations auths, boolean shouldFail) throws Exception {
 +    ClusterUser clusterUser = getUser(0);
 +    Connector userC = getCluster().getConnector(clusterUser.getPrincipal(), clusterUser.getToken());
 +    writeTestMutation(userC);
 +
 +    IteratorSetting setting = new IteratorSetting(10, AuthsIterator.class);
 +
 +    Scanner scanner = userC.createScanner(tableName, auths);
 +    scanner.addScanIterator(setting);
 +
 +    BatchScanner batchScanner = userC.createBatchScanner(tableName, auths, 1);
 +    batchScanner.setRanges(Collections.singleton(new Range("1")));
 +    batchScanner.addScanIterator(setting);
 +
 +    runTest(scanner, auths, shouldFail);
 +    runTest(batchScanner, auths, shouldFail);
 +
 +    scanner.close();
 +    batchScanner.close();
 +  }
 +
 +  private void writeTestMutation(Connector userC) throws TableNotFoundException, MutationsRejectedException {
 +    BatchWriter batchWriter = userC.createBatchWriter(tableName, new BatchWriterConfig());
 +    Mutation m = new Mutation("1");
 +    m.put(new Text("2"), new Text("3"), new Value("".getBytes()));
 +    batchWriter.addMutation(m);
 +    batchWriter.flush();
 +    batchWriter.close();
 +
 +  }
 +
 +}
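
ScanIteratorIT configures its iterator per scan session via addScanIterator(), while ScanIdIT above attaches one to the table configuration. A minimal sketch contrasting the two styles (assumes "conn" and "tableName" from a harness like the tests above; "IteratorStyles" is a hypothetical holder class):

    import java.util.EnumSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.iterators.IteratorUtil;
    import org.apache.accumulo.core.security.Authorizations;

    public class IteratorStyles {
      static void configureIterators(Connector conn, String tableName) throws Exception {
        // Table-attached: persisted in the table configuration and applied to
        // every scan until removed (the ScanIdIT.attachSlowIterator() style).
        IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator");
        slowIter.addOption("sleepTime", "200");
        conn.tableOperations().attachIterator(tableName, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan));

        // Per-session: applied only to this Scanner, as in setupIter() above.
        IteratorSetting dropMod = new IteratorSetting(60, "dropMod", "org.apache.accumulo.test.functional.DropModIter");
        dropMod.addOption("mod", "2");
        dropMod.addOption("drop", "0");
        Scanner scanner = conn.createScanner(tableName, new Authorizations());
        scanner.addScanIterator(dropMod);
      }
    }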

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index 0074eac,0000000..5864155
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@@ -1,153 -1,0 +1,152 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.InstanceOperations;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.io.Text;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +public class ScanSessionTimeOutIT extends AccumuloClusterHarness {
 +  private static final Logger log = LoggerFactory.getLogger(ScanSessionTimeOutIT.class);
 +
 +  @Override
 +  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    Map<String,String> siteConfig = cfg.getSiteConfig();
 +    siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString());
 +    cfg.setSiteConfig(siteConfig);
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  private String sessionIdle = null;
 +
 +  @Before
 +  public void reduceSessionIdle() throws Exception {
 +    InstanceOperations ops = getConnector().instanceOperations();
 +    sessionIdle = ops.getSystemConfiguration().get(Property.TSERV_SESSION_MAXIDLE.getKey());
 +    ops.setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), getMaxIdleTimeString());
 +    log.info("Waiting for existing session idle time to expire");
 +    Thread.sleep(AccumuloConfiguration.getTimeInMillis(sessionIdle));
 +    log.info("Finished waiting");
 +  }
 +
 +  /**
 +   * Returns the max idle time as a string.
 +   *
 +   * @return new max idle time
 +   */
 +  protected String getMaxIdleTimeString() {
 +    return "3";
 +  }
 +
 +  @After
 +  public void resetSessionIdle() throws Exception {
 +    if (null != sessionIdle) {
 +      getConnector().instanceOperations().setProperty(Property.TSERV_SESSION_MAXIDLE.getKey(), sessionIdle);
 +    }
 +  }
 +
 +  @Test
 +  public void run() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    for (int i = 0; i < 100000; i++) {
 +      Mutation m = new Mutation(new Text(String.format("%08d", i)));
 +      for (int j = 0; j < 3; j++)
 +        m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
 +
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    Scanner scanner = c.createScanner(tableName, new Authorizations());
 +    scanner.setBatchSize(1000);
 +
 +    Iterator<Entry<Key,Value>> iter = scanner.iterator();
 +
 +    verify(iter, 0, 200);
 +
 +    // sleep three times the session timeout
 +    sleepUninterruptibly(9, TimeUnit.SECONDS);
 +
 +    verify(iter, 200, 100000);
 +
 +  }
 +
 +  protected void verify(Iterator<Entry<Key,Value>> iter, int start, int stop) throws Exception {
 +    for (int i = start; i < stop; i++) {
 +
 +      Text er = new Text(String.format("%08d", i));
 +
 +      for (int j = 0; j < 3; j++) {
 +        Entry<Key,Value> entry = iter.next();
 +
 +        if (!entry.getKey().getRow().equals(er)) {
 +          throw new Exception("row " + entry.getKey().getRow() + " != " + er);
 +        }
 +
 +        if (!entry.getKey().getColumnFamily().equals(new Text("cf1"))) {
 +          throw new Exception("cf " + entry.getKey().getColumnFamily() + " != cf1");
 +        }
 +
 +        if (!entry.getKey().getColumnQualifier().equals(new Text("cq" + j))) {
 +          throw new Exception("cq " + entry.getKey().getColumnQualifier() + " != cq" + j);
 +        }
 +
 +        if (!entry.getValue().toString().equals("" + i + "_" + j)) {
 +          throw new Exception("value " + entry.getValue() + " != " + i + "_" + j);
 +        }
 +
 +      }
 +    }
 +
 +  }
 +
 +}
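
Reduced to its essence, the behavior ScanSessionTimeOutIT verifies is that a Scanner's Iterator survives server-side session expiry, because the client transparently re-seeks and a new session is created. A sketch, assuming "conn" and a populated "tableName" ("SessionTimeoutSketch" is a hypothetical class):

    import java.util.Iterator;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;

    public class SessionTimeoutSketch {
      static void scanAcrossTimeout(Connector conn, String tableName) throws Exception {
        Scanner scanner = conn.createScanner(tableName, new Authorizations());
        Iterator<Entry<Key,Value>> iter = scanner.iterator();
        iter.next(); // establishes a server-side scan session
        Thread.sleep(9 * 1000); // exceed the 3s tserver.session.idle.max configured above
        iter.next(); // still works: the client re-seeks and a new session is created
      }
    }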

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java
index 334cf1c,0000000..37bef1a
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ServerSideErrorIT.java
@@@ -1,130 -1,0 +1,130 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
 +import java.util.Collections;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.Combiner;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Test;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +public class ServerSideErrorIT extends AccumuloClusterHarness {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  @Test
 +  public void run() throws Exception {
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    IteratorSetting is = new IteratorSetting(5, "Bad Aggregator", BadCombiner.class);
 +    Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("acf")));
 +    c.tableOperations().attachIterator(tableName, is);
 +
 +    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    Mutation m = new Mutation(new Text("r1"));
 +    m.put(new Text("acf"), new Text("foo"), new Value(new byte[] {'1'}));
 +
 +    bw.addMutation(m);
 +
 +    bw.close();
 +
 +    // try to scan table
 +    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
 +
 +    boolean caught = false;
 +    try {
 +      for (Entry<Key,Value> entry : scanner) {
 +        entry.getKey();
 +      }
 +    } catch (Exception e) {
 +      caught = true;
 +    }
 +
 +    if (!caught)
 +      throw new Exception("Scan did not fail");
 +
 +    // try to batch scan the table
 +    BatchScanner bs = c.createBatchScanner(tableName, Authorizations.EMPTY, 2);
 +    bs.setRanges(Collections.singleton(new Range()));
 +
 +    caught = false;
 +    try {
 +      for (Entry<Key,Value> entry : bs) {
 +        entry.getKey();
 +      }
 +    } catch (Exception e) {
 +      caught = true;
 +    } finally {
 +      bs.close();
 +    }
 +
 +    if (!caught)
 +      throw new Exception("batch scan did not fail");
 +
 +    // remove the bad agg so accumulo can shut down
 +    TableOperations to = c.tableOperations();
 +    for (Entry<String,String> e : to.getProperties(tableName)) {
 +      to.removeProperty(tableName, e.getKey());
 +    }
 +
 +    sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +
 +    // should be able to scan now
 +    scanner = c.createScanner(tableName, Authorizations.EMPTY);
 +    for (Entry<Key,Value> entry : scanner) {
 +      entry.getKey();
 +    }
 +
 +    // set a nonexistent iterator, which should cause the scan to fail on the server side
 +    scanner.addScanIterator(new IteratorSetting(100, "bogus", "com.bogus.iterator"));
 +
 +    caught = false;
 +    try {
 +      for (Entry<Key,Value> entry : scanner) {
 +        // should error
 +        entry.getKey();
 +      }
 +    } catch (Exception e) {
 +      caught = true;
 +    }
 +
 +    if (!caught)
 +      throw new Exception("Scan did not fail");
 +  }
 +}
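
The cleanup above removes every per-table property to get rid of the bad combiner. A more targeted teardown (a sketch of an alternative, not what the test does; "RemoveBadIterator" is a hypothetical class) would unset just the named iterator with TableOperations.removeIterator():

    import java.util.EnumSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.iterators.IteratorUtil;

    public class RemoveBadIterator {
      static void removeBadAggregator(Connector conn, String tableName) throws Exception {
        // attachIterator() with no scopes attaches to all scopes, so remove from all of them.
        conn.tableOperations().removeIterator(tableName, "Bad Aggregator",
            EnumSet.allOf(IteratorUtil.IteratorScope.class));
      }
    }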

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
index 98e1031,0000000..ca099bf
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
@@@ -1,124 -1,0 +1,123 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +
 +import java.io.IOException;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.server.util.Admin;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.TestRandomDeletes;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.junit.Test;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +public class ShutdownIT extends ConfigurableMacBase {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  @Test
 +  public void shutdownDuringIngest() throws Exception {
 +    Process ingest = cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD,
 +        "--createTable");
 +    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
 +    ingest.destroy();
 +  }
 +
 +  @Test
 +  public void shutdownDuringQuery() throws Exception {
 +    assertEquals(0,
 +        cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD, "--createTable")
 +            .waitFor());
 +    Process verify = cluster.exec(VerifyIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD);
 +    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
 +    verify.destroy();
 +  }
 +
 +  @Test
 +  public void shutdownDuringDelete() throws Exception {
 +    assertEquals(0,
 +        cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD, "--createTable")
 +            .waitFor());
 +    Process deleter = cluster.exec(TestRandomDeletes.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD);
 +    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
 +    deleter.destroy();
 +  }
 +
 +  @Test
 +  public void shutdownDuringDeleteTable() throws Exception {
 +    final Connector c = getConnector();
 +    for (int i = 0; i < 10; i++) {
 +      c.tableOperations().create("table" + i);
 +    }
 +    final AtomicReference<Exception> ref = new AtomicReference<>();
 +    Thread async = new Thread() {
 +      @Override
 +      public void run() {
 +        try {
 +          for (int i = 0; i < 10; i++)
 +            c.tableOperations().delete("table" + i);
 +        } catch (Exception ex) {
 +          ref.set(ex);
 +        }
 +      }
 +    };
 +    async.start();
 +    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
 +    if (ref.get() != null)
 +      throw ref.get();
 +  }
 +
 +  @Test
 +  public void stopDuringStart() throws Exception {
 +    assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
 +  }
 +
 +  @Test
 +  public void adminStop() throws Exception {
 +    runAdminStopTest(getConnector(), cluster);
 +  }
 +
 +  static void runAdminStopTest(Connector c, MiniAccumuloClusterImpl cluster) throws InterruptedException, IOException {
 +    assertEquals(0,
 +        cluster.exec(TestIngest.class, "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), "-u", "root", "-p", ROOT_PASSWORD, "--createTable")
 +            .waitFor());
 +    List<String> tabletServers = c.instanceOperations().getTabletServers();
 +    assertEquals(2, tabletServers.size());
 +    String doomed = tabletServers.get(0);
 +    log.info("Stopping " + doomed);
 +    assertEquals(0, cluster.exec(Admin.class, "stop", doomed).waitFor());
 +    tabletServers = c.instanceOperations().getTabletServers();
 +    assertEquals(1, tabletServers.size());
 +    assertFalse(tabletServers.get(0).equals(doomed));
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
index 3f8cc27,187da35..aca092b
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowConstraint.java
@@@ -16,14 -16,12 +16,14 @@@
   */
  package org.apache.accumulo.test.functional;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.util.List;
 +import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.constraints.Constraint;
  import org.apache.accumulo.core.data.Mutation;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
  /**
   *
   */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index aeb0dff,f84a4d9..2f72db4
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@@ -16,6 -16,6 +16,8 @@@
   */
  package org.apache.accumulo.test.functional;
  
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++
  import java.io.IOException;
  import java.util.Collection;
  import java.util.Map;
@@@ -29,9 -28,8 +31,7 @@@ import org.apache.accumulo.core.data.Va
  import org.apache.accumulo.core.iterators.IteratorEnvironment;
  import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
  import org.apache.accumulo.core.iterators.WrappingIterator;
 -import org.apache.accumulo.core.util.UtilWaitThread;
  
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
  public class SlowIterator extends WrappingIterator {
  
    static private final String SLEEP_TIME = "sleepTime";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
index 7ab9f3f,0000000..f837ca3
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
@@@ -1,439 -1,0 +1,439 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static java.nio.charset.StandardCharsets.UTF_8;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++import java.util.ArrayList;
++import java.util.List;
++import java.util.Map;
++import java.util.concurrent.Callable;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.Future;
++import java.util.concurrent.TimeUnit;
++
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.AdminUtil;
 +import org.apache.accumulo.fate.ZooStore;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 +import org.apache.hadoop.io.Text;
 +import org.apache.zookeeper.KeeperException;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- 
- import static java.nio.charset.StandardCharsets.UTF_8;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertTrue;
- 
 +/**
 + * ACCUMULO-4574. Test to verify that changing table state to online / offline {@link org.apache.accumulo.core.client.admin.TableOperations#online(String)} when
 + * the table is already in that state returns without blocking.
 + */
 +public class TableChangeStateIT extends AccumuloClusterHarness {
 +
 +  private static final Logger log = LoggerFactory.getLogger(TableChangeStateIT.class);
 +
 +  private static final int NUM_ROWS = 1000;
 +  private static final long SLOW_SCAN_SLEEP_MS = 100L;
 +
 +  private Connector connector;
 +
 +  @Before
 +  public void setup() {
 +    connector = getConnector();
 +  }
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 4 * 60;
 +  }
 +
 +  /**
 +   * Validate that the {@code TableOperations} online operation does not block when the table is already online and a fate transaction lock is held by other operations.
 +   * The test creates and populates a table and then runs a compaction with a slow iterator so that the operation takes long enough to simulate the condition.
 +   * Once the online operation issued while the compaction is running completes, the test is done and the compaction is canceled so that other tests can run.
 +   *
 +   * @throws Exception
 +   *           any exception is a test failure.
 +   */
 +  @Test
 +  public void changeTableStateTest() throws Exception {
 +
 +    ExecutorService pool = Executors.newCachedThreadPool();
 +
 +    String tableName = getUniqueNames(1)[0];
 +
 +    createData(tableName);
 +
 +    assertEquals("verify table online after created", TableState.ONLINE, getTableState(tableName));
 +
 +    OnLineCallable onlineOp = new OnLineCallable(tableName);
 +
 +    Future<OnlineOpTiming> task = pool.submit(onlineOp);
 +
 +    OnlineOpTiming timing1 = task.get();
 +
 +    log.trace("Online 1 in {} ms", TimeUnit.MILLISECONDS.convert(timing1.runningTime(), TimeUnit.NANOSECONDS));
 +
 +    assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName));
 +
 +    // verify that offline then online functions as expected.
 +
 +    connector.tableOperations().offline(tableName, true);
 +    assertEquals("verify table is offline", TableState.OFFLINE, getTableState(tableName));
 +
 +    onlineOp = new OnLineCallable(tableName);
 +
 +    task = pool.submit(onlineOp);
 +
 +    OnlineOpTiming timing2 = task.get();
 +
 +    log.trace("Online 2 in {} ms", TimeUnit.MILLISECONDS.convert(timing2.runningTime(), TimeUnit.NANOSECONDS));
 +
 +    assertEquals("verify table is back online", TableState.ONLINE, getTableState(tableName));
 +
 +    // launch a full table compaction with the slow iterator to ensure table lock is acquired and held by the compaction
 +
 +    Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
 +    assertTrue("verify that compaction running and fate transaction exists", blockUntilCompactionRunning(tableName));
 +
 +    // try to set online while fate transaction is in progress - before ACCUMULO-4574 this would block
 +
 +    onlineOp = new OnLineCallable(tableName);
 +
 +    task = pool.submit(onlineOp);
 +
 +    OnlineOpTiming timing3 = task.get();
 +
 +    assertTrue("online should take less time than expected compaction time",
 +        timing3.runningTime() < TimeUnit.NANOSECONDS.convert(NUM_ROWS * SLOW_SCAN_SLEEP_MS, TimeUnit.MILLISECONDS));
 +
 +    assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName));
 +
 +    assertTrue("verify compaction still running and fate transaction still exists", blockUntilCompactionRunning(tableName));
 +
 +    // test complete, cancel compaction and move on.
 +    connector.tableOperations().cancelCompaction(tableName);
 +
 +    log.debug("Success: Timing results for online commands.");
 +    log.debug("Time for unblocked online {} ms", TimeUnit.MILLISECONDS.convert(timing1.runningTime(), TimeUnit.NANOSECONDS));
 +    log.debug("Time for online when offline {} ms", TimeUnit.MILLISECONDS.convert(timing2.runningTime(), TimeUnit.NANOSECONDS));
 +    log.debug("Time for blocked online {} ms", TimeUnit.MILLISECONDS.convert(timing3.runningTime(), TimeUnit.NANOSECONDS));
 +
 +    // block if compaction still running
 +    compactTask.get();
 +
 +  }
 +
 +  /**
 +   * Blocks current thread until compaction is running.
 +   *
 +   * @return true if the compaction and its associated fate transaction are found.
 +   */
 +  private boolean blockUntilCompactionRunning(final String tableName) {
 +
 +    int runningCompactions = 0;
 +
 +    List<String> tservers = connector.instanceOperations().getTabletServers();
 +
 +    /*
 +     * wait for compaction to start - The compaction will acquire a fate transaction lock that used to block a subsequent online command while the fate
 +     * transaction lock was held.
 +     */
 +    while (runningCompactions == 0) {
 +
 +      try {
 +
 +        for (String tserver : tservers) {
 +          runningCompactions += connector.instanceOperations().getActiveCompactions(tserver).size();
 +          log.trace("tserver {}, running compactions {}", tservers, runningCompactions);
 +        }
 +
 +      } catch (AccumuloSecurityException | AccumuloException ex) {
 +        throw new IllegalStateException("failed to get active compactions, test fails.", ex);
 +      }
 +
 +      try {
 +        Thread.sleep(250);
 +      } catch (InterruptedException ex) {
 +        // reassert interrupt
 +        Thread.currentThread().interrupt();
 +      }
 +    }
 +
 +    // Validate that there is a compaction fate transaction - otherwise test is invalid.
 +    return findFate(tableName);
 +  }
 +
 +  /**
 +   * Checks the fate transactions in zookeeper, looking for one associated with a compaction, as a double check that the test is valid because the running
 +   * compaction does hold a fate transaction lock.
 +   *
 +   * @return true if corresponding fate transaction found, false otherwise
 +   */
 +  private boolean findFate(final String tableName) {
 +
 +    Instance instance = connector.getInstance();
 +    AdminUtil<String> admin = new AdminUtil<>(false);
 +
 +    try {
 +
 +      String tableId = Tables.getTableId(instance, tableName);
 +
 +      log.trace("tid: {}", tableId);
 +
 +      String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
 +      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
 +      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
 +      AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
 +
 +      for (AdminUtil.TransactionStatus tx : fateStatus.getTransactions()) {
 +
 +        if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange")) {
 +          return true;
 +        }
 +      }
 +
 +    } catch (KeeperException | TableNotFoundException | InterruptedException ex) {
 +      throw new IllegalStateException(ex);
 +    }
 +
 +    // did not find appropriate fate transaction for compaction.
 +    return false;
 +  }
 +
 +  /**
 +   * Returns the current table state (ONLINE, OFFLINE,...) of named table.
 +   *
 +   * @param tableName
 +   *          the table name
 +   * @return the current table state
 +   * @throws TableNotFoundException
 +   *           if table does not exist
 +   */
 +  private TableState getTableState(String tableName) throws TableNotFoundException {
 +
 +    String tableId = Tables.getTableId(connector.getInstance(), tableName);
 +
 +    TableState tstate = Tables.getTableState(connector.getInstance(), tableId);
 +
 +    log.trace("tableName: '{}': tableId {}, current state: {}", tableName, tableId, tstate);
 +
 +    return tstate;
 +  }
 +
 +  /**
 +   * Create the provided table and populate with some data using a batch writer. The table is scanned to ensure it was populated as expected.
 +   *
 +   * @param tableName
 +   *          the name of the table
 +   */
 +  private void createData(final String tableName) {
 +
 +    try {
 +
 +      // create table.
 +      connector.tableOperations().create(tableName);
 +      BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +      // populate
 +      for (int i = 0; i < NUM_ROWS; i++) {
 +        Mutation m = new Mutation(new Text(String.format("%05d", i)));
 +        m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(UTF_8)));
 +        bw.addMutation(m);
 +      }
 +      bw.close();
 +
 +      long startTimestamp = System.nanoTime();
 +
 +      Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +      int count = 0;
 +      for (Map.Entry<Key,Value> elt : scanner) {
 +        String expected = String.format("%05d", count);
 +        assert (elt.getKey().getRow().toString().equals(expected));
 +        count++;
 +      }
 +
 +      log.trace("Scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
 +
 +      scanner.close();
 +
 +      if (count != NUM_ROWS) {
 +        throw new IllegalStateException(String.format("Number of rows %1$d does not match expected %2$d", count, NUM_ROWS));
 +      }
 +    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException ex) {
 +      throw new IllegalStateException("Create data failed with exception", ex);
 +    }
 +  }
 +
 +  /**
 +   * Provides timing information for the online operation.
 +   */
 +  private static class OnlineOpTiming {
 +
 +    private long started = 0L;
 +    private long completed = 0L;
 +
 +    OnlineOpTiming() {
 +      started = System.nanoTime();
 +    }
 +
 +    /**
 +     * Stop timing by recording the completion time.
 +     */
 +    void setComplete() {
 +      completed = System.nanoTime();
 +    }
 +
 +    /**
 +     * @return running time in nanoseconds.
 +     */
 +    long runningTime() {
 +      return completed - started;
 +    }
 +  }
 +
 +  /**
 +   * Runs the online operation in a separate thread and gathers timing information.
 +   */
 +  private class OnLineCallable implements Callable<OnlineOpTiming> {
 +
 +    final String tableName;
 +
 +    /**
 +     * Create an instance of this class to set the provided table online.
 +     *
 +     * @param tableName
 +     *          The table name that will be set online.
 +     */
 +    OnLineCallable(final String tableName) {
 +      this.tableName = tableName;
 +    }
 +
 +    @Override
 +    public OnlineOpTiming call() throws Exception {
 +
 +      OnlineOpTiming status = new OnlineOpTiming();
 +
 +      log.trace("Setting {} online", tableName);
 +
 +      connector.tableOperations().online(tableName, true);
 +      // stop timing
 +      status.setComplete();
 +
 +      log.trace("Online completed in {} ms", TimeUnit.MILLISECONDS.convert(status.runningTime(), TimeUnit.NANOSECONDS));
 +
 +      return status;
 +    }
 +  }
 +
 +  /**
 +   * Runnable that runs a compaction using a slow iterator.
 +   */
 +  private class SlowCompactionRunner implements Runnable {
 +
 +    private final String tableName;
 +
 +    /**
 +     * Create an instance of this class.
 +     *
 +     * @param tableName
 +     *          the name of the table that will be compacted with the slow iterator.
 +     */
 +    SlowCompactionRunner(final String tableName) {
 +      this.tableName = tableName;
 +    }
 +
 +    @Override
 +    public void run() {
 +
 +      long startTimestamp = System.nanoTime();
 +
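 +      // attach a slow iterator so the compaction runs long enough for the test to observe its fate transaction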
 +      IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
 +      SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
 +
 +      List<IteratorSetting> compactIterators = new ArrayList<>();
 +      compactIterators.add(slow);
 +
 +      log.trace("Slow iterator {}", slow.toString());
 +
 +      try {
 +
 +        log.trace("Start compaction");
 +
 +        connector.tableOperations().compact(tableName, new Text("0"), new Text("z"), compactIterators, true, true);
 +
 +        log.trace("Compaction wait is complete");
 +
 +        log.trace("Slow compaction of {} rows took {} ms", NUM_ROWS, TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
 +
 +        // validate that the expected data still exists in the table after the compaction.
 +
 +        startTimestamp = System.nanoTime();
 +
 +        Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
 +
 +        int count = 0;
 +        for (Map.Entry<Key,Value> elt : scanner) {
 +          String expected = String.format("%05d", count);
 +          assert (elt.getKey().getRow().toString().equals(expected));
 +          count++;
 +        }
 +
 +        log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS,
 +            TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
 +
 +        if (count != NUM_ROWS) {
 +          throw new IllegalStateException(String.format("After compaction, number of rows %1$d does not match expected %2$d", count, NUM_ROWS));
 +        }
 +
 +      } catch (TableNotFoundException ex) {
 +        throw new IllegalStateException("test failed, table " + tableName + " does not exist", ex);
 +      } catch (AccumuloSecurityException ex) {
 +        throw new IllegalStateException("test failed, could not add iterator due to security exception", ex);
 +      } catch (AccumuloException ex) {
 +        // the test cancels the compaction when done, so ignore the resulting exception.
 +        if (!ex.getMessage().contains("Compaction canceled")) {
 +          throw new IllegalStateException("test failed with an Accumulo exception", ex);
 +        }
 +      }
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
index d3036f7,0000000..b40569e
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
@@@ -1,110 -1,0 +1,110 @@@
- package org.apache.accumulo.test.functional;
- 
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
++package org.apache.accumulo.test.functional;
++
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.FileNotFoundException;
 +
 +import org.apache.accumulo.cluster.AccumuloCluster;
 +import org.apache.accumulo.core.cli.BatchWriterOpts;
 +import org.apache.accumulo.core.cli.ScannerOpts;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.test.TestIngest;
 +import org.apache.accumulo.test.VerifyIngest;
 +import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.hamcrest.CoreMatchers;
 +import org.junit.Assume;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +import com.google.common.collect.Iterators;
 +
 +@Category(MiniClusterOnlyTests.class)
 +public class TableIT extends AccumuloClusterHarness {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 2 * 60;
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI));
 +
 +    AccumuloCluster cluster = getCluster();
 +    MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster;
 +    String rootPath = mac.getConfig().getDir().getAbsolutePath();
 +
 +    Connector c = getConnector();
 +    TableOperations to = c.tableOperations();
 +    String tableName = getUniqueNames(1)[0];
 +    to.create(tableName);
 +
 +    TestIngest.Opts opts = new TestIngest.Opts();
 +    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
 +    ClientConfiguration clientConfig = getCluster().getClientConfig();
 +    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
 +      opts.updateKerberosCredentials(clientConfig);
 +      vopts.updateKerberosCredentials(clientConfig);
 +    } else {
 +      opts.setPrincipal(getAdminPrincipal());
 +      vopts.setPrincipal(getAdminPrincipal());
 +    }
 +
 +    opts.setTableName(tableName);
 +    TestIngest.ingest(c, opts, new BatchWriterOpts());
 +    to.flush(tableName, null, null, true);
 +    vopts.setTableName(tableName);
 +    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
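 +    // look up the table id and confirm that the table's metadata entries reference at least one file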
 +    String id = to.tableIdMap().get(tableName);
 +    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.setRange(new KeyExtent(id, null, null).toMetadataRange());
 +    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
 +    assertTrue(Iterators.size(s.iterator()) > 0);
 +
 +    FileSystem fs = getCluster().getFileSystem();
 +    assertTrue(fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id)).length > 0);
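 +    // after the delete, the metadata entries and the files on disk (or the table directory itself) should be gone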
 +    to.delete(tableName);
 +    assertEquals(0, Iterators.size(s.iterator()));
 +    try {
 +      assertEquals(0, fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id)).length);
 +    } catch (FileNotFoundException ex) {
 +      // that's fine, too
 +    }
 +    assertNull(to.tableIdMap().get(tableName));
 +    to.create(tableName);
 +    TestIngest.ingest(c, opts, new BatchWriterOpts());
 +    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
 +    to.delete(tableName);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
index d13c4af,0000000..8511d10
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TimeoutIT.java
@@@ -1,121 -1,0 +1,120 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.junit.Assert.fail;
 +
 +import java.util.Collections;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.TimedOutException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.junit.Test;
 +
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
- 
 +/**
 + * Verifies that batch writer and batch scanner operations time out as configured.
 + */
 +public class TimeoutIT extends AccumuloClusterHarness {
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 75;
 +  }
 +
 +  @Test
 +  public void run() throws Exception {
 +    Connector conn = getConnector();
 +    String[] tableNames = getUniqueNames(2);
 +    testBatchWriterTimeout(conn, tableNames[0]);
 +    testBatchScannerTimeout(conn, tableNames[1]);
 +  }
 +
 +  public void testBatchWriterTimeout(Connector conn, String tableName) throws Exception {
 +    conn.tableOperations().create(tableName);
 +    conn.tableOperations().addConstraint(tableName, SlowConstraint.class.getName());
 +
 +    // give constraint time to propagate through zookeeper
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
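 +    // the slow constraint stalls the write server-side, so closing the writer should trip the 3 second timeout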
 +    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig().setTimeout(3, TimeUnit.SECONDS));
 +
 +    Mutation mut = new Mutation("r1");
 +    mut.put("cf1", "cq1", "v1");
 +
 +    bw.addMutation(mut);
 +    try {
 +      bw.close();
 +      fail("batch writer did not timeout");
 +    } catch (MutationsRejectedException mre) {
 +      if (mre.getCause() instanceof TimedOutException)
 +        return;
 +      throw mre;
 +    }
 +  }
 +
 +  public void testBatchScannerTimeout(Connector conn, String tableName) throws Exception {
 +    conn.tableOperations().create(tableName);
 +
 +    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
 +
 +    Mutation m = new Mutation("r1");
 +    m.put("cf1", "cq1", "v1");
 +    m.put("cf1", "cq2", "v2");
 +    m.put("cf1", "cq3", "v3");
 +    m.put("cf1", "cq4", "v4");
 +
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    BatchScanner bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 2);
 +    bs.setRanges(Collections.singletonList(new Range()));
 +
 +    // should not timeout
 +    for (Entry<Key,Value> entry : bs) {
 +      entry.getKey();
 +    }
 +
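 +    // a slow iterator sleeping 2 seconds per key should push the scan past the 5 second timeout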
 +    bs.setTimeout(5, TimeUnit.SECONDS);
 +    IteratorSetting iterSetting = new IteratorSetting(100, SlowIterator.class);
 +    iterSetting.addOption("sleepTime", "2000");
 +    bs.addScanIterator(iterSetting);
 +
 +    try {
 +      for (Entry<Key,Value> entry : bs) {
 +        entry.getKey();
 +      }
 +      fail("batch scanner did not time out");
 +    } catch (TimedOutException toe) {
 +      // expected: the scan timed out
 +    }
 +    bs.close();
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/018c7fe5/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index 2f5ee81,0000000..c92e70f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@@ -1,268 -1,0 +1,268 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_DELAY;
 +import static org.apache.accumulo.core.conf.Property.GC_CYCLE_START;
 +import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_TIMEOUT;
 +import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE;
 +import static org.apache.accumulo.core.conf.Property.TSERV_WAL_REPLICATION;
 +import static org.apache.accumulo.core.security.Authorizations.EMPTY;
 +import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
 +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 +import org.apache.accumulo.master.state.SetGoalState;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.log.WalStateManager;
 +import org.apache.accumulo.server.log.WalStateManager.WalState;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import com.google.common.collect.Iterators;
- import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +
 +public class WALSunnyDayIT extends ConfigurableMacBase {
 +
 +  private static final Text CF = new Text(new byte[0]);
 +
 +  @Override
 +  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    cfg.setProperty(GC_CYCLE_DELAY, "1s");
 +    cfg.setProperty(GC_CYCLE_START, "0s");
 +    cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M");
 +    cfg.setProperty(TSERV_WAL_REPLICATION, "1");
 +    cfg.setProperty(INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setNumTservers(1);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  int countTrue(Collection<Boolean> bools) {
 +    int result = 0;
 +    for (Boolean b : bools) {
 +      if (b.booleanValue())
 +        result++;
 +    }
 +    return result;
 +  }
 +
 +  @Test
 +  public void test() throws Exception {
 +    MiniAccumuloClusterImpl mac = getCluster();
 +    MiniAccumuloClusterControl control = mac.getClusterControl();
 +    control.stop(GARBAGE_COLLECTOR);
 +    Connector c = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +    c.tableOperations().create(tableName);
 +    writeSomeData(c, tableName, 1, 1);
 +
 +    // wal markers are added lazily
 +    Map<String,Boolean> wals = getWALsAndAssertCount(c, 2);
 +    for (Boolean b : wals.values()) {
 +      assertTrue("logs should be in use", b.booleanValue());
 +    }
 +
 +    // roll log, get a new next
 +    writeSomeData(c, tableName, 1001, 50);
 +    Map<String,Boolean> walsAfterRoll = getWALsAndAssertCount(c, 3);
 +    assertTrue("new WALs should be a superset of the old WALs", walsAfterRoll.keySet().containsAll(wals.keySet()));
 +    assertEquals("all WALs should be in use", 3, countTrue(walsAfterRoll.values()));
 +
 +    // flush the tables
 +    for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
 +      c.tableOperations().flush(table, null, null, true);
 +    }
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    // rolled WAL is no longer in use, but needs to be GC'd
 +    Map<String,Boolean> walsAfterflush = getWALsAndAssertCount(c, 3);
 +    assertEquals("inUse should be 2", 2, countTrue(walsAfterflush.values()));
 +
 +    // let the GC run for a little bit
 +    control.start(GARBAGE_COLLECTOR);
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    // make sure the unused WAL goes away
 +    getWALsAndAssertCount(c, 2);
 +    control.stop(GARBAGE_COLLECTOR);
 +    // restart the tserver, but don't run recovery on all tablets
 +    control.stop(TABLET_SERVER);
 +    // this delays recovery on the normal tables
 +    assertEquals(0, cluster.exec(SetGoalState.class, "SAFE_MODE").waitFor());
 +    control.start(TABLET_SERVER);
 +
 +    // wait for the metadata table to go back online
 +    getRecoveryMarkers(c);
 +    // allow a little time for the master to notice ASSIGNED_TO_DEAD_SERVER tablets
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
 +    // log.debug("markers " + markers);
 +    assertEquals("one tablet should have markers", 1, markers.keySet().size());
 +    assertEquals("tableId of the keyExtent should be 1", "1", markers.keySet().iterator().next().getTableId());
 +
 +    // put some data in the WAL
 +    assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
 +    verifySomeData(c, tableName, 1001 * 50 + 1);
 +    writeSomeData(c, tableName, 100, 100);
 +
 +    Map<String,Boolean> walsAfterRestart = getWALsAndAssertCount(c, 4);
 +    // log.debug("wals after " + walsAfterRestart);
 +    assertEquals("used WALs after restart should be 4", 4, countTrue(walsAfterRestart.values()));
 +    control.start(GARBAGE_COLLECTOR);
 +    sleepUninterruptibly(5, TimeUnit.SECONDS);
 +    Map<String,Boolean> walsAfterRestartAndGC = getWALsAndAssertCount(c, 2);
 +    assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
 +  }
 +
 +  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
 +    Scanner scan = c.createScanner(tableName, EMPTY);
 +    int result = Iterators.size(scan.iterator());
 +    scan.close();
 +    Assert.assertEquals(expected, result);
 +  }
 +
 +  private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
 +    Random rand = new Random();
 +    BatchWriter bw = conn.createBatchWriter(tableName, null);
 +    byte[] rowData = new byte[10];
 +    byte[] cq = new byte[10];
 +    byte[] value = new byte[10];
 +
 +    for (int r = 0; r < row; r++) {
 +      rand.nextBytes(rowData);
 +      Mutation m = new Mutation(rowData);
 +      for (int c = 0; c < col; c++) {
 +        rand.nextBytes(cq);
 +        rand.nextBytes(value);
 +        m.put(CF, new Text(cq), new Value(value));
 +      }
 +      bw.addMutation(m);
 +      if (r % 100 == 0) {
 +        bw.flush();
 +      }
 +    }
 +    bw.close();
 +  }
 +
 +  private Map<KeyExtent,List<String>> getRecoveryMarkers(Connector c) throws Exception {
 +    Map<KeyExtent,List<String>> result = new HashMap<>();
 +    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
 +    root.setRange(TabletsSection.getRange());
 +    root.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
 +    TabletColumnFamily.PREV_ROW_COLUMN.fetch(root);
 +
 +    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
 +    meta.setRange(TabletsSection.getRange());
 +    meta.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
 +    TabletColumnFamily.PREV_ROW_COLUMN.fetch(meta);
 +
 +    List<String> logs = new ArrayList<>();
 +    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
 +    while (both.hasNext()) {
 +      Entry<Key,Value> entry = both.next();
 +      Key key = entry.getKey();
 +      if (key.getColumnFamily().equals(TabletsSection.LogColumnFamily.NAME)) {
 +        logs.add(key.getColumnQualifier().toString());
 +      }
 +      if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && !logs.isEmpty()) {
 +        KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
 +        result.put(extent, logs);
 +        logs = new ArrayList<>();
 +      }
 +    }
 +    return result;
 +  }
 +
 +  private static final int TIMES_TO_COUNT = 20;
 +  private static final int PAUSE_BETWEEN_COUNTS = 100; // milliseconds
 +
 +  private Map<String,Boolean> getWALsAndAssertCount(Connector c, int expectedCount) throws Exception {
 +    // see https://issues.apache.org/jira/browse/ACCUMULO-4110. Sometimes this test counts the logs before
 +    // the new standby log is actually ready, so retry a few times before failing and return the wals map
 +    // once it has the correct count.
 +    Map<String,Boolean> wals = _getWals(c);
 +    if (wals.size() == expectedCount) {
 +      return wals;
 +    }
 +
 +    int waitLonger = getWaitFactor();
 +    for (int i = 1; i <= TIMES_TO_COUNT; i++) {
 +      Thread.sleep(i * PAUSE_BETWEEN_COUNTS * waitLonger);
 +      wals = _getWals(c);
 +      if (wals.size() == expectedCount) {
 +        return wals;
 +      }
 +    }
 +
 +    fail("Unable to get the correct number of WALs, expected " + expectedCount + " but got " + wals.toString());
 +    return new HashMap<>();
 +  }
 +
 +  private int getWaitFactor() {
 +    int waitLonger = 1;
 +    String timeoutString = System.getProperty("timeout.factor");
 +    if (timeoutString != null && !timeoutString.isEmpty()) {
 +      int timeout = Integer.parseInt(timeoutString);
 +      if (timeout > 1) {
 +        waitLonger = timeout;
 +      }
 +    }
 +    return waitLonger;
 +  }
 +
 +  private Map<String,Boolean> _getWals(Connector c) throws Exception {
 +    Map<String,Boolean> result = new HashMap<>();
 +    Instance i = c.getInstance();
 +    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
 +    WalStateManager wals = new WalStateManager(c.getInstance(), zk);
 +    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
 +      // WALs are in use if they are not unreferenced
 +      result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);
 +    }
 +    return result;
 +  }
 +
 +}

