cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject cassandra git commit: cancel any rejected tasks on shutdown so callers who care can progress
Date Tue, 19 May 2015 14:12:54 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 d693ca12c -> 7af9c6ac8


cancel any rejected tasks on shutdown so callers who care can progress

patch by tjake; reviewed by Sergio Bossa for CASSANDRA-9417


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7af9c6ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7af9c6ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7af9c6ac

Branch: refs/heads/cassandra-2.1
Commit: 7af9c6ac88278d34202fea922606905b326f7811
Parents: d693ca1
Author: T Jake Luciani <jake@apache.org>
Authored: Mon May 18 15:18:24 2015 -0400
Committer: T Jake Luciani <jake@apache.org>
Committed: Tue May 19 10:12:07 2015 -0400

----------------------------------------------------------------------
 .../DebuggableScheduledThreadPoolExecutor.java  |   9 ++
 ...buggableScheduledThreadPoolExecutorTest.java | 115 +++++++++++++++++++
 2 files changed, 124 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7af9c6ac/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
index 7226db8..ea0715c 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
@@ -31,6 +31,11 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
  *
  * DebuggableScheduledThreadPoolExecutor also catches exceptions during Task execution
  * so that they don't supress subsequent invocations of the task.
+ *
+ * Finally, there is a special rejected execution handler for tasks rejected during the shutdown hook.
+ *
+ * For fire and forget tasks (like ref tidy) we can safely ignore the exceptions.
+ * For any callers that care to know their task was rejected we cancel passed task.
  */
 public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
 {
@@ -45,6 +50,10 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
                 if (!StorageService.instance.isInShutdownHook())
                     throw new RejectedExecutionException("ScheduledThreadPoolExecutor has shut down.");
 
+                //Give some notification to the caller the task isn't going to run
+                if (task instanceof Future)
+                    ((Future) task).cancel(false);
+
                 logger.debug("ScheduledThreadPoolExecutor has shut down as part of C* shutdown");
             }
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7af9c6ac/test/unit/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutorTest.java
new file mode 100644
index 0000000..46b2764
--- /dev/null
+++ b/test/unit/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.io.IOException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.service.StorageService;
+
+public class DebuggableScheduledThreadPoolExecutorTest
+{
+
+    static EmbeddedCassandraService service;
+
+    @BeforeClass
+    public static void startup() throws IOException
+    {
+        //The DSTPE checks for if we are in the service shutdown hook so
+        //to test it we need to start C* internally.
+        service = new EmbeddedCassandraService();
+        service.start();
+    }
+
+    @Test
+    public void testShutdown() throws ExecutionException, InterruptedException, IOException
+    {
+        DebuggableScheduledThreadPoolExecutor testPool = new DebuggableScheduledThreadPoolExecutor("testpool");
+
+        final AtomicInteger value = new AtomicInteger(0);
+
+        //Normal scheduled task
+        ScheduledFuture future = testPool.schedule(new Runnable()
+        {
+            public void run()
+            {
+                value.incrementAndGet();
+            }
+        }, 1, TimeUnit.SECONDS);
+
+        future.get();
+        assert value.get() == 1;
+
+
+        //Shut down before schedule
+        future = testPool.schedule(new Runnable()
+        {
+            public void run()
+            {
+                value.incrementAndGet();
+            }
+        }, 10, TimeUnit.SECONDS);
+
+
+        StorageService.instance.drain();
+        testPool.shutdown();
+
+        future.get();
+        assert value.get() == 2;
+
+
+        //Now shut down verify task isn't just swallowed
+        future = testPool.schedule(new Runnable()
+        {
+            public void run()
+            {
+                value.incrementAndGet();
+            }
+        }, 1, TimeUnit.SECONDS);
+
+
+        try
+        {
+            future.get(2, TimeUnit.SECONDS);
+            Assert.fail("Task should be cancelled");
+        }
+        catch (CancellationException e)
+        {
+
+        }
+        catch (TimeoutException e)
+        {
+            Assert.fail("Task should be cancelled");
+        }
+
+        assert future.isCancelled();
+        assert value.get() == 2;
+    }
+}


Mime
View raw message