Repository: accumulo
Updated Branches:
refs/heads/1.7 7b9a11ad4 -> 6d8a5fa59
ACCUMULO-4578 release namespace lock when compaction canceled
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/db84650e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/db84650e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/db84650e
Branch: refs/heads/1.7
Commit: db84650e7454b8354a28d0dcda8da1235a6ea175
Parents: 7b9a11a
Author: Keith Turner <kturner@apache.org>
Authored: Thu Jan 26 21:42:03 2017 -0500
Committer: Keith Turner <kturner@apache.org>
Committed: Thu Jan 26 21:42:03 2017 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/fate/AdminUtil.java | 33 +++++++++++++-------
.../master/tableOps/CancelCompactions.java | 2 +-
.../master/tableOps/FinishCancelCompaction.java | 12 +++++--
.../apache/accumulo/test/TableOperationsIT.java | 7 +++++
.../accumulo/test/UserCompactionStrategyIT.java | 6 ++++
.../functional/ConcurrentDeleteTableIT.java | 32 ++-----------------
.../test/functional/FateStarvationIT.java | 2 ++
.../test/functional/FunctionalTestUtils.java | 29 +++++++++++++++++
.../accumulo/test/functional/RenameIT.java | 2 ++
9 files changed, 81 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
index b8baa67..6d388ed 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
@@ -110,25 +110,36 @@ public class AdminUtil<T> {
public static class FateStatus {
private final List<TransactionStatus> transactions;
- private final Map<Long,List<String>> danglingHeldLocks;
- private final Map<Long,List<String>> danglingWaitingLocks;
+ private final Map<String,List<String>> danglingHeldLocks;
+ private final Map<String,List<String>> danglingWaitingLocks;
+
+ private static Map<String,List<String>> convert(Map<Long,List<String>> danglocks) {
+ if (danglocks.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Map<String,List<String>> ret = new HashMap<>();
+ for (Entry<Long,List<String>> entry : danglocks.entrySet()) {
+ ret.put(String.format("%016x", entry.getKey()), Collections.unmodifiableList(entry.getValue()));
+ }
+ return Collections.unmodifiableMap(ret);
+ }
private FateStatus(List<TransactionStatus> transactions, Map<Long,List<String>> danglingHeldLocks, Map<Long,List<String>> danglingWaitingLocks) {
this.transactions = Collections.unmodifiableList(transactions);
- this.danglingHeldLocks = Collections.unmodifiableMap(danglingHeldLocks);
- this.danglingWaitingLocks = Collections.unmodifiableMap(danglingWaitingLocks);
-
+ this.danglingHeldLocks = convert(danglingHeldLocks);
+ this.danglingWaitingLocks = convert(danglingWaitingLocks);
}
public List<TransactionStatus> getTransactions() {
return transactions;
}
- public Map<Long,List<String>> getDanglingHeldLocks() {
+ public Map<String,List<String>> getDanglingHeldLocks() {
return danglingHeldLocks;
}
- public Map<Long,List<String>> getDanglingWaitingLocks() {
+ public Map<String,List<String>> getDanglingWaitingLocks() {
return danglingWaitingLocks;
}
}
@@ -241,11 +252,11 @@ public class AdminUtil<T> {
if (fateStatus.getDanglingHeldLocks().size() != 0 || fateStatus.getDanglingWaitingLocks().size() != 0) {
fmt.format("%nThe following locks did not have an associated FATE operation%n");
- for (Entry<Long,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet())
- fmt.format("txid: %016x locked: %s%n", entry.getKey(), entry.getValue());
+ for (Entry<String,List<String>> entry : fateStatus.getDanglingHeldLocks().entrySet())
+ fmt.format("txid: %s locked: %s%n", entry.getKey(), entry.getValue());
- for (Entry<Long,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet())
- fmt.format("txid: %016x locking: %s%n", entry.getKey(), entry.getValue());
+ for (Entry<String,List<String>> entry : fateStatus.getDanglingWaitingLocks().entrySet())
+ fmt.format("txid: %s locking: %s%n", entry.getKey(), entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
index c98174e..42d2699 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CancelCompactions.java
@@ -73,7 +73,7 @@ public class CancelCompactions extends MasterRepo {
}
});
- return new FinishCancelCompaction(tableId);
+ return new FinishCancelCompaction(getNamespaceId(environment), tableId);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
index 45fc8df..2bb34d2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/FinishCancelCompaction.java
@@ -16,20 +16,28 @@
*/
package org.apache.accumulo.master.tableOps;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
class FinishCancelCompaction extends MasterRepo {
private static final long serialVersionUID = 1L;
private String tableId;
+ private String namespaceId;
- public FinishCancelCompaction(String tableId) {
+ private String getNamespaceId(Master env) throws Exception {
+ return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT_CANCEL, this.namespaceId);
+ }
+
+ public FinishCancelCompaction(String namespaceId, String tableId) {
this.tableId = tableId;
+ this.namespaceId = namespaceId;
}
@Override
public Repo<Master> call(long tid, Master environment) throws Exception {
- Utils.getReadLock(tableId, tid).unlock();
+ Utils.unreserveTable(tableId, tid, false);
+ Utils.unreserveNamespace(getNamespaceId(environment), tid, false);
return null;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
index 54cb738..0d91bb0 100644
--- a/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -58,8 +58,10 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.accumulo.test.functional.BadIterator;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -82,6 +84,11 @@ public class TableOperationsIT extends AccumuloClusterIT {
connector = getConnector();
}
+ @After
+ public void checkForDanglingFateLocks() {
+ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
+ }
+
@Test
public void getDiskUsageErrors() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException {
String tableName = getUniqueNames(1)[0];
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
index 844a4d2..2d1bd15 100644
--- a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.harness.AccumuloClusterIT;
import org.apache.accumulo.test.functional.FunctionalTestUtils;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.hadoop.io.Text;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -58,6 +59,11 @@ public class UserCompactionStrategyIT extends AccumuloClusterIT {
return 3 * 60;
}
+ @After
+ public void checkForDanglingFateLocks() {
+ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
+ }
+
@Test
public void testDropA() throws Exception {
Connector c = getConnector();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index 0c63e59..0116f64 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@ -29,29 +29,19 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.admin.CompactionConfig;
-import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.AdminUtil;
-import org.apache.accumulo.fate.AdminUtil.FateStatus;
-import org.apache.accumulo.fate.ZooStore;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.harness.AccumuloClusterIT;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Test;
@@ -112,11 +102,7 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
// expected
}
- FateStatus fateStatus = getFateStatus();
-
- // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
- Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
- Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
}
es.shutdown();
@@ -262,26 +248,12 @@ public class ConcurrentDeleteTableIT extends AccumuloClusterIT {
// expected
}
- FateStatus fateStatus = getFateStatus();
-
- // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
- Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
- Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
+ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
}
es.shutdown();
}
- private FateStatus getFateStatus() throws KeeperException, InterruptedException {
- Instance instance = getConnector().getInstance();
- AdminUtil<String> admin = new AdminUtil<>(false);
- String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
- IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
- ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
- FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
- return fateStatus;
- }
-
private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException {
BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
index 7eb7b89..def1a2c 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/FateStarvationIT.java
@@ -75,6 +75,8 @@ public class FateStarvationIT extends AccumuloClusterIT {
}
c.tableOperations().offline(tableName);
+
+ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index c548f2f..2fea4c6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -32,9 +32,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -42,13 +46,20 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.AdminUtil.FateStatus;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
import org.apache.accumulo.test.TestIngest;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import com.google.common.collect.Iterators;
@@ -189,4 +200,22 @@ public class FunctionalTestUtils {
return result;
}
+ public static void assertNoDanglingFateLocks(Instance instance, AccumuloCluster cluster) {
+ FateStatus fateStatus = getFateStatus(instance, cluster);
+ Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingHeldLocks(), 0, fateStatus.getDanglingHeldLocks().size());
+ Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingWaitingLocks(), 0, fateStatus.getDanglingWaitingLocks().size());
+ }
+
+ private static FateStatus getFateStatus(Instance instance, AccumuloCluster cluster) {
+ try {
+ AdminUtil<String> admin = new AdminUtil<>(false);
+ String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
+ IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
+ ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
+ FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
+ return fateStatus;
+ } catch (KeeperException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db84650e/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java b/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
index 6befd7e..e2ad7ae 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RenameIT.java
@@ -69,6 +69,8 @@ public class RenameIT extends AccumuloClusterIT {
c.tableOperations().rename(name2, name1);
vopts.setTableName(name1);
VerifyIngest.verifyIngest(c, vopts, scanOpts);
+
+ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
}
}
|