cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r765754 - in /incubator/cassandra/trunk: src/org/apache/cassandra/db/Memtable.java test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Date Thu, 16 Apr 2009 20:42:00 GMT
Author: jbellis
Date: Thu Apr 16 20:42:00 2009
New Revision: 765754

URL: http://svn.apache.org/viewvc?rev=765754&view=rev
Log:
Make forceFlush block until the flush action is queued on MemtableManager.  That
way, a caller that invokes forceFlush followed by waitForFlush is guaranteed that
the action waitForFlush puts on MemtableManager will run after the flush completes,
i.e., the wait will actually do what it's supposed to.

patch by jbellis; reviewed by Eric Evans for #59

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=765754&r1=765753&r2=765754&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Thu Apr 16 20:42:00 2009
@@ -30,6 +30,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -210,15 +211,8 @@
                 if (!isFrozen_)
                 {
                     isFrozen_ = true;
-                    Runnable flushQueuer = new Runnable()
-                    {
-                        public void run()
-                        {
-                            MemtableManager.instance().submit(cfStore.getColumnFamilyName(), Memtable.this, cLogCtx);
-                        }
-                    };
                     cfStore.switchMemtable(key, columnFamily, cLogCtx);
-                    executor_.runOnTermination(flushQueuer);
+                    executor_.flushWhenTerminated(cLogCtx);
                     executor_.shutdown();
                 }
                 else
@@ -242,7 +236,8 @@
 
     /*
      * This version is used to switch memtable and force flush.
-     * Flushing is still done in a separate executor -- forceFlush does not block.
+     * Flushing is still done in a separate executor -- forceFlush only blocks
+     * until the flush runnable is queued.
     */
     public void forceflush(ColumnFamilyStore cfStore) throws IOException
     {
@@ -257,10 +252,11 @@
                 rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
             }
             rm.apply();
+            executor_.flushQueuer.get();
         }
-        catch(ColumnFamilyNotDefinedException ex)
+        catch (Exception ex)
         {
-            logger_.debug(LogUtil.throwableToString(ex));
+            throw new RuntimeException(ex);
         }
     }
 
@@ -413,9 +409,9 @@
         columnFamilies_.clear();
     }
 
-    private static class MemtableThreadPoolExecutor extends DebuggableThreadPoolExecutor
+    private class MemtableThreadPoolExecutor extends DebuggableThreadPoolExecutor
     {
-        private ArrayList<Runnable> terminatedHooks = new ArrayList<Runnable>();
+        FutureTask flushQueuer;
 
         public MemtableThreadPoolExecutor()
         {
@@ -426,13 +422,22 @@
         {
             super.terminated();
             runningExecutorServices_.remove(this);
-            for (Runnable hook : terminatedHooks) {
-                hook.run();
+            if (flushQueuer != null)
+            {
+                flushQueuer.run();
             }
         }
 
-        public void runOnTermination(Runnable runnable) {
-            terminatedHooks.add(runnable);
+        public void flushWhenTerminated(final CommitLog.CommitLogContext cLogCtx)
+        {
+            Runnable runnable = new Runnable()
+            {
+                public void run()
+                {
+                    MemtableManager.instance().submit(cfName_, Memtable.this, cLogCtx);
+                }
+            };
+            flushQueuer = new FutureTask(runnable, null);
         }
     }
 }

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=765754&r1=765753&r2=765754&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Thu Apr 16 20:42:00 2009
@@ -164,7 +164,6 @@
             }
         });
         f.get();
-        Thread.sleep(1000);
     }
 
     private void validateNameSort(Table table)



Mime
View raw message