accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/3] accumulo git commit: ACCUMULO-4575 Fixed concurrent delete table bug
Date Wed, 25 Jan 2017 17:54:36 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master b31ce443f -> 2f5203c24


ACCUMULO-4575 Fixed concurrent delete table bug


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/df400c59
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/df400c59
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/df400c59

Branch: refs/heads/master
Commit: df400c59efd2274d8714cad4c9d3648bb0845c50
Parents: 4ea66d4
Author: Keith Turner <kturner@apache.org>
Authored: Wed Jan 25 12:19:01 2017 -0500
Committer: Keith Turner <kturner@apache.org>
Committed: Wed Jan 25 12:19:01 2017 -0500

----------------------------------------------------------------------
 .../org/apache/accumulo/fate/AdminUtil.java     | 104 +++++++++++--
 .../accumulo/cluster/AccumuloCluster.java       |   6 +
 .../standalone/StandaloneAccumuloCluster.java   |  12 ++
 .../impl/MiniAccumuloClusterImpl.java           |   8 +
 .../accumulo/master/FateServiceHandler.java     |   2 +-
 .../accumulo/master/tableOps/DeleteTable.java   |  25 +++-
 .../apache/accumulo/master/tableOps/Utils.java  |  13 ++
 .../test/functional/BackupMasterIT.java         |   7 +-
 .../functional/ConcurrentDeleteTableIT.java     | 147 +++++++++++++++++++
 9 files changed, 303 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
index f6aa811..b8baa67 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;
+import org.apache.accumulo.fate.zookeeper.IZooReader;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -62,11 +63,77 @@ public class AdminUtil<T> {
     this.exitOnError = exitOnError;
   }
 
-  public void print(ReadOnlyTStore<T> zs, IZooReaderWriter zk, String lockPath) throws
KeeperException, InterruptedException {
-    print(zs, zk, lockPath, new Formatter(System.out), null, null);
+  public static class TransactionStatus {
+
+    private final long txid;
+    private final TStatus status;
+    private final String debug;
+    private final List<String> hlocks;
+    private final List<String> wlocks;
+    private final String top;
+
+    private TransactionStatus(Long tid, TStatus status, String debug, List<String>
hlocks, List<String> wlocks, String top) {
+      this.txid = tid;
+      this.status = status;
+      this.debug = debug;
+      this.hlocks = Collections.unmodifiableList(hlocks);
+      this.wlocks = Collections.unmodifiableList(wlocks);
+      this.top = top;
+    }
+
+    public long getTxid() {
+      return txid;
+    }
+
+    public TStatus getStatus() {
+      return status;
+    }
+
+    public String getDebug() {
+      return debug;
+    }
+
+    public List<String> getHeldLocks() {
+      return hlocks;
+    }
+
+    public List<String> getWaitingLocks() {
+      return wlocks;
+    }
+
+    public String getTop() {
+      return top;
+    }
+
   }
 
-  public void print(ReadOnlyTStore<T> zs, IZooReaderWriter zk, String lockPath, Formatter
fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+  public static class FateStatus {
+
+    private final List<TransactionStatus> transactions;
+    private final Map<Long,List<String>> danglingHeldLocks;
+    private final Map<Long,List<String>> danglingWaitingLocks;
+
+    private FateStatus(List<TransactionStatus> transactions, Map<Long,List<String>>
danglingHeldLocks, Map<Long,List<String>> danglingWaitingLocks) {
+      this.transactions = Collections.unmodifiableList(transactions);
+      this.danglingHeldLocks = Collections.unmodifiableMap(danglingHeldLocks);
+      this.danglingWaitingLocks = Collections.unmodifiableMap(danglingWaitingLocks);
+
+    }
+
+    public List<TransactionStatus> getTransactions() {
+      return transactions;
+    }
+
+    public Map<Long,List<String>> getDanglingHeldLocks() {
+      return danglingHeldLocks;
+    }
+
+    public Map<Long,List<String>> getDanglingWaitingLocks() {
+      return danglingWaitingLocks;
+    }
+  }
+
+  public FateStatus getStatus(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath,
Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
       throws KeeperException, InterruptedException {
     Map<Long,List<String>> heldLocks = new HashMap<>();
     Map<Long,List<String>> waitingLocks = new HashMap<>();
@@ -118,13 +185,12 @@ public class AdminUtil<T> {
 
       } catch (Exception e) {
         log.error("Failed to read locks for " + id + " continuing.", e);
-        fmt.format("Failed to read locks for %s continuing", id);
       }
     }
 
     List<Long> transactions = zs.list();
+    List<TransactionStatus> statuses = new ArrayList<>(transactions.size());
 
-    long txCount = 0;
     for (Long tid : transactions) {
 
       zs.reserve(tid);
@@ -152,17 +218,33 @@ public class AdminUtil<T> {
       if ((filterTxid != null && !filterTxid.contains(tid)) || (filterStatus != null
&& !filterStatus.contains(status)))
         continue;
 
-      ++txCount;
-      fmt.format("txid: %016x  status: %-18s  op: %-15s  locked: %-15s locking: %-15s top:
%s%n", tid, status, debug, hlocks, wlocks, top);
+      statuses.add(new TransactionStatus(tid, status, debug, hlocks, wlocks, top));
+    }
+
+    return new FateStatus(statuses, heldLocks, waitingLocks);
+  }
+
+  public void print(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath) throws KeeperException,
InterruptedException {
+    print(zs, zk, lockPath, new Formatter(System.out), null, null);
+  }
+
+  public void print(ReadOnlyTStore<T> zs, IZooReader zk, String lockPath, Formatter
fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+      throws KeeperException, InterruptedException {
+
+    FateStatus fateStatus = getStatus(zs, zk, lockPath, filterTxid, filterStatus);
+
+    for (TransactionStatus txStatus : fateStatus.getTransactions()) {
+      fmt.format("txid: %016x  status: %-18s  op: %-15s  locked: %-15s locking: %-15s top:
%s%n", txStatus.getTxid(), txStatus.getStatus(),
+          txStatus.getDebug(), txStatus.getHeldLocks(), txStatus.getWaitingLocks(), txStatus.getTop());
     }
-    fmt.format(" %s transactions", txCount);
+    fmt.format(" %s transactions", fateStatus.getTransactions().size());
 
-    if (heldLocks.size() != 0 || waitingLocks.size() != 0) {
+    if (fateStatus.getDanglingHeldLocks().size() != 0 || fateStatus.getDanglingWaitingLocks().size()
!= 0) {
       fmt.format("%nThe following locks did not have an associated FATE operation%n");
-      for (Entry<Long,List<String>> entry : heldLocks.entrySet())
+      for (Entry<Long,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet())
         fmt.format("txid: %016x  locked: %s%n", entry.getKey(), entry.getValue());
 
-      for (Entry<Long,List<String>> entry : waitingLocks.entrySet())
+      for (Entry<Long,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet())
         fmt.format("txid: %016x  locking: %s%n", entry.getKey(), entry.getValue());
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
index 767633b..8e80358 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
@@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -57,6 +58,11 @@ public interface AccumuloCluster {
   ClientConfiguration getClientConfig();
 
   /**
+   * Get server side config derived from accumulo-site.xml
+   */
+  AccumuloConfiguration getSiteConfiguration();
+
+  /**
    * Get an object that can manage a cluster
    *
    * @return Manage the state of the cluster

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
index 1baa3a1..ad84f2f 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
@@ -32,6 +32,8 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.minicluster.ServerType;
@@ -41,6 +43,8 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+
 /**
  * AccumuloCluster implementation to connect to an existing deployment of Accumulo
  */
@@ -184,4 +188,12 @@ public class StandaloneAccumuloCluster implements AccumuloCluster {
     checkArgument(offset >= 0 && offset < users.size(), "Invalid offset, should
be non-negative and less than " + users.size());
     return users.get(offset);
   }
+
+  @Override
+  public AccumuloConfiguration getSiteConfiguration() {
+    Configuration conf = new Configuration(false);
+    Path accumuloSite = new Path(serverAccumuloConfDir, "accumulo-site.xml");
+    conf.addResource(accumuloSite);
+    return new ConfigurationCopy(Iterables.concat(AccumuloConfiguration.getDefaultConfiguration(),
conf));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index f2e5c7c..79ad527 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -110,6 +111,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
 /**
@@ -820,4 +822,10 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
       return new Path(tmp.toString());
     }
   }
+
+  @Override
+  public AccumuloConfiguration getSiteConfiguration() {
+    // TODO Auto-generated method stub
+    return new ConfigurationCopy(Iterables.concat(AccumuloConfiguration.getDefaultConfiguration(),
config.getSiteConfig().entrySet()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index 09a90b5..5f0ddd2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -254,7 +254,7 @@ class FateServiceHandler implements FateService.Iface {
 
         if (!canDeleteTable)
           throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
-        master.fate.seedTransaction(opid, new TraceRepo<>(new DeleteTable(tableId)),
autoCleanup);
+        master.fate.seedTransaction(opid, new TraceRepo<>(new DeleteTable(namespaceId,
tableId)), autoCleanup);
         break;
       }
       case TABLE_ONLINE: {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index a1158f4..1eae5b9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.master.tableOps;
 
-import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.fate.Repo;
@@ -28,20 +27,32 @@ public class DeleteTable extends MasterRepo {
   private static final long serialVersionUID = 1L;
 
   private String tableId;
+  private String namespaceId;
 
-  public DeleteTable(String tableId) {
+  private String getNamespaceId(Master environment) throws Exception {
+    if (namespaceId == null) {
+      // For ACCUMULO-4575 namespaceId was added in a bug fix release. Since it was added
in bug fix release, we have to ensure we can properly deserialize
+      // older versions. When deserializing an older version, namespaceId will be null. For
this case revert to the old buggy behavior.
+      return Utils.getNamespaceId(environment.getInstance(), tableId, TableOperation.DELETE);
+    }
+
+    return namespaceId;
+  }
+
+  public DeleteTable(String namespaceId, String tableId) {
+    this.namespaceId = namespaceId;
     this.tableId = tableId;
   }
 
   @Override
   public long isReady(long tid, Master environment) throws Exception {
-    String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+    String namespaceId = getNamespaceId(environment);
     return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE)
+ Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
   }
 
   @Override
   public Repo<Master> call(long tid, Master environment) throws Exception {
-    String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
+    String namespaceId = getNamespaceId(environment);
     TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
     environment.getEventCoordinator().event("deleting table %s ", tableId);
     return new CleanUp(tableId, namespaceId);
@@ -49,9 +60,9 @@ public class DeleteTable extends MasterRepo {
 
   @Override
   public void undo(long tid, Master environment) throws Exception {
-    String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
-    Utils.unreserveNamespace(namespaceId, tid, false);
+    if (namespaceId != null) {
+      Utils.unreserveNamespace(namespaceId, tid, false);
+    }
     Utils.unreserveTable(tableId, tid, true);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
index 0fb9138..9b921e2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
@@ -116,6 +116,19 @@ public class Utils {
       return 100;
   }
 
+  public static String getNamespaceId(Instance instance, String tableId, TableOperation op)
throws Exception {
+    try {
+      return Tables.getNamespaceId(instance, tableId);
+    } catch (RuntimeException e) {
+      // see if this was caused because the table does not exists
+      IZooReaderWriter zk = ZooReaderWriter.getInstance();
+      if (!zk.exists(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId))
+        throw new ThriftTableOperationException(tableId, "", op, TableOperationExceptionType.NOTFOUND,
"Table does not exist");
+      else
+        throw e;
+    }
+  }
+
   public static long reserveHdfsDirectory(String directory, long tid) throws KeeperException,
InterruptedException {
     Instance instance = HdfsZooInstance.getInstance();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
index efed7a4..8621ab1 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BackupMasterIT.java
@@ -19,10 +19,12 @@ package org.apache.accumulo.test.functional;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 import org.junit.Test;
 
 public class BackupMasterIT extends ConfigurableMacIT {
@@ -39,7 +41,8 @@ public class BackupMasterIT extends ConfigurableMacIT {
     // create a backup
     Process backup = exec(Master.class);
     try {
-      ZooReaderWriter writer = new ZooReaderWriter(cluster.getZooKeepers(), 30 * 1000, "digest",
"accumulo:DONTTELL".getBytes());
+      String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
+      IZooReaderWriter writer = new ZooReaderWriterFactory().getZooReaderWriter(cluster.getZooKeepers(),
30 * 1000, secret);
       String root = "/accumulo/" + getConnector().getInstance().getInstanceID();
       List<String> children = Collections.emptyList();
       // wait for 2 lock entries

http://git-wip-us.apache.org/repos/asf/accumulo/blob/df400c59/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
new file mode 100644
index 0000000..4798095
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.AdminUtil.FateStatus;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
+
+  @Test
+  public void testConcurrentDeleteTablesOps() throws Exception {
+    final Connector c = getConnector();
+    String[] tables = getUniqueNames(2);
+
+    TreeSet<Text> splits = new TreeSet<>();
+
+    for (int i = 0; i < 1000; i++) {
+      Text split = new Text(String.format("%09x", i * 100000));
+      splits.add(split);
+    }
+
+    ExecutorService es = Executors.newFixedThreadPool(20);
+
+    int count = 0;
+    for (final String table : tables) {
+      c.tableOperations().create(table);
+      c.tableOperations().addSplits(table, splits);
+      writeData(c, table);
+      if (count == 1) {
+        c.tableOperations().flush(table, null, null, true);
+      }
+      count++;
+
+      final CountDownLatch cdl = new CountDownLatch(20);
+
+      List<Future<?>> futures = new ArrayList<>();
+
+      for (int i = 0; i < 20; i++) {
+        Future<?> future = es.submit(new Runnable() {
+
+          @Override
+          public void run() {
+            try {
+              cdl.countDown();
+              cdl.await();
+              c.tableOperations().delete(table);
+            } catch (TableNotFoundException e) {
+              // expected
+            } catch (InterruptedException | AccumuloException | AccumuloSecurityException
e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+
+        futures.add(future);
+      }
+
+      for (Future<?> future : futures) {
+        future.get();
+      }
+
+      try {
+        c.createScanner(table, Authorizations.EMPTY);
+        Assert.fail("Expected table " + table + " to be gone.");
+      } catch (TableNotFoundException tnfe) {
+        // expected
+      }
+
+      FateStatus fateStatus = getFateStatus();
+
+      // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent
delete tables could fail and leave dangling locks.
+      Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
+      Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+    }
+
+    es.shutdown();
+  }
+
+  private FateStatus getFateStatus() throws KeeperException, InterruptedException {
+    Instance instance = getConnector().getInstance();
+    AdminUtil<String> admin = new AdminUtil<>(false);
+    String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
+    IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(),
instance.getZooKeepersSessionTimeOut(), secret);
+    ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE,
zk);
+    FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS,
null, null);
+    return fateStatus;
+  }
+
+  private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException
{
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+    try {
+      Random rand = new Random();
+      for (int i = 0; i < 1000; i++) {
+        Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000)));
+        m.put("m", "order", "" + i);
+        bw.addMutation(m);
+      }
+    } finally {
+      bw.close();
+    }
+  }
+}


Mime
View raw message