Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0B3D2200C0C for ; Mon, 30 Jan 2017 21:25:20 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 09C91160B35; Mon, 30 Jan 2017 20:25:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3A6F9160B60 for ; Mon, 30 Jan 2017 21:25:18 +0100 (CET) Received: (qmail 17057 invoked by uid 500); 30 Jan 2017 20:25:17 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 17048 invoked by uid 99); 30 Jan 2017 20:25:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Jan 2017 20:25:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3F19DDFF0F; Mon, 30 Jan 2017 20:25:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Mon, 30 Jan 2017 20:25:19 -0000 Message-Id: <56d0ade66b9241beb73a0f561f67aff6@git.apache.org> In-Reply-To: <72c9dbfd21644746b5488f643eac9f5e@git.apache.org> References: <72c9dbfd21644746b5488f643eac9f5e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] accumulo git commit: Merge branch '1.7' into 1.8 archived-at: Mon, 30 Jan 2017 20:25:20 -0000 Merge branch '1.7' into 1.8 Conflicts: test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1f31ca6c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1f31ca6c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1f31ca6c Branch: refs/heads/1.8 Commit: 1f31ca6c24d33458c7582f9cf0257e9f58508007 Parents: bf5b6e0 6d8a5fa Author: Keith Turner Authored: Mon Jan 30 15:09:36 2017 -0500 Committer: Keith Turner Committed: Mon Jan 30 15:09:36 2017 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/fate/AdminUtil.java | 72 ++++++++++++++++---- .../master/tableOps/CancelCompactions.java | 2 +- .../master/tableOps/FinishCancelCompaction.java | 12 +++- .../apache/accumulo/test/TableOperationsIT.java | 7 ++ .../accumulo/test/UserCompactionStrategyIT.java | 6 ++ .../functional/ConcurrentDeleteTableIT.java | 34 +-------- .../test/functional/FateStarvationIT.java | 2 + .../test/functional/FunctionalTestUtils.java | 30 ++++++++ .../accumulo/test/functional/RenameIT.java | 2 + 9 files changed, 118 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java index d1a52fb,0000000..a83b0e2 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java @@@ -1,375 -1,0 +1,382 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.DiskUsage; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.functional.BadIterator; ++import org.apache.accumulo.test.functional.FunctionalTestUtils; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TException; ++import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Sets; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class TableOperationsIT extends AccumuloClusterHarness { + + static TabletClientService.Client client; + + private Connector connector; + + @Override + public int defaultTimeoutSeconds() { + return 30; + } + + @Before + public void setup() throws Exception { + connector = getConnector(); + } + ++ @After ++ public void checkForDanglingFateLocks() { ++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); ++ } ++ + @Test + public void getDiskUsageErrors() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + List diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + assertEquals(1, diskUsage.size()); + assertEquals(0, (long) diskUsage.get(0).getUsage()); + assertEquals(tableName, diskUsage.get(0).getTables().iterator().next()); + + connector.securityOperations().revokeTablePermission(getAdminPrincipal(), tableName, TablePermission.READ); + try { + connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + fail("Should throw securityexception"); + } catch (AccumuloSecurityException e) {} + + connector.tableOperations().delete(tableName); + try { + connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + fail("Should throw tablenotfound"); + } catch (TableNotFoundException e) {} + } + + @Test + public void getDiskUsage() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException { + final String[] names = getUniqueNames(2); + String tableName = names[0]; + connector.tableOperations().create(tableName); + + // verify 0 disk usage + List diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + assertEquals(1, diskUsages.size()); + assertEquals(1, diskUsages.get(0).getTables().size()); + assertEquals(Long.valueOf(0), diskUsages.get(0).getUsage()); + assertEquals(tableName, diskUsages.get(0).getTables().first()); + + // add some data + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", new Value("abcde".getBytes())); + bw.addMutation(m); + bw.flush(); + bw.close(); + + connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true); + + // verify we have usage + diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + assertEquals(1, diskUsages.size()); + assertEquals(1, diskUsages.get(0).getTables().size()); + assertTrue(diskUsages.get(0).getUsage() > 0); + assertEquals(tableName, diskUsages.get(0).getTables().first()); + + String newTable = names[1]; + + // clone table + connector.tableOperations().clone(tableName, newTable, false, null, null); + + // verify tables are exactly the same + Set tables = new HashSet<>(); + tables.add(tableName); + tables.add(newTable); + diskUsages = connector.tableOperations().getDiskUsage(tables); + assertEquals(1, diskUsages.size()); + assertEquals(2, diskUsages.get(0).getTables().size()); + assertTrue(diskUsages.get(0).getUsage() > 0); + + connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true); + connector.tableOperations().compact(newTable, new Text("A"), new Text("z"), true, true); + + // verify tables have differences + diskUsages = connector.tableOperations().getDiskUsage(tables); + assertEquals(2, diskUsages.size()); + assertEquals(1, diskUsages.get(0).getTables().size()); + assertEquals(1, diskUsages.get(1).getTables().size()); + assertTrue(diskUsages.get(0).getUsage() > 0); + assertTrue(diskUsages.get(1).getUsage() > 0); + + connector.tableOperations().delete(tableName); + } + + @Test + public void createTable() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Iterable> itrProps = connector.tableOperations().getProperties(tableName); + Map props = propsToMap(itrProps); + assertEquals(DefaultKeySizeConstraint.class.getName(), props.get(Property.TABLE_CONSTRAINT_PREFIX.toString() + "1")); + connector.tableOperations().delete(tableName); + } + + @Test + public void createMergeClonedTable() throws Exception { + String[] names = getUniqueNames(2); + String originalTable = names[0]; + TableOperations tops = connector.tableOperations(); + + TreeSet splits = Sets.newTreeSet(Arrays.asList(new Text("a"), new Text("b"), new Text("c"), new Text("d"))); + + tops.create(originalTable); + tops.addSplits(originalTable, splits); + + BatchWriter bw = connector.createBatchWriter(originalTable, new BatchWriterConfig()); + for (Text row : splits) { + Mutation m = new Mutation(row); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + m.put(Integer.toString(i), Integer.toString(j), Integer.toString(i + j)); + } + } + + bw.addMutation(m); + } + + bw.close(); + + String clonedTable = names[1]; + + tops.clone(originalTable, clonedTable, true, null, null); + tops.merge(clonedTable, null, new Text("b")); + + Map rowCounts = new HashMap<>(); + Scanner s = connector.createScanner(clonedTable, new Authorizations()); + for (Entry entry : s) { + final Key key = entry.getKey(); + String row = key.getRow().toString(); + String cf = key.getColumnFamily().toString(), cq = key.getColumnQualifier().toString(); + String value = entry.getValue().toString(); + + if (rowCounts.containsKey(row)) { + rowCounts.put(row, rowCounts.get(row) + 1); + } else { + rowCounts.put(row, 1); + } + + Assert.assertEquals(Integer.parseInt(cf) + Integer.parseInt(cq), Integer.parseInt(value)); + } + + Collection clonedSplits = tops.listSplits(clonedTable); + Set expectedSplits = Sets.newHashSet(new Text("b"), new Text("c"), new Text("d")); + for (Text clonedSplit : clonedSplits) { + Assert.assertTrue("Encountered unexpected split on the cloned table: " + clonedSplit, expectedSplits.remove(clonedSplit)); + } + + Assert.assertTrue("Did not find all expected splits on the cloned table: " + expectedSplits, expectedSplits.isEmpty()); + } + + private Map propsToMap(Iterable> props) { + Map map = new HashMap<>(); + for (Map.Entry prop : props) { + map.put(prop.getKey(), prop.getValue()); + } + return map; + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertEquals(HardListIterator.allEntriesToInject, actual); + connector.tableOperations().delete(tableName); + } + + /** Compare only the row, column family and column qualifier. */ + static class KeyRowColFColQComparator implements Comparator { + @Override + public int compare(Key k1, Key k2) { + return k1.compareTo(k2, PartialKey.ROW_COLFAM_COLQUAL); + } + } + + static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator(); + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + SortedSet splitset = new TreeSet<>(); + splitset.add(new Text("f")); + connector.tableOperations().addSplits(tableName, splitset); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertEquals(HardListIterator.allEntriesToInject, actual); + connector.tableOperations().delete(tableName); + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + SortedSet splitset = new TreeSet<>(); + splitset.add(new Text("f")); + connector.tableOperations().addSplits(tableName, splitset); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block + connector.tableOperations().cancelCompaction(tableName); + // depending on timing, compaction will finish or be canceled + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + switch (actual.size()) { + case 3: + // Compaction cancel didn't happen in time + assertTrue(HardListIterator.allEntriesToInject.equals(actual)); + break; + case 2: + // Compacted the first tablet (-inf, f) + assertEquals(HardListIterator.allEntriesToInject.headMap(new Key("f")), actual); + break; + case 1: + // Compacted the second tablet [f, +inf) + assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key("f")), actual); + break; + case 0: + // Cancelled the compaction before it ran. No generated entries. + break; + default: + Assert.fail("Unexpected number of entries"); + break; + } + connector.tableOperations().delete(tableName); + } + + @Test + public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + Text splitRow = new Text("f"); + SortedSet splitset = new TreeSet<>(); + splitset.add(splitRow); + connector.tableOperations().addSplits(tableName, splitset); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, HardListIterator.class)); + // compact the second tablet, not the first + connector.tableOperations().compact(tableName, splitRow, null, list, true, true); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + // only expect the entries in the second tablet + assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key(splitRow)), actual); + connector.tableOperations().delete(tableName); + } + + /** Test recovery from bad majc iterator via compaction cancel. */ + @Test + public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + + List list = new ArrayList<>(); + list.add(new IteratorSetting(15, BadIterator.class)); + connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block + sleepUninterruptibly(2, TimeUnit.SECONDS); // start compaction + connector.tableOperations().cancelCompaction(tableName); + + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + Map actual = new TreeMap<>(); + for (Map.Entry entry : scanner) + actual.put(entry.getKey(), entry.getValue()); + assertTrue("Should be empty. Actual is " + actual, actual.isEmpty()); + connector.tableOperations().delete(tableName); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java index 4451987,0000000..ddf8ad7 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java @@@ -1,308 -1,0 +1,314 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.functional.ConfigurableCompactionIT; +import org.apache.accumulo.test.functional.FunctionalTestUtils; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; ++import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class UserCompactionStrategyIT extends AccumuloClusterHarness { + + @Override + public int defaultTimeoutSeconds() { + return 3 * 60; + } + ++ @After ++ public void checkForDanglingFateLocks() { ++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); ++ } ++ + @Test + public void testDropA() throws Exception { + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + writeFlush(c, tableName, "a"); + writeFlush(c, tableName, "b"); + // create a file that starts with A containing rows 'a' and 'b' + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + writeFlush(c, tableName, "c"); + writeFlush(c, tableName, "d"); + + // drop files that start with A + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("dropPrefix", "A", "inputPrefix", "F")); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName)); + + // this compaction should not drop files starting with A + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName)); + } + + private void testDropNone(Map options) throws Exception { + + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + writeFlush(c, tableName, "a"); + writeFlush(c, tableName, "b"); + + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName()); + csConfig.setOptions(options); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(ImmutableSet.of("a", "b"), getRows(c, tableName)); + } + + @Test + public void testDropNone() throws Exception { + // test a compaction strategy that selects no files. In this case there is no work to do, want to ensure it does not hang. + + testDropNone(ImmutableMap.of("inputPrefix", "Z")); + } + + @Test + public void testDropNone2() throws Exception { + // test a compaction strategy that selects no files. This differs testDropNone() in that shouldCompact() will return true and getCompactionPlan() will + // return no work to do. + + testDropNone(ImmutableMap.of("inputPrefix", "Z", "shouldCompact", "true")); + } + + @Test + public void testPerTableClasspath() throws Exception { + // Can't assume that a test-resource will be on the server's classpath + Assume.assumeTrue(ClusterType.MINI == getClusterType()); + + // test per-table classpath + user specified compaction strategy + + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + File target = new File(System.getProperty("user.dir"), "target"); + Assert.assertTrue(target.mkdirs() || target.isDirectory()); + File destFile = installJar(target, "/TestCompactionStrat.jar"); + c.tableOperations().create(tableName); + c.instanceOperations().setProperty(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1", destFile.toString()); + c.tableOperations().setProperty(tableName, Property.TABLE_CLASSPATH.getKey(), "context1"); + + c.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("efg")))); + + writeFlush(c, tableName, "a"); + writeFlush(c, tableName, "b"); + + writeFlush(c, tableName, "h"); + writeFlush(c, tableName, "i"); + + Assert.assertEquals(4, FunctionalTestUtils.countRFiles(c, tableName)); + + // EfgCompactionStrat will only compact a tablet w/ end row of 'efg'. No other tablets are compacted. + CompactionStrategyConfig csConfig = new CompactionStrategyConfig("org.apache.accumulo.test.EfgCompactionStrat"); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName)); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName)); + } + + private static File installJar(File destDir, String jarFile) throws IOException { + File destName = new File(destDir, new File(jarFile).getName()); + FileUtils.copyInputStreamToFile(ConfigurableCompactionIT.class.getResourceAsStream(jarFile), destName); + return destName; + } + + @Test + public void testIterators() throws Exception { + // test compaction strategy + iterators + + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + writeFlush(c, tableName, "a"); + writeFlush(c, tableName, "b"); + // create a file that starts with A containing rows 'a' and 'b' + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + writeFlush(c, tableName, "c"); + writeFlush(c, tableName, "d"); + + Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName)); + + // drop files that start with A + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("inputPrefix", "F")); + + IteratorSetting iterConf = new IteratorSetting(21, "myregex", RegExFilter.class); + RegExFilter.setRegexs(iterConf, "a|c", null, null, null, false); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig).setIterators(Arrays.asList(iterConf))); + + // compaction strategy should only be applied to one file. If its applied to both, then row 'b' would be dropped by filter. + Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName)); + + Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName)); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + // ensure that iterator is not applied + Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName)); + + Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName)); + } + + @Test + public void testFileSize() throws Exception { + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + // write random data because its very unlikely it will compress + writeRandomValue(c, tableName, 1 << 16); + writeRandomValue(c, tableName, 1 << 16); + + writeRandomValue(c, tableName, 1 << 9); + writeRandomValue(c, tableName, 1 << 7); + writeRandomValue(c, tableName, 1 << 6); + + Assert.assertEquals(5, FunctionalTestUtils.countRFiles(c, tableName)); + + CompactionStrategyConfig csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 15))); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName)); + + csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName()); + csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 17))); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig)); + + Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName)); + + } + + @Test + public void testConcurrent() throws Exception { + // two compactions without iterators or strategy should be able to run concurrently + + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + // write random data because its very unlikely it will compress + writeRandomValue(c, tableName, 1 << 16); + writeRandomValue(c, tableName, 1 << 16); + + c.tableOperations().compact(tableName, new CompactionConfig().setWait(false)); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + + Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName)); + + writeRandomValue(c, tableName, 1 << 16); + + IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class); + SlowIterator.setSleepTime(iterConfig, 1000); + + long t1 = System.currentTimeMillis(); + c.tableOperations().compact(tableName, new CompactionConfig().setWait(false).setIterators(Arrays.asList(iterConfig))); + try { + // this compaction should fail because previous one set iterators + c.tableOperations().compact(tableName, new CompactionConfig().setWait(true)); + if (System.currentTimeMillis() - t1 < 2000) + Assert.fail("Expected compaction to fail because another concurrent compaction set iterators"); + } catch (AccumuloException e) {} + } + + void writeRandomValue(Connector c, String tableName, int size) throws Exception { + Random rand = new Random(); + + byte data1[] = new byte[size]; + rand.nextBytes(data1); + + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + + Mutation m1 = new Mutation("r" + rand.nextInt(909090)); + m1.put("data", "bl0b", new Value(data1)); + + bw.addMutation(m1); + bw.close(); + c.tableOperations().flush(tableName, null, null, true); + } + + private Set getRows(Connector c, String tableName) throws TableNotFoundException { + Set rows = new HashSet<>(); + Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY); + + for (Entry entry : scanner) + rows.add(entry.getKey().getRowData().toString()); + return rows; + + } + + private void writeFlush(Connector conn, String tablename, String row) throws Exception { + BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig()); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bw.close(); + conn.tableOperations().flush(tablename, null, null, true); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java index 5808804,0000000..52fc57f mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java @@@ -1,298 -1,0 +1,270 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.test.functional; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; - import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.admin.CompactionConfig; - import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.Authorizations; - import org.apache.accumulo.core.zookeeper.ZooUtil; - import org.apache.accumulo.fate.AdminUtil; - import org.apache.accumulo.fate.AdminUtil.FateStatus; - import org.apache.accumulo.fate.ZooStore; - import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.harness.AccumuloClusterHarness; - import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory; +import org.apache.hadoop.io.Text; - import org.apache.zookeeper.KeeperException; +import org.junit.Assert; +import org.junit.Test; + +public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { + + @Test + public void testConcurrentDeleteTablesOps() throws Exception { + final Connector c = getConnector(); + String[] tables = getUniqueNames(2); + + TreeSet splits = createSplits(); + + ExecutorService es = Executors.newFixedThreadPool(20); + + int count = 0; + for (final String table : tables) { + c.tableOperations().create(table); + c.tableOperations().addSplits(table, splits); + writeData(c, table); + if (count == 1) { + c.tableOperations().flush(table, null, null, true); + } + count++; + + int numDeleteOps = 20; + final CountDownLatch cdl = new CountDownLatch(numDeleteOps); + + List> futures = new ArrayList<>(); + + for (int i = 0; i < numDeleteOps; i++) { + Future future = es.submit(new Runnable() { + + @Override + public void run() { + try { + cdl.countDown(); + cdl.await(); + c.tableOperations().delete(table); + } catch (TableNotFoundException e) { + // expected + } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + } + }); + + futures.add(future); + } + + for (Future future : futures) { + future.get(); + } + + try { + c.createScanner(table, Authorizations.EMPTY); + Assert.fail("Expected table " + table + " to be gone."); + } catch (TableNotFoundException tnfe) { + // expected + } + - FateStatus fateStatus = getFateStatus(); - - // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks. - Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size()); - Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size()); ++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); + } + + es.shutdown(); + } + + private TreeSet createSplits() { + TreeSet splits = new TreeSet<>(); + + for (int i = 0; i < 1000; i++) { + Text split = new Text(String.format("%09x", i * 100000)); + splits.add(split); + } + return splits; + } + + private static abstract class DelayedTableOp implements Runnable { + private CountDownLatch cdl; + + DelayedTableOp(CountDownLatch cdl) { + this.cdl = cdl; + } + + public void run() { + try { + cdl.countDown(); + cdl.await(); + Thread.sleep(10); + doTableOp(); - } catch (TableNotFoundException e) { ++ } catch (TableNotFoundException | TableOfflineException e) { + // expected + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected abstract void doTableOp() throws Exception; + } + + @Test + public void testConcurrentFateOpsWithDelete() throws Exception { + final Connector c = getConnector(); + String[] tables = getUniqueNames(2); + + TreeSet splits = createSplits(); + + int numOperations = 8; + + ExecutorService es = Executors.newFixedThreadPool(numOperations); + + int count = 0; + for (final String table : tables) { + c.tableOperations().create(table); + c.tableOperations().addSplits(table, splits); + writeData(c, table); + if (count == 1) { + c.tableOperations().flush(table, null, null, true); + } + count++; + + // increment this for each test + final CountDownLatch cdl = new CountDownLatch(numOperations); + + List> futures = new ArrayList<>(); + + futures.add(es.submit(new Runnable() { + @Override + public void run() { + try { + cdl.countDown(); + cdl.await(); + c.tableOperations().delete(table); + } catch (TableNotFoundException | TableOfflineException e) { + // expected + } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().compact(table, new CompactionConfig()); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().merge(table, null, null); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + Map m = Collections.emptyMap(); + Set s = Collections.emptySet(); + c.tableOperations().clone(table, table + "_clone", true, m, s); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().deleteRows(table, null, null); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().cancelCompaction(table); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().rename(table, table + "_renamed"); + } + })); + + futures.add(es.submit(new DelayedTableOp(cdl) { + @Override + protected void doTableOp() throws Exception { + c.tableOperations().offline(table); + } + })); + + Assert.assertEquals(numOperations, futures.size()); + + for (Future future : futures) { + future.get(); + } + + try { + c.createScanner(table, Authorizations.EMPTY); + Assert.fail("Expected table " + table + " to be gone."); + } catch (TableNotFoundException tnfe) { + // expected + } + - FateStatus fateStatus = getFateStatus(); - - // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks. - Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size()); - Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size()); ++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); + } + + es.shutdown(); + } + - private FateStatus getFateStatus() throws KeeperException, InterruptedException { - Instance instance = getConnector().getInstance(); - AdminUtil admin = new AdminUtil<>(false); - String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET); - IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret); - ZooStore zs = new ZooStore(ZooUtil.getRoot(instance) + Constants.ZFATE, zk); - FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null); - return fateStatus; - } - + private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException { + BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig()); + try { + Random rand = new Random(); + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000))); + m.put("m", "order", "" + i); + bw.addMutation(m); + } + } finally { + bw.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java index 30f4476,0000000..c5f9eab mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java @@@ -1,80 -1,0 +1,82 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.TestIngest; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +/** + * See ACCUMULO-779 + */ +public class FateStarvationIT extends AccumuloClusterHarness { + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test + public void run() throws Exception { + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + c.tableOperations().create(tableName); + + c.tableOperations().addSplits(tableName, TestIngest.getSplitPoints(0, 100000, 50)); + + TestIngest.Opts opts = new TestIngest.Opts(); + opts.random = 89; + opts.timestamp = 7; + opts.dataSize = 50; + opts.rows = 100000; + opts.cols = 1; + opts.setTableName(tableName); + ClientConfiguration clientConf = cluster.getClientConfig(); + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConf); + } else { + opts.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, opts, new BatchWriterOpts()); + + c.tableOperations().flush(tableName, null, null, true); + + List splits = new ArrayList<>(TestIngest.getSplitPoints(0, 100000, 67)); + Random rand = new Random(); + + for (int i = 0; i < 100; i++) { + int idx1 = rand.nextInt(splits.size() - 1); + int idx2 = rand.nextInt(splits.size() - (idx1 + 1)) + idx1 + 1; + + c.tableOperations().compact(tableName, splits.get(idx1), splits.get(idx2), false, false); + } + + c.tableOperations().offline(tableName); ++ ++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 829293e,0000000..8659922 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@@ -1,186 -1,0 +1,216 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertFalse; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + ++import org.apache.accumulo.cluster.AccumuloCluster; ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.client.Connector; ++import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; ++import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; ++import org.apache.accumulo.core.zookeeper.ZooUtil; ++import org.apache.accumulo.fate.AdminUtil; ++import org.apache.accumulo.fate.AdminUtil.FateStatus; ++import org.apache.accumulo.fate.ZooStore; ++import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter; ++import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory; +import org.apache.accumulo.test.TestIngest; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; ++import org.apache.zookeeper.KeeperException; ++import org.junit.Assert; + +import com.google.common.collect.Iterators; + +public class FunctionalTestUtils { + + public static int countRFiles(Connector c, String tableName) throws Exception { + Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + String tableId = c.tableOperations().tableIdMap().get(tableName); + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + return Iterators.size(scanner.iterator()); + } + + static void checkRFiles(Connector c, String tableName, int minTablets, int maxTablets, int minRFiles, int maxRFiles) throws Exception { + Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + String tableId = c.tableOperations().tableIdMap().get(tableName); + scanner.setRange(new Range(new Text(tableId + ";"), true, new Text(tableId + "<"), true)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + + HashMap tabletFileCounts = new HashMap<>(); + + for (Entry entry : scanner) { + + Text row = entry.getKey().getRow(); + + Integer count = tabletFileCounts.get(row); + if (count == null) + count = 0; + if (entry.getKey().getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) { + count = count + 1; + } + + tabletFileCounts.put(row, count); + } + + if (tabletFileCounts.size() < minTablets || tabletFileCounts.size() > maxTablets) { + throw new Exception("Did not find expected number of tablets " + tabletFileCounts.size()); + } + + Set> es = tabletFileCounts.entrySet(); + for (Entry entry : es) { + if (entry.getValue() > maxRFiles || entry.getValue() < minRFiles) { + throw new Exception("tablet " + entry.getKey() + " has " + entry.getValue() + " map files"); + } + } + } + + static public void bulkImport(Connector c, FileSystem fs, String table, String dir) throws Exception { + String failDir = dir + "_failures"; + Path failPath = new Path(failDir); + fs.delete(failPath, true); + fs.mkdirs(failPath); + + // Ensure server can read/modify files + c.tableOperations().importDirectory(table, dir, failDir, false); + + if (fs.listStatus(failPath).length > 0) { + throw new Exception("Some files failed to bulk import"); + } + + } + + static public void checkSplits(Connector c, String table, int min, int max) throws Exception { + Collection splits = c.tableOperations().listSplits(table); + if (splits.size() < min || splits.size() > max) { + throw new Exception("# of table splits points out of range, #splits=" + splits.size() + " table=" + table + " min=" + min + " max=" + max); + } + } + + static public void createRFiles(final Connector c, final FileSystem fs, String path, int rows, int splits, int threads) throws Exception { + fs.delete(new Path(path), true); + ExecutorService threadPool = Executors.newFixedThreadPool(threads); + final AtomicBoolean fail = new AtomicBoolean(false); + for (int i = 0; i < rows; i += rows / splits) { + final TestIngest.Opts opts = new TestIngest.Opts(); + opts.outputFile = String.format("%s/mf%s", path, i); + opts.random = 56; + opts.timestamp = 1; + opts.dataSize = 50; + opts.rows = rows / splits; + opts.startRow = i; + opts.cols = 1; + threadPool.execute(new Runnable() { + @Override + public void run() { + try { + TestIngest.ingest(c, fs, opts, new BatchWriterOpts()); + } catch (Exception e) { + fail.set(true); + } + } + }); + } + threadPool.shutdown(); + threadPool.awaitTermination(1, TimeUnit.HOURS); + assertFalse(fail.get()); + } + + static public String readAll(InputStream is) throws IOException { + byte[] buffer = new byte[4096]; + StringBuilder result = new StringBuilder(); + while (true) { + int n = is.read(buffer); + if (n <= 0) + break; + result.append(new String(buffer, 0, n)); + } + return result.toString(); + } + + public static String readAll(MiniAccumuloClusterImpl c, Class klass, Process p) throws Exception { + for (LogWriter writer : c.getLogWriters()) + writer.flush(); + return readAll(new FileInputStream(c.getConfig().getLogDir() + "/" + klass.getSimpleName() + "_" + p.hashCode() + ".out")); + } + + static Mutation nm(String row, String cf, String cq, Value value) { + Mutation m = new Mutation(new Text(row)); + m.put(new Text(cf), new Text(cq), value); + return m; + } + + static Mutation nm(String row, String cf, String cq, String value) { + return nm(row, cf, cq, new Value(value.getBytes())); + } + + public static SortedSet splits(String[] splits) { + SortedSet result = new TreeSet<>(); + for (String split : splits) + result.add(new Text(split)); + return result; + } + ++ public static void assertNoDanglingFateLocks(Instance instance, AccumuloCluster cluster) { ++ FateStatus fateStatus = getFateStatus(instance, cluster); ++ Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingHeldLocks(), 0, fateStatus.getDanglingHeldLocks().size()); ++ Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingWaitingLocks(), 0, fateStatus.getDanglingWaitingLocks().size()); ++ } ++ ++ private static FateStatus getFateStatus(Instance instance, AccumuloCluster cluster) { ++ try { ++ AdminUtil admin = new AdminUtil<>(false); ++ String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET); ++ IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret); ++ ZooStore zs = new ZooStore(ZooUtil.getRoot(instance) + Constants.ZFATE, zk); ++ FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null); ++ return fateStatus; ++ } catch (KeeperException | InterruptedException e) { ++ throw new RuntimeException(e); ++ } ++ } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java index 0c22196,0000000..47438a6 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java @@@ -1,74 -1,0 +1,76 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.junit.Test; + +public class RenameIT extends AccumuloClusterHarness { + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test + public void renameTest() throws Exception { + String[] tableNames = getUniqueNames(2); + String name1 = tableNames[0]; + String name2 = tableNames[1]; + BatchWriterOpts bwOpts = new BatchWriterOpts(); + ScannerOpts scanOpts = new ScannerOpts(); + TestIngest.Opts opts = new TestIngest.Opts(); + opts.createTable = true; + opts.setTableName(name1); + + final ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(getAdminPrincipal()); + } + + Connector c = getConnector(); + TestIngest.ingest(c, opts, bwOpts); + c.tableOperations().rename(name1, name2); + TestIngest.ingest(c, opts, bwOpts); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + vopts.updateKerberosCredentials(clientConfig); + } else { + vopts.setPrincipal(getAdminPrincipal()); + } + + vopts.setTableName(name2); + VerifyIngest.verifyIngest(c, vopts, scanOpts); + c.tableOperations().delete(name1); + c.tableOperations().rename(name2, name1); + vopts.setTableName(name1); + VerifyIngest.verifyIngest(c, vopts, scanOpts); ++ ++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster()); + } + +}