cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r765343 - in /incubator/cassandra/trunk/src/org/apache/cassandra: concurrent/DebuggableThreadPoolExecutor.java db/Memtable.java
Date Wed, 15 Apr 2009 20:30:44 GMT
Author: jbellis
Date: Wed Apr 15 20:30:43 2009
New Revision: 765343

URL: http://svn.apache.org/viewvc?rev=765343&view=rev
Log:
Move from one ExecutorService per ColumnFamily to one ExecutorService per Memtable.  This
allows us to wait for the ExecutorService to quiesce completely before flushing, preventing
the possibility of a ConcurrentModificationException when a get scheduled before the switch
executes concurrently with the flush.  It also provides a simpler mental model (only one
thread touches a memtable at a time, period), which is a valuable property.  Finally, it is
slightly more performant since it avoids hashing the CF name for each operation.

Patch by jbellis; reviewed by Todd Lipcon for #9

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=765343&r1=765342&r2=765343&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Wed Apr 15 20:30:43 2009
@@ -32,7 +32,7 @@
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com
)
  */
 
-public final class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
 {
     private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
   
     

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=765343&r1=765342&r2=765343&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Wed Apr 15 20:30:43
2009
@@ -18,15 +18,12 @@
 
 package org.apache.cassandra.db;
 
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.Comparator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -34,7 +31,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -49,6 +45,7 @@
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.service.IPartitioner;
 import org.apache.cassandra.service.StorageService;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com
)
@@ -57,18 +54,19 @@
 public class Memtable implements Comparable<Memtable>
 {
 	private static Logger logger_ = Logger.getLogger( Memtable.class );
-    private static Map<String, ExecutorService> apartments_ = new HashMap<String,
ExecutorService>();
+    private static Set<ExecutorService> runningExecutorServices_ = new NonBlockingHashSet<ExecutorService>();
     public static final String flushKey_ = "FlushKey";
-    
+
     public static void shutdown()
     {
-    	Set<String> names = apartments_.keySet();
-    	for (String name : names)
-    	{
-    		apartments_.get(name).shutdownNow();
-    	}
+        for (ExecutorService exs : runningExecutorServices_)
+        {
+            exs.shutdownNow();
+        }
     }
 
+    private MemtableThreadPoolExecutor executor_;
+
     private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
     private int thresholdCount_ = DatabaseDescriptor.getMemtableObjectCount()*1024*1024;
     private AtomicInteger currentSize_ = new AtomicInteger(0);
@@ -79,23 +77,15 @@
     private String cfName_;
     /* Creation time of this Memtable */
     private long creationTime_;
-    private boolean isFrozen_ = false;
+    private volatile boolean isFrozen_ = false;
     private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
     /* Lock and Condition for notifying new clients about Memtable switches */
     Lock lock_ = new ReentrantLock();
 
     Memtable(String table, String cfName)
     {
-        if ( apartments_.get(cfName) == null )
-        {
-            apartments_.put(cfName, new DebuggableThreadPoolExecutor( 1,
-                    1,
-                    Integer.MAX_VALUE,
-                    TimeUnit.SECONDS,
-                    new LinkedBlockingQueue<Runnable>(),
-                    new ThreadFactoryImpl("FAST-MEMTABLE-POOL")
-                    ));
-        }
+        executor_ = new MemtableThreadPoolExecutor();
+        runningExecutorServices_.add(executor_);
 
         table_ = table;
         cfName_ = cfName;
@@ -145,28 +135,6 @@
     }
 
     /**
-     * Flushes the current memtable to disk.
-     * 
-     * @author alakshman
-     *
-     */
-    class Flusher implements Runnable
-    {
-        private CommitLog.CommitLogContext cLogCtx_;
-        
-        Flusher(CommitLog.CommitLogContext cLogCtx)
-        {
-            cLogCtx_ = cLogCtx;
-        }
-        
-        public void run()
-        {
-            ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
-            MemtableManager.instance().submit(cfName_, Memtable.this, cLogCtx_);
-        }
-    }
-
-    /**
      * Compares two Memtable based on creation time.
      * @param rhs Memtable to compare to.
      * @return a negative integer, zero, or a positive integer as this object
@@ -222,8 +190,7 @@
 
     void printExecutorStats()
     {
-    	DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)apartments_.get(cfName_);
-    	long taskCount = (es.getTaskCount() - es.getCompletedTaskCount());
+    	long taskCount = (executor_.getTaskCount() - executor_.getCompletedTaskCount());
     	logger_.debug("MEMTABLE TASKS : " + taskCount);
     }
 
@@ -232,25 +199,31 @@
      * the memtable. This version will respect the threshold and flush
      * the memtable to disk when the size exceeds the threshold.
     */
-    void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws
IOException
+    void put(String key, ColumnFamily columnFamily, final CommitLog.CommitLogContext cLogCtx)
throws IOException
     {
         if (isThresholdViolated(key) )
         {
             lock_.lock();
             try
             {
-                ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+                final ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
                 if (!isFrozen_)
                 {
                     isFrozen_ = true;
-                    /* Submit this Memtable to be flushed. */
-                    Runnable flusher = new Flusher(cLogCtx);
-                    apartments_.get(cfName_).submit(flusher);   
-                    /* switch the memtable */
+                    Runnable flushQueuer = new Runnable()
+                    {
+                        public void run()
+                        {
+                            MemtableManager.instance().submit(cfStore.getColumnFamilyName(),
Memtable.this, cLogCtx);
+                        }
+                    };
                     cfStore.switchMemtable(key, columnFamily, cLogCtx);
+                    executor_.runOnTermination(flushQueuer);
+                    executor_.shutdown();
                 }
                 else
                 {
+                    // retry the put on the new memtable
                     cfStore.apply(key, columnFamily, cLogCtx);
                 }
             }
@@ -263,7 +236,7 @@
         {
         	printExecutorStats();
         	Runnable putter = new Putter(key, columnFamily);
-        	apartments_.get(cfName_).submit(putter);
+        	executor_.submit(putter);
         }
     }
 
@@ -375,7 +348,7 @@
     	ColumnFamily cf = null;
     	try
     	{
-    		cf = apartments_.get(cfName_).submit(call).get();
+    		cf = executor_.submit(call).get();
     	}
     	catch ( ExecutionException ex )
     	{
@@ -440,4 +413,26 @@
         columnFamilies_.clear();
     }
 
+    private static class MemtableThreadPoolExecutor extends DebuggableThreadPoolExecutor
+    {
+        private ArrayList<Runnable> terminatedHooks = new ArrayList<Runnable>();
+
+        public MemtableThreadPoolExecutor()
+        {
+            super(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryImpl("FAST-MEMTABLE-POOL"));
+        }
+
+        protected void terminated()
+        {
+            super.terminated();
+            runningExecutorServices_.remove(this);
+            for (Runnable hook : terminatedHooks) {
+                hook.run();
+            }
+        }
+
+        public void runOnTermination(Runnable runnable) {
+            terminatedHooks.add(runnable);
+        }
+    }
 }



Mime
View raw message