distributedlog-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [12/20] incubator-distributedlog git commit: DL-106: Use namespace after it is closed will throw AlreadyClosedException
Date Wed, 28 Dec 2016 01:05:20 GMT
DL-106: Use namespace after it is closed will throw AlreadyClosedException


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/ab0868cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/ab0868cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/ab0868cc

Branch: refs/heads/master
Commit: ab0868cceae28730e103c0cf587497719bbc2758
Parents: a9cbb2c
Author: Yiming Zang <yzang@twitter.com>
Authored: Fri Aug 12 10:30:39 2016 -0700
Committer: Sijie Guo <sijieg@twitter.com>
Committed: Tue Dec 27 16:49:28 2016 -0800

----------------------------------------------------------------------
 .../BKDistributedLogNamespace.java              | 43 ++++++++----
 .../TestBKDistributedLogNamespace.java          | 72 ++++++++++++++++++++
 2 files changed, 101 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ab0868cc/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 281c637..cae6f6a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -32,6 +32,7 @@ import com.twitter.distributedlog.bk.LedgerAllocator;
 import com.twitter.distributedlog.bk.LedgerAllocatorUtils;
 import com.twitter.distributedlog.callback.NamespaceListener;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LogNotFoundException;
@@ -86,6 +87,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.twitter.distributedlog.impl.BKDLUtils.*;
 
@@ -317,7 +319,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
     private final StatsLogger perLogStatsLogger;
     private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
 
-    protected boolean closed = false;
+    protected AtomicBoolean closed = new AtomicBoolean(false);
 
     private final PermitLimiter writeLimiter;
 
@@ -494,6 +496,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
     @Override
     public void createLog(String logName)
             throws InvalidStreamNameException, IOException {
+        checkState();
         validateName(logName);
         URI uri = FutureUtils.result(metadataStore.createLog(logName));
         createUnpartitionedStreams(conf, uri, Lists.newArrayList(logName));
@@ -502,6 +505,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
     @Override
     public void deleteLog(String logName)
             throws InvalidStreamNameException, LogNotFoundException, IOException {
+        checkState();
         validateName(logName);
         Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
         if (!uri.isPresent()) {
@@ -532,6 +536,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
                                          Optional<DynamicDistributedLogConfiguration>
dynamicLogConf,
                                          Optional<StatsLogger> perStreamStatsLogger)
             throws InvalidStreamNameException, IOException {
+        checkState();
         validateName(logName);
         Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
         if (!uri.isPresent()) {
@@ -549,12 +554,14 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
     @Override
     public boolean logExists(String logName)
         throws IOException, IllegalArgumentException {
+        checkState();
         Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
         return uri.isPresent() && checkIfLogExists(conf, uri.get(), logName);
     }
 
     @Override
     public Iterator<String> getLogs() throws IOException {
+        checkState();
         return FutureUtils.result(metadataStore.getLogs());
     }
 
@@ -565,6 +572,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
 
     @Override
     public synchronized AccessControlManager createAccessControlManager() throws IOException
{
+        checkState();
         if (null == accessControlManager) {
             String aclRootPath = bkdlConfig.getACLRootPath();
             // Build the access control manager
@@ -614,9 +622,9 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
     }
 
     private static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName,
-                                                                DistributedLogConfiguration
conf,
-                                                                String zkServers,
-                                                                StatsLogger statsLogger)
{
+                                                                  DistributedLogConfiguration
conf,
+                                                                  String zkServers,
+                                                                  StatsLogger statsLogger)
{
         RetryPolicy retryPolicy = null;
         if (conf.getZKNumRetries() > 0) {
             retryPolicy = new BoundExponentialBackoffRetryPolicy(
@@ -633,7 +641,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
             .statsLogger(statsLogger)
             .zkAclId(conf.getZkAclId());
         LOG.info("Created shared zooKeeper client builder {}: zkServers = {}, numRetries
= {}, sessionTimeout = {}, retryBackoff = {},"
-                 + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers,
conf.getZKNumRetries(),
+                + " maxRetryBackoff = {}, zkAclId = {}.", new Object[] { zkcName, zkServers,
conf.getZKNumRetries(),
                 conf.getZKSessionTimeoutMilliseconds(), conf.getZKRetryBackoffStartMillis(),
                 conf.getZKRetryBackoffMaxMillis(), conf.getZkAclId() });
         return builder;
@@ -678,7 +686,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
                 .featureProvider(featureProviderOptional)
                 .statsLogger(statsLogger);
         LOG.info("Created shared client builder {} : zkServers = {}, ledgersPath = {}, numIOThreads
= {}",
-                 new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads()
});
+                new Object[] { bkcName, zkServers, ledgersPath, conf.getBKClientNumberIOThreads()
});
         return builder;
     }
 
@@ -711,6 +719,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
      * @throws IOException
      */
     private <T> T withZooKeeperClient(ZooKeeperClientHandler<T> handler) throws
IOException {
+        checkState();
         return handler.handle(sharedWriterZKCForDL);
     }
 
@@ -815,6 +824,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
             Optional<StatsLogger> perStreamStatsLogger)
         throws InvalidStreamNameException, IOException {
         // Make sure the name is well formed
+        checkState();
         validateName(nameOfLogStream);
 
         DistributedLogConfiguration mergedConfiguration = new DistributedLogConfiguration();
@@ -918,6 +928,7 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
         if (bkdlConfig.isFederatedNamespace()) {
             throw new UnsupportedOperationException("Use DistributedLogNamespace methods
for federated namespace");
         }
+        checkState();
         validateName(nameOfMetadataNode);
         return new ZKMetadataAccessor(nameOfMetadataNode, conf, namespace,
                 sharedWriterZKCBuilderForDL, sharedReaderZKCBuilderForDL, statsLogger);
@@ -1035,6 +1046,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
         }, conf, uri);
     }
 
+    private void checkState() throws IOException {
+        if (closed.get()) {
+            LOG.error("BKDistributedLogNamespace {} is already closed", namespace);
+            throw new AlreadyClosedException("Namespace " + namespace + " is already closed");
+        }
+    }
+
     /**
      * Close the distributed log manager factory, freeing any resources it may hold.
      */
@@ -1043,16 +1061,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
         ZooKeeperClient writerZKC;
         ZooKeeperClient readerZKC;
         AccessControlManager acm;
-        synchronized (this) {
-            if (closed) {
-                return;
-            }
-            closed = true;
+        if (closed.compareAndSet(false, true)) {
             writerZKC = sharedWriterZKCForBK;
             readerZKC = sharedReaderZKCForBK;
             acm = accessControlManager;
+        } else {
+            return;
         }
-
         if (null != acm) {
             acm.close();
             LOG.info("Access Control Manager Stopped.");
@@ -1070,11 +1085,11 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace
{
 
         // Shutdown the schedulers
         SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(),
-                TimeUnit.MILLISECONDS);
+            TimeUnit.MILLISECONDS);
         LOG.info("Executor Service Stopped.");
         if (scheduler != readAheadExecutor) {
             SchedulerUtils.shutdownScheduler(readAheadExecutor, conf.getSchedulerShutdownTimeoutMs(),
-                    TimeUnit.MILLISECONDS);
+                TimeUnit.MILLISECONDS);
             LOG.info("ReadAhead Executor Service Stopped.");
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ab0868cc/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
index b3724bc..e68b916 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKDistributedLogNamespace.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Sets;
 import com.twitter.distributedlog.callback.NamespaceListener;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.ZKException;
@@ -41,6 +42,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -419,4 +421,74 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase
{
         testConf.setLedgerAllocatorPoolName(null);
         validateBadAllocatorConfiguration(testConf, uri);
     }
+
+    @Test(timeout = 60000)
+    public void testUseNamespaceAfterCloseShouldFailFast() throws Exception {
+        URI uri = createDLMURI("/" + runtime.getMethodName());
+        BKDistributedLogNamespace namespace = BKDistributedLogNamespace.newBuilder()
+            .conf(conf)
+            .uri(uri)
+            .build();
+        // before closing the namespace, no exception should be thrown
+        String logName = "test-stream";
+        // create a log
+        namespace.createLog(logName);
+        // log exists
+        Assert.assertTrue(namespace.logExists(logName));
+        // create a dlm
+        DistributedLogManager dlm = namespace.openLog(logName);
+        // do some writes
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
+        for (long i = 0; i < 3; i++) {
+            LogRecord record = DLMTestUtil.getLargeLogRecordInstance(i);
+            writer.write(record);
+        }
+        writer.closeAndComplete();
+        // do some reads
+        LogReader reader = dlm.getInputStream(0);
+        for (long i = 0; i < 3; i++) {
+            Assert.assertEquals(reader.readNext(false).getTransactionId(), i);
+        }
+        namespace.deleteLog(logName);
+        Assert.assertFalse(namespace.logExists(logName));
+
+        // now try to close the namespace
+        namespace.close();
+        try {
+            namespace.createLog(logName);
+            fail("Should throw exception after namespace is closed");
+        } catch (AlreadyClosedException e) {
+            // No-ops
+        }
+        try {
+            namespace.openLog(logName);
+            fail("Should throw exception after namespace is closed");
+        } catch (AlreadyClosedException e) {
+            // No-ops
+        }
+        try {
+            namespace.logExists(logName);
+            fail("Should throw exception after namespace is closed");
+        } catch (AlreadyClosedException e) {
+            // No-ops
+        }
+        try {
+            namespace.getLogs();
+            fail("Should throw exception after namespace is closed");
+        } catch (AlreadyClosedException e) {
+            // No-ops
+        }
+        try {
+            namespace.deleteLog(logName);
+            fail("Should throw exception after namespace is closed");
+        } catch (AlreadyClosedException e) {
+            // No-ops
+        }
+        try {
+            namespace.createAccessControlManager();
+            fail("Should throw exception after namespace is closed");
+        } catch (AlreadyClosedException e) {
+            // No-ops
+        }
+    }
 }


Mime
View raw message