cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1063751 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/service/
Date Wed, 26 Jan 2011 15:00:23 GMT
Author: jbellis
Date: Wed Jan 26 15:00:23 2011
New Revision: 1063751

URL: http://svn.apache.org/viewvc?rev=1063751&view=rev
Log:
add JVM shutdownhook to sync commitlog
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1919

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan 26 15:00:23 2011
@@ -1,6 +1,7 @@
 0.7.2-dev
- * fix potential overflow in nodetool cfstats
- * offline nodes (CASSANDRA-1951)
+ * fix potential overflow in nodetool cfstats (CASSANDRA-2057)
+ * add JVM shutdownhook to sync commitlog (CASSANDRA-1919)
+ * allow nodes to be up without being part of  normal traffic (CASSANDRA-1951)
 
 
 0.7.1

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java Wed Jan 26 15:00:23 2011
@@ -61,19 +61,12 @@ public abstract class AbstractCommitLogE
         return completedTaskCount;
     }
 
-    // cassandra is crash-only so there's no need to implement the shutdown methods
-
-    public boolean isShutdown()
-    {
-        return false;
-    }
-
     public boolean isTerminated()
     {
-        return false;
+        throw new UnsupportedOperationException();
     }
 
-    public void shutdown()
+    public boolean isShutdown()
     {
         throw new UnsupportedOperationException();
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java Wed Jan 26 15:00:23 2011
@@ -31,6 +31,8 @@ import org.apache.cassandra.utils.Wrappe
 class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService implements ICommitLogExecutorService, BatchCommitLogExecutorServiceMBean
 {
     private final BlockingQueue<CheaterFutureTask> queue;
+    private final Thread appendingThread;
+    private volatile boolean run = true;
 
     public BatchCommitLogExecutorService()
     {
@@ -44,14 +46,15 @@ class BatchCommitLogExecutorService exte
         {
             public void runMayThrow() throws Exception
             {
-                while (true)
+                while (run)
                 {
-                    processWithSyncBatch();
-                    completedTaskCount++;
+                    if (processWithSyncBatch())
+                        completedTaskCount++;
                 }
             }
         };
-        new Thread(runnable, "COMMIT-LOG-WRITER").start();
+        appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
+        appendingThread.start();
 
         registerMBean(this);
     }
@@ -63,13 +66,15 @@ class BatchCommitLogExecutorService exte
 
     private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
     private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
-    private void processWithSyncBatch() throws Exception
+    private boolean processWithSyncBatch() throws Exception
     {
-        CheaterFutureTask firstTask = queue.take();
+        CheaterFutureTask firstTask = queue.poll(100, TimeUnit.MILLISECONDS);
+        if (firstTask == null)
+            return false;
         if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
         {
             firstTask.run();
-            return;
+            return true;
         }
 
         // attempt to do a bunch of LogRecordAdder ops before syncing
@@ -105,6 +110,7 @@ class BatchCommitLogExecutorService exte
         {
             incompleteTasks.get(i).set(taskValues.get(i));
         }
+        return true;
     }
 
 
@@ -148,6 +154,25 @@ class BatchCommitLogExecutorService exte
         }
     }
 
+    public void shutdown()
+    {
+        new Thread(new WrappedRunnable()
+        {
+            public void runMayThrow() throws InterruptedException, IOException
+            {
+                while (!queue.isEmpty())
+                    Thread.sleep(100);
+                run = false;
+                appendingThread.join();
+            }
+        }, "Commitlog Shutdown").start();
+    }
+
+    public void awaitTermination() throws InterruptedException
+    {
+        appendingThread.join();
+    }
+
     private static class CheaterFutureTask<V> extends FutureTask<V>
     {
         private final Callable rawCallable;

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Jan 26 15:00:23 2011
@@ -114,45 +114,9 @@ public class CommitLog
         // All we need to do is create a new one.
         segments.add(new CommitLogSegment());
 
-        if (DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch)
-        {
-            executor = new BatchCommitLogExecutorService();
-        }
-        else
-        {
-            executor = new PeriodicCommitLogExecutorService();
-            final Callable syncer = new Callable()
-            {
-                public Object call() throws Exception
-                {
-                    sync();
-                    return null;
-                }
-            };
-
-            new Thread(new Runnable()
-            {
-                public void run()
-                {
-                    while (true)
-                    {
-                        try
-                        {
-                            executor.submit(syncer).get();
-                            Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError(e);
-                        }
-                        catch (ExecutionException e)
-                        {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
-            }, "PERIODIC-COMMIT-LOG-SYNCER").start();
-        }
+        executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
+                 ? new BatchCommitLogExecutorService()
+                 : new PeriodicCommitLogExecutorService(this);
     }
 
     public void resetUnsafe()
@@ -527,4 +491,10 @@ public class CommitLog
             return null;
         }
     }
+
+    public void shutdownBlocking() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/ICommitLogExecutorService.java Wed Jan 26 15:00:23 2011
@@ -38,4 +38,9 @@ public interface ICommitLogExecutorServi
      */
     public void add(CommitLog.LogRecordAdder adder);
 
+    /** shuts down the CommitLogExecutor in an orderly fashion */
+    public void shutdown();
+
+    /** Blocks until shutdown is complete. */
+    public void awaitTermination() throws InterruptedException;
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java Wed Jan 26 15:00:23 2011
@@ -21,35 +21,71 @@ package org.apache.cassandra.db.commitlo
  */
 
 
+import java.io.IOException;
 import java.util.concurrent.*;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 class PeriodicCommitLogExecutorService implements ICommitLogExecutorService, PeriodicCommitLogExecutorServiceMBean
 {
     private final BlockingQueue<Runnable> queue;
     protected volatile long completedTaskCount = 0;
+    private final Thread appendingThread;
+    private volatile boolean run = true;
 
-    public PeriodicCommitLogExecutorService()
+    public PeriodicCommitLogExecutorService(final CommitLog commitLog)
     {
-        this(1024 * Runtime.getRuntime().availableProcessors());
-    }
-
-    public PeriodicCommitLogExecutorService(int queueSize)
-    {
-        queue = new LinkedBlockingQueue<Runnable>(queueSize);
+        queue = new LinkedBlockingQueue<Runnable>(1024 * Runtime.getRuntime().availableProcessors());
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception
             {
-                while (true)
+                while (run)
                 {
-                    queue.take().run();
+                    Runnable r = queue.poll(100, TimeUnit.MILLISECONDS);
+                    if (r == null)
+                        continue;
+                    r.run();
                     completedTaskCount++;
                 }
+                commitLog.sync();
+            }
+        };
+        appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
+        appendingThread.start();
+
+        final Callable syncer = new Callable()
+        {
+            public Object call() throws Exception
+            {
+                commitLog.sync();
+                return null;
             }
         };
-        new Thread(runnable, "COMMIT-LOG-WRITER").start();
+
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                while (run)
+                {
+                    try
+                    {
+                        submit(syncer).get();
+                        Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new AssertionError(e);
+                    }
+                    catch (ExecutionException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }, "PERIODIC-COMMIT-LOG-SYNCER").start();
 
         AbstractCommitLogExecutorService.registerMBean(this);
     }
@@ -80,6 +116,25 @@ class PeriodicCommitLogExecutorService i
         return ft;
     }
 
+    public void shutdown()
+    {
+        new Thread(new WrappedRunnable()
+        {
+            public void runMayThrow() throws InterruptedException, IOException
+            {
+                while (!queue.isEmpty())
+                    Thread.sleep(100);
+                run = false;
+                appendingThread.join();
+            }
+        }, "Commitlog Shutdown").start();
+    }
+
+    public void awaitTermination() throws InterruptedException
+    {
+        appendingThread.join();
+    }
+
     public long getPendingTasks()
     {
         return queue.size();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1063751&r1=1063750&r2=1063751&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Wed Jan 26 15:00:23 2011
@@ -33,6 +33,8 @@ import javax.management.ObjectName;
 import com.google.common.base.Charsets;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.locator.*;
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
@@ -378,6 +380,22 @@ public class StorageService implements I
             }
         }
 
+        // daemon threads, like our executors', continue to run while shutdown hooks are invoked
+        Thread drainOnShutdown = new Thread(new WrappedRunnable()
+        {
+            public void runMayThrow() throws ExecutionException, InterruptedException, IOException
+            {
+                ThreadPoolExecutor mutationStage = StageManager.getStage(Stage.MUTATION);
+                if (!mutationStage.isShutdown())
+                {
+                    mutationStage.shutdown();
+                    mutationStage.awaitTermination(1, TimeUnit.SECONDS);
+                    CommitLog.instance.shutdownBlocking();
+                }
+            }
+        });
+        Runtime.getRuntime().addShutdownHook(drainOnShutdown);
+
         if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
         {
             joinTokenRing();
@@ -1899,6 +1917,8 @@ public class StorageService implements I
         ColumnFamilyStore.postFlushExecutor.shutdown();
         ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
 
+        CommitLog.instance.shutdownBlocking();
+
         // want to make sure that any segments deleted as a result of flushing are gone.
         DeletionService.waitFor();
 



Mime
View raw message