From commits-return-23535-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Thu Nov 7 22:30:41 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3ADF7180630 for ; Thu, 7 Nov 2019 23:30:41 +0100 (CET) Received: (qmail 52015 invoked by uid 500); 7 Nov 2019 22:30:40 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 52005 invoked by uid 99); 7 Nov 2019 22:30:40 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Nov 2019 22:30:40 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 7137F851DB; Thu, 7 Nov 2019 22:30:40 +0000 (UTC) Date: Thu, 07 Nov 2019 22:30:40 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch 1.9 updated: Fix #1308 - Refactor fate concurrency IT (#1414) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157316584033.20220.9162464810875360023@gitbox.apache.org> From: ctubbsii@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/1.9 X-Git-Reftype: branch X-Git-Oldrev: f3b3590a5b4f8adf80f0acffe768a8509d409c51 X-Git-Newrev: 2b8243b72eace4e1fcba0c77077981a84401b8ea X-Git-Rev: 2b8243b72eace4e1fcba0c77077981a84401b8ea X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
ctubbsii pushed a commit to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/1.9 by this push: new 2b8243b Fix #1308 - Refactor fate concurrency IT (#1414) 2b8243b is described below commit 2b8243b72eace4e1fcba0c77077981a84401b8ea Author: EdColeman AuthorDate: Thu Nov 7 17:30:32 2019 -0500 Fix #1308 - Refactor fate concurrency IT (#1414) fix error that caused test failure. --- .../test/functional/FateConcurrencyIT.java | 316 +++++-------------- .../org/apache/accumulo/test/util/SlowOps.java | 347 +++++++++++++++++++++ 2 files changed, 419 insertions(+), 244 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 863e3c4..c3a4d79 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.test.functional; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -24,9 +23,7 @@ import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -35,28 +32,19 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; 
import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.master.state.tables.TableState; -import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.AdminUtil; import org.apache.accumulo.fate.ZooStore; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory; -import org.apache.hadoop.io.Text; +import org.apache.accumulo.test.util.SlowOps; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.Before; @@ -90,10 +78,9 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { private String secret; - // Test development only. When true, multiple tables, multiple compactions will be - // used during the test run which simulates transient condition that was causing - // the test to fail.. - private boolean runMultipleCompactions = false; + private long maxWait; + + private SlowOps slowOps; @Before public void setup() { @@ -104,7 +91,9 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET); - createData(tableName); + maxWait = defaultTimeoutSeconds() <= 0 ? 
60_000 : ((defaultTimeoutSeconds() * 1000) / 2); + + slowOps = new SlowOps(connector, tableName, maxWait, 1); } @AfterClass @@ -161,8 +150,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { // launch a full table compaction with the slow iterator to ensure table lock is acquired and // held by the compaction - - Future compactTask = startCompactTask(); + slowOps.startCompactTask(); // try to set online while fate transaction is in progress - before ACCUMULO-4574 this would // block @@ -178,8 +166,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName)); - assertTrue("verify compaction still running and fate transaction still exists", - blockUntilCompactionRunning(tableName)); + assertTrue("Find FATE operation for table", findFate(tableName)); // test complete, cancel compaction and move on. connector.tableOperations().cancelCompaction(tableName); @@ -193,8 +180,31 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { TimeUnit.MILLISECONDS.convert(timing3.runningTime(), TimeUnit.NANOSECONDS)); // block if compaction still running - compactTask.get(); + slowOps.blockWhileCompactionRunning(); + + } + + private boolean findFate(String aTableName) { + for (int retry = 0; retry < 5; retry++) { + + try { + boolean found = lookupFateInZookeeper(aTableName); + log.trace("Try {}: Fate in zk for table {} : {}", retry, aTableName, found); + if (found) { + log.trace("found for {}", aTableName); + return true; + } else { + Thread.sleep(150); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return false; + } catch (Exception ex) { + log.debug("Find fate failed for table name {} with exception, will retry", aTableName, ex); + } + } + return false; } /** @@ -208,12 +218,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { Instance instance = connector.getInstance(); String tableId; - // for development 
testing - force transient condition that was failing this test so that - // we know if multiple compactions are running, they are properly handled by the test code. - if (runMultipleCompactions) { - runMultipleCompactions(); - } - try { assertEquals("verify table online after created", TableState.ONLINE, @@ -228,7 +232,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { String.format("Table %s does not exist, failing test", tableName)); } - Future compactTask = startCompactTask(); + slowOps.startCompactTask(); AdminUtil.FateStatus withLocks = null; List noLocks = null; @@ -300,114 +304,19 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { connector.tableOperations().cancelCompaction(tableName); // block if compaction still running - compactTask.get(); - - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException - | ExecutionException ex) { - log.debug("Could not cancel compaction", ex); - } - } - - /** - * This method was helpful for debugging a condition that was causing transient test failures. - * This forces a condition that the test should be able to handle. This method is not needed - * during normal testing, it was kept to aid future test development / troubleshooting if other - * transient failures occur. 
- */ - private void runMultipleCompactions() { - - for (int i = 0; i < 4; i++) { - - String aTableName = getUniqueNames(1)[0] + "_" + i; - - createData(aTableName); - - log.debug("Table: {}", aTableName); - - pool.submit(new SlowCompactionRunner(aTableName)); - - assertTrue("verify that compaction running and fate transaction exists", - blockUntilCompactionRunning(aTableName)); + boolean cancelled = slowOps.blockWhileCompactionRunning(); + log.debug("Cancel completed successfully: {}", cancelled); + } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException ex) { + log.debug("Could not cancel compaction due to exception", ex); } } /** - * Create and run a slow running compaction task. The method will block until the compaction has - * been started. - * - * @return a reference to the running compaction task. - */ - private Future startCompactTask() { - Future compactTask = pool.submit(new SlowCompactionRunner(tableName)); - assertTrue("verify that compaction running and fate transaction exists", - blockUntilCompactionRunning(tableName)); - return compactTask; - } - - /** - * Blocks current thread until compaction is running. - * - * @return true if compaction and associate fate found. - */ - private boolean blockUntilCompactionRunning(final String tableName) { - - long maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2); - - long startWait = System.currentTimeMillis(); - - List tservers = connector.instanceOperations().getTabletServers(); - - /* - * wait for compaction to start on table - The compaction will acquire a fate transaction lock - * that used to block a subsequent online command while the fate transaction lock was held. 
- */ - while (System.currentTimeMillis() < (startWait + maxWait)) { - - try { - - int runningCompactions = 0; - - for (String tserver : tservers) { - runningCompactions += connector.instanceOperations().getActiveCompactions(tserver).size(); - log.trace("tserver {}, running compactions {}", tservers, runningCompactions); - } - - if (runningCompactions > 0) { - // Validate that there is a compaction fate transaction - otherwise test is invalid. - if (findFate(tableName)) { - return true; - } - } - - } catch (AccumuloSecurityException | AccumuloException ex) { - throw new IllegalStateException("failed to get active compactions, test fails.", ex); - } catch (KeeperException ex) { - log.trace("Saw possible transient zookeeper error"); - } - - try { - Thread.sleep(250); - } catch (InterruptedException ex) { - // reassert interrupt - Thread.currentThread().interrupt(); - } - } - - log.debug("Could not find compaction for {} after {} seconds", tableName, - TimeUnit.MILLISECONDS.toSeconds(maxWait)); - - return false; - - } - - /** * Checks fates in zookeeper looking for transaction associated with a compaction as a double * check that the test will be valid because the running compaction does have a fate transaction * lock. - * + *

* This method throws can throw either IllegalStateException (failed) or a Zookeeper exception. * Throwing the Zookeeper exception allows for retries if desired to handle transient zookeeper * issues. @@ -418,7 +327,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { * @throws KeeperException * if a zookeeper error occurred - allows for retries. */ - private boolean findFate(final String tableName) throws KeeperException { + private boolean lookupFateInZookeeper(final String tableName) throws KeeperException { Instance instance = connector.getInstance(); AdminUtil admin = new AdminUtil<>(false); @@ -497,61 +406,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { } /** - * Create the provided table and populate with some data using a batch writer. The table is - * scanned to ensure it was populated as expected. - * - * @param tableName - * the name of the table - */ - private void createData(final String tableName) { - - try { - - // create table. - connector.tableOperations().create(tableName); - BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); - - // populate - for (int i = 0; i < NUM_ROWS; i++) { - Mutation m = new Mutation(new Text(String.format("%05d", i))); - m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(UTF_8))); - bw.addMutation(m); - } - bw.close(); - - long startTimestamp = System.nanoTime(); - - int count = scanCount(tableName); - - log.trace("Scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS - .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS)); - - if (count != NUM_ROWS) { - throw new IllegalStateException( - String.format("Number of rows %1$d does not match expected %2$d", count, NUM_ROWS)); - } - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException - | TableExistsException ex) { - throw new IllegalStateException("Create data failed with exception", ex); - } - } - - private int scanCount(String 
tableName) throws TableNotFoundException { - - Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); - int count = 0; - for (Map.Entry elt : scanner) { - String expected = String.format("%05d", count); - assert (elt.getKey().getRow().toString().equals(expected)); - count++; - } - - scanner.close(); - - return count; - } - - /** * Provides timing information for online operation. */ private static class OnlineOpTiming { @@ -614,74 +468,48 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { } /** - * Instance to create / run a compaction using a slow iterator. + * Concurrency testing - ensure that tests are valid id multiple compactions are running. for + * development testing - force transient condition that was failing this test so that we know if + * multiple compactions are running, they are properly handled by the test code and the tests are + * valid. */ - private class SlowCompactionRunner implements Runnable { - - private final String tableName; + @Test + public void multipleCompactions() { - /** - * Create an instance of this class. - * - * @param tableName - * the name of the table that will be compacted with the slow iterator. 
- */ - SlowCompactionRunner(final String tableName) { - this.tableName = tableName; - } + int tableCount = 4; - @Override - public void run() { + List tables = new ArrayList<>(); - long startTimestamp = System.nanoTime(); + for (int i = 0; i < tableCount; i++) { + String uniqueName = getUniqueNames(1)[0] + "_" + i; + SlowOps gen = new SlowOps(connector, uniqueName, maxWait, tableCount); + tables.add(gen); + gen.startCompactTask(); + } - IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class); - SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS); + int foundCount = 0; - List compactIterators = new ArrayList<>(); - compactIterators.add(slow); + for (SlowOps t : tables) { + log.debug("Look for fate {}", t.getTableName()); + if (findFate(t.getTableName())) { + log.debug("Found fate {}", t.getTableName()); + foundCount++; + } + } - log.trace("Slow iterator {}", slow.toString()); + assertEquals(tableCount, foundCount); + for (SlowOps t : tables) { try { - - log.trace("Start compaction"); - - connector.tableOperations().compact(tableName, new Text("0"), new Text("z"), - compactIterators, true, true); - - log.trace("Compaction wait is complete"); - - log.trace("Slow compaction of {} rows took {} ms", NUM_ROWS, TimeUnit.MILLISECONDS - .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS)); - - // validate that number of rows matches expected. - - startTimestamp = System.nanoTime(); - - // validate expected data created and exists in table. 
- - int count = scanCount(tableName); - - log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS - .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS)); - - if (count != NUM_ROWS) { - throw new IllegalStateException( - String.format("After compaction, number of rows %1$d does not match expected %2$d", - count, NUM_ROWS)); - } - - } catch (TableNotFoundException ex) { - throw new IllegalStateException("test failed, table " + tableName + " does not exist", ex); - } catch (AccumuloSecurityException ex) { - throw new IllegalStateException( - "test failed, could not add iterator due to security exception", ex); - } catch (AccumuloException ex) { - // test cancels compaction on complete, so ignore it as an exception. - if (!ex.getMessage().contains("Compaction canceled")) { - throw new IllegalStateException("test failed with an Accumulo exception", ex); + connector.tableOperations().cancelCompaction(t.getTableName()); + // block if compaction still running + boolean cancelled = t.blockWhileCompactionRunning(); + if (!cancelled) { + log.info("Failed to cancel compaction during multiple compaction test clean-up for {}", + t.getTableName()); } + } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException ex) { + log.debug("Exception throw during multiple table test clean-up", ex); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java new file mode 100644 index 0000000..bd51990 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.util; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.ActiveCompaction; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Common methods for performing operations that are deliberately take some 
period of time so that + * tests can interact while the operations are in progress. + */ +public class SlowOps { + + private static final Logger log = LoggerFactory.getLogger(SlowOps.class); + + private static final String TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX = + "tserver.compaction.major.concurrent.max"; + + private static final long SLOW_SCAN_SLEEP_MS = 250L; + private static final int NUM_DATA_ROWS = 1000; + + private final Connector connector; + private final String tableName; + private final long maxWait; + + // private final int numRows = DEFAULT_NUM_DATA_ROWS; + + private static final ExecutorService pool = Executors.newCachedThreadPool(); + + private Future compactTask = null; + + private SlowOps(final Connector connector, final String tableName, final long maxWait) { + + this.connector = connector; + this.tableName = tableName; + this.maxWait = maxWait; + + createData(); + } + + public SlowOps(final Connector connector, final String tableName, final long maxWait, + final int numParallelExpected) { + + this(connector, tableName, maxWait); + + setExpectedCompactions(numParallelExpected); + + } + + public void setExpectedCompactions(final int numParallelExpected) { + + final int target = numParallelExpected + 1; + + Map sysConfig; + + try { + + sysConfig = connector.instanceOperations().getSystemConfiguration(); + + int current = Integer.parseInt(sysConfig.get("tserver.compaction.major.concurrent.max")); + + if (current < target) { + connector.instanceOperations().setProperty(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX, + Integer.toString(target)); + + sysConfig = connector.instanceOperations().getSystemConfiguration(); + + } + + Integer.parseInt(sysConfig.get(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX)); + + } catch (AccumuloException | AccumuloSecurityException | NumberFormatException ex) { + throw new IllegalStateException("Could not set parallel compaction limit to " + target, ex); + } + } + + public String getTableName() { + return tableName; + } + + private 
void createData() { + + try { + + // create table. + connector.tableOperations().create(tableName); + + log.info("Created table id: {}, name \'{}\'", + connector.tableOperations().tableIdMap().get(tableName), tableName); + + try (BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig())) { + // populate + for (int i = 0; i < NUM_DATA_ROWS; i++) { + Mutation m = new Mutation(new Text(String.format("%05d", i))); + m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"), + new Value("junk".getBytes(UTF_8))); + bw.addMutation(m); + } + } + + verifyRows(); + + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException + | TableExistsException ex) { + throw new IllegalStateException("Create data failed with exception", ex); + } + } + + private void verifyRows() { + + long startTimestamp = System.nanoTime(); + + int count = scanCount(); + + log.trace("Scan time for {} rows {} ms", NUM_DATA_ROWS, + TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS)); + + if (count != NUM_DATA_ROWS) { + throw new IllegalStateException( + String.format("Number of rows %1$d does not match expected %2$d", count, NUM_DATA_ROWS)); + } + } + + private int scanCount() { + try (Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY)) { + + int count = 0; + + for (Map.Entry elt : scanner) { + String expected = String.format("%05d", count); + assert (elt.getKey().getRow().toString().equals(expected)); + count++; + } + return count; + } catch (TableNotFoundException ex) { + log.debug("cannot verify row count, table \'{}\' does not exist", tableName); + throw new IllegalStateException(ex); + } + } + + /** + * Create and run a slow running compaction task. The method will block until the compaction has + * been started. The compaction should be cancelled using Accumulo tableOps, and then the caller + * can use blockWhileCompactionRunning() on the instance of this class. 
+ */ + public void startCompactTask() { + + compactTask = pool.submit(new SlowCompactionRunner()); + + if (!blockUntilCompactionRunning()) { + throw new IllegalStateException("Compaction could not be started for " + tableName); + } + } + + /** + * Instance to create / run a compaction using a slow iterator. + */ + private class SlowCompactionRunner implements Runnable { + + SlowCompactionRunner() {} + + @Override + public void run() { + + long startTimestamp = System.nanoTime(); + + IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class); + SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS); + + List compactIterators = new ArrayList<>(); + compactIterators.add(slow); + + log.trace("Starting slow operation using iterator: {}", slow); + + int retry = 0; + boolean completed = false; + + while (!completed && retry++ < 5) { + + try { + log.info("Starting compaction. Attempt {}", retry); + connector.tableOperations().compact(tableName, null, null, compactIterators, true, true); + completed = true; + } catch (Throwable ex) { + // test cancels compaction on complete, so ignore it as an exception. + if (ex.getMessage().contains("Compaction canceled")) { + return; + } + log.info("Exception thrown while waiting for compaction - will retry", ex); + try { + Thread.sleep(10_000 * retry); + } catch (InterruptedException iex) { + Thread.currentThread().interrupt(); + return; + } + } + } + log.debug("Compaction wait is complete"); + + log.trace("Slow compaction of {} rows took {} ms", NUM_DATA_ROWS, TimeUnit.MILLISECONDS + .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS)); + + // validate that number of rows matches expected. + + startTimestamp = System.nanoTime(); + + // validate expected data created and exists in table. 
+ + int count = scanCount(); + + log.trace("After compaction, scan time for {} rows {} ms", NUM_DATA_ROWS, + TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), + TimeUnit.NANOSECONDS)); + + if (count != NUM_DATA_ROWS) { + throw new IllegalStateException( + String.format("After compaction, number of rows %1$d does not match expected %2$d", + count, NUM_DATA_ROWS)); + } + } + } + + /** + * Blocks current thread until compaction is running. + * + * @return true if compaction and associate fate found. + */ + private boolean blockUntilCompactionRunning() { + + long startWait = System.currentTimeMillis(); + + List tservers = connector.instanceOperations().getTabletServers(); + + /* + * wait for compaction to start on table - The compaction will acquire a fate transaction lock + * that used to block a subsequent online command while the fate transaction lock was held. + */ + while (System.currentTimeMillis() < (startWait + maxWait)) { + + try { + + List activeCompactions = new ArrayList<>(); + + for (String tserver : tservers) { + List ac = connector.instanceOperations().getActiveCompactions(tserver); + activeCompactions.addAll(ac); + // runningCompactions += ac.size(); + log.trace("tserver {}, running compactions {}", tservers, ac.size()); + } + + if (!activeCompactions.isEmpty()) { + try { + for (ActiveCompaction compaction : activeCompactions) { + log.debug("Compaction running for {}", compaction.getTable()); + if (compaction.getTable().compareTo(tableName) == 0) { + return true; + } + } + } catch (TableNotFoundException ex) { + log.trace("Compaction found for unknown table {}", activeCompactions); + } + } + } catch (AccumuloSecurityException | AccumuloException ex) { + throw new IllegalStateException("failed to get active compactions, test fails.", ex); + } + + try { + Thread.sleep(3_000); + } catch (InterruptedException ex) { + // reassert interrupt + Thread.currentThread().interrupt(); + } + } + + log.debug("Could not find compaction for {} after {} 
seconds", tableName, + TimeUnit.MILLISECONDS.toSeconds(maxWait)); + + return false; + + } + + /** + * Will block as long as the underlying compaction task is running. This method is intended to be + * used when the compaction is cancelled via table operation cancel method - when the cancel + * command completes, the running task will terminate and then this method will return. + * + * @return true if the task returned. + */ + public boolean blockWhileCompactionRunning() { + + try { + if (compactTask == null) { + throw new IllegalStateException( + "Compaction task has not been started - call startCompactionTask() before blocking"); + } + compactTask.get(); + return true; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return false; + } catch (ExecutionException ex) { + return false; + } + } + +}