Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2B1F5200C17 for ; Fri, 27 Jan 2017 01:45:44 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 29826160B58; Fri, 27 Jan 2017 00:45:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ED90A160B50 for ; Fri, 27 Jan 2017 01:45:42 +0100 (CET) Received: (qmail 44914 invoked by uid 500); 27 Jan 2017 00:45:42 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 44892 invoked by uid 99); 27 Jan 2017 00:45:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jan 2017 00:45:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 081A2DFC68; Fri, 27 Jan 2017 00:45:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Fri, 27 Jan 2017 00:45:43 -0000 Message-Id: <27c9ffc20b9d4f928f72995ab4ba04b8@git.apache.org> In-Reply-To: <5ae77e05307248c1bd299267ae9a2d0c@git.apache.org> References: <5ae77e05307248c1bd299267ae9a2d0c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] accumulo git commit: Merge branch '1.7' into 1.8 archived-at: Fri, 27 Jan 2017 00:45:44 -0000 Merge branch '1.7' into 1.8 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bf5b6e0f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bf5b6e0f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bf5b6e0f Branch: refs/heads/1.8 Commit: bf5b6e0fae4974bb77b6409c21c5770d815b991f Parents: 8668c7f 7b9a11a Author: Keith Turner Authored: Thu Jan 26 19:24:14 2017 -0500 Committer: Keith Turner Committed: Thu Jan 26 19:24:14 2017 -0500 ---------------------------------------------------------------------- .../accumulo/master/FateServiceHandler.java | 15 +- .../master/tableOps/CancelCompactions.java | 19 ++- .../master/tableOps/ChangeTableState.java | 20 ++- .../accumulo/master/tableOps/CompactRange.java | 30 ++-- .../master/tableOps/CompactionDriver.java | 14 +- .../accumulo/master/tableOps/DeleteTable.java | 29 ++-- .../accumulo/master/tableOps/RenameTable.java | 19 ++- .../accumulo/master/tableOps/TableRangeOp.java | 20 ++- .../master/tableOps/TableRangeOpWait.java | 9 +- .../apache/accumulo/master/tableOps/Utils.java | 9 +- .../functional/ConcurrentDeleteTableIT.java | 167 ++++++++++++++++++- 11 files changed, 262 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java index d7d5b14,e641479..f3d53a7 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java @@@ -24,11 -24,10 +24,10 @@@ import java.util.List import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CompactionStrategyConfig; +import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil; - import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; import org.apache.accumulo.fate.Repo; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator; @@@ -51,9 -51,14 +51,14 @@@ public class CompactRange extends Maste private byte[] endRow; private byte[] config; - public CompactRange(String tableId, byte[] startRow, byte[] endRow, List iterators, CompactionStrategyConfig compactionStrategy) - throws AcceptableThriftTableOperationException { + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT, this.namespaceId); + } + + public CompactRange(String namespaceId, String tableId, byte[] startRow, byte[] endRow, List iterators, - CompactionStrategyConfig compactionStrategy) throws ThriftTableOperationException { ++ CompactionStrategyConfig compactionStrategy) throws AcceptableThriftTableOperationException { + requireNonNull(namespaceId, "Invalid argument: null namespaceId"); requireNonNull(tableId, "Invalid argument: null tableId"); requireNonNull(iterators, "Invalid argument: null iterator list"); requireNonNull(compactionStrategy, "Invalid argument: null compactionStrategy"); @@@ -122,9 -127,9 +127,9 @@@ } }); - return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), tableId, startRow, endRow); + return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), getNamespaceId(env), tableId, startRow, endRow); } catch (NoNodeException nne) { - throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null); + throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java index e2e7018,64d08be..d91755d --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java @@@ -16,10 -16,9 +16,9 @@@ */ package org.apache.accumulo.master.tableOps; +import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException; - import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.TableOperation; import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.util.TextUtil; @@@ -42,15 -42,19 +42,19 @@@ public class TableRangeOp extends Maste private byte[] endRow; private Operation op; - @Override - public long isReady(long tid, Master environment) throws Exception { - String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId); - return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.MERGE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE); + private String getNamespaceId(Master env) throws Exception { + return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.MERGE, this.namespaceId); } - public TableRangeOp(MergeInfo.Operation op, String tableId, Text startRow, Text endRow) throws AcceptableThriftTableOperationException { + @Override + public long isReady(long tid, Master env) throws Exception { + return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.MERGE) + + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE); + } - public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException { ++ public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws AcceptableThriftTableOperationException { this.tableId = tableId; + this.namespaceId = namespaceId; this.startRow = TextUtil.getBytes(startRow); this.endRow = TextUtil.getBytes(endRow); this.op = op; @@@ -85,14 -90,14 +89,14 @@@ @Override public void undo(long tid, Master env) throws Exception { - String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId); // Not sure this is a good thing to do. The Master state engine should be the one to remove it. - Text tableIdText = new Text(tableId); - MergeInfo mergeInfo = env.getMergeInfo(tableIdText); + MergeInfo mergeInfo = env.getMergeInfo(tableId); if (mergeInfo.getState() != MergeState.NONE) log.info("removing merge information " + mergeInfo); - env.clearMergeState(tableIdText); + env.clearMergeState(tableId); + Utils.unreserveNamespace(namespaceId, tid, false); Utils.unreserveTable(tableId, tid, true); + Utils.unreserveNamespace(getNamespaceId(env), tid, false); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java index a7c82b1,5feb06d..1194c67 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java @@@ -57,12 -61,12 +59,11 @@@ class TableRangeOpWait extends MasterRe @Override public Repo call(long tid, Master master) throws Exception { - String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId); - Text tableIdText = new Text(tableId); - MergeInfo mergeInfo = master.getMergeInfo(tableIdText); + MergeInfo mergeInfo = master.getMergeInfo(tableId); log.info("removing merge information " + mergeInfo); - master.clearMergeState(tableIdText); + master.clearMergeState(tableId); - Utils.unreserveNamespace(namespaceId, tid, false); Utils.unreserveTable(tableId, tid, true); + Utils.unreserveNamespace(Utils.getNamespaceId(master.getInstance(), tableId, TableOperation.MERGE, this.namespaceId), tid, false); return null; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java index 3f7a305,0000000..5808804 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java @@@ -1,147 -1,0 +1,298 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.test.functional; + +import java.util.ArrayList; ++import java.util.Collections; +import java.util.List; ++import java.util.Map; +import java.util.Random; ++import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; ++import org.apache.accumulo.core.client.TableOfflineException; ++import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.AdminUtil; +import org.apache.accumulo.fate.AdminUtil.FateStatus; +import org.apache.accumulo.fate.ZooStore; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; +import org.junit.Assert; +import org.junit.Test; + +public class ConcurrentDeleteTableIT extends AccumuloClusterHarness { + + @Test + public void testConcurrentDeleteTablesOps() throws Exception { + final Connector c = getConnector(); + String[] tables = getUniqueNames(2); + - TreeSet splits = new TreeSet<>(); - - for (int i = 0; i < 1000; i++) { - Text split = new Text(String.format("%09x", i * 100000)); - splits.add(split); - } ++ TreeSet splits = createSplits(); + + ExecutorService es = Executors.newFixedThreadPool(20); + + int count = 0; + for (final String table : tables) { + c.tableOperations().create(table); + c.tableOperations().addSplits(table, splits); + writeData(c, table); + if (count == 1) { + c.tableOperations().flush(table, null, null, true); + } + count++; + - final CountDownLatch cdl = new CountDownLatch(20); ++ int numDeleteOps = 20; ++ final CountDownLatch cdl = new CountDownLatch(numDeleteOps); + + List> futures = new ArrayList<>(); + - for (int i = 0; i < 20; i++) { ++ for (int i = 0; i < numDeleteOps; i++) { + Future future = es.submit(new Runnable() { + + @Override + public void run() { + try { + cdl.countDown(); + cdl.await(); + c.tableOperations().delete(table); + } catch (TableNotFoundException e) { + // expected + } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + } + }); + + futures.add(future); + } + + for (Future future : futures) { + future.get(); + } + + try { + c.createScanner(table, Authorizations.EMPTY); + Assert.fail("Expected table " + table + " to be gone."); + } catch (TableNotFoundException tnfe) { + // expected + } + + FateStatus fateStatus = getFateStatus(); + + // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks. + Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size()); + Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size()); + } + + es.shutdown(); + } + ++ private TreeSet createSplits() { ++ TreeSet splits = new TreeSet<>(); ++ ++ for (int i = 0; i < 1000; i++) { ++ Text split = new Text(String.format("%09x", i * 100000)); ++ splits.add(split); ++ } ++ return splits; ++ } ++ ++ private static abstract class DelayedTableOp implements Runnable { ++ private CountDownLatch cdl; ++ ++ DelayedTableOp(CountDownLatch cdl) { ++ this.cdl = cdl; ++ } ++ ++ public void run() { ++ try { ++ cdl.countDown(); ++ cdl.await(); ++ Thread.sleep(10); ++ doTableOp(); ++ } catch (TableNotFoundException e) { ++ // expected ++ } catch (RuntimeException e) { ++ throw e; ++ } catch (Exception e) { ++ throw new RuntimeException(e); ++ } ++ } ++ ++ protected abstract void doTableOp() throws Exception; ++ } ++ ++ @Test ++ public void testConcurrentFateOpsWithDelete() throws Exception { ++ final Connector c = getConnector(); ++ String[] tables = getUniqueNames(2); ++ ++ TreeSet splits = createSplits(); ++ ++ int numOperations = 8; ++ ++ ExecutorService es = Executors.newFixedThreadPool(numOperations); ++ ++ int count = 0; ++ for (final String table : tables) { ++ c.tableOperations().create(table); ++ c.tableOperations().addSplits(table, splits); ++ writeData(c, table); ++ if (count == 1) { ++ c.tableOperations().flush(table, null, null, true); ++ } ++ count++; ++ ++ // increment this for each test ++ final CountDownLatch cdl = new CountDownLatch(numOperations); ++ ++ List> futures = new ArrayList<>(); ++ ++ futures.add(es.submit(new Runnable() { ++ @Override ++ public void run() { ++ try { ++ cdl.countDown(); ++ cdl.await(); ++ c.tableOperations().delete(table); ++ } catch (TableNotFoundException | TableOfflineException e) { ++ // expected ++ } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) { ++ throw new RuntimeException(e); ++ } ++ } ++ })); ++ ++ futures.add(es.submit(new DelayedTableOp(cdl) { ++ @Override ++ protected void doTableOp() throws Exception { ++ c.tableOperations().compact(table, new CompactionConfig()); ++ } ++ })); ++ ++ futures.add(es.submit(new DelayedTableOp(cdl) { ++ @Override ++ protected void doTableOp() throws Exception { ++ c.tableOperations().merge(table, null, null); ++ } ++ })); ++ ++ futures.add(es.submit(new DelayedTableOp(cdl) { ++ @Override ++ protected void doTableOp() throws Exception { ++ Map m = Collections.emptyMap(); ++ Set s = Collections.emptySet(); ++ c.tableOperations().clone(table, table + "_clone", true, m, s); ++ } ++ })); ++ ++ futures.add(es.submit(new DelayedTableOp(cdl) { ++ @Override ++ protected void doTableOp() throws Exception { ++ c.tableOperations().deleteRows(table, null, null); ++ } ++ })); ++ ++ futures.add(es.submit(new DelayedTableOp(cdl) { ++ @Override ++ protected void doTableOp() throws Exception { ++ c.tableOperations().cancelCompaction(table); ++ } ++ })); ++ ++ futures.add(es.submit(new DelayedTableOp(cdl) { ++ @Override ++ protected void doTableOp() throws Exception { ++ c.tableOperations().rename(table, table + "_renamed"); ++ } ++ })); ++ ++ futures.add(es.submit(new DelayedTableOp(cdl) { ++ @Override ++ protected void doTableOp() throws Exception { ++ c.tableOperations().offline(table); ++ } ++ })); ++ ++ Assert.assertEquals(numOperations, futures.size()); ++ ++ for (Future future : futures) { ++ future.get(); ++ } ++ ++ try { ++ c.createScanner(table, Authorizations.EMPTY); ++ Assert.fail("Expected table " + table + " to be gone."); ++ } catch (TableNotFoundException tnfe) { ++ // expected ++ } ++ ++ FateStatus fateStatus = getFateStatus(); ++ ++ // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks. ++ Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size()); ++ Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size()); ++ } ++ ++ es.shutdown(); ++ } ++ + private FateStatus getFateStatus() throws KeeperException, InterruptedException { + Instance instance = getConnector().getInstance(); + AdminUtil admin = new AdminUtil<>(false); + String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET); + IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret); + ZooStore zs = new ZooStore(ZooUtil.getRoot(instance) + Constants.ZFATE, zk); + FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null); + return fateStatus; + } + + private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException { + BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig()); + try { + Random rand = new Random(); + for (int i = 0; i < 1000; i++) { + Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000))); + m.put("m", "order", "" + i); + bw.addMutation(m); + } + } finally { + bw.close(); + } + } +}