cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1081925 - in /cassandra/trunk: ./ conf/ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/c...
Date Tue, 15 Mar 2011 20:12:04 GMT
Author: jbellis
Date: Tue Mar 15 20:12:03 2011
New Revision: 1081925

URL: http://svn.apache.org/viewvc?rev=1081925&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7:1026516-1081897
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7:1026516-1081924
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Mar 15 20:12:03 2011
@@ -18,8 +18,10 @@
  * fix tombstone handling in repair and sstable2json (CASSANDRA-2279)
  * clear Built flag in system table when dropping an index (CASSANDRA-2320)
  * validate index names (CASSANDRA-1761)
+ * add memtable_flush_queue_size defaulting to 4 (CASSANDRA-2333)
  * allow job configuration to set the CL used in Hadoop jobs (CASSANDRA-2331)
  * queue secondary indexes for flush before the parent (CASSANDRA-2330)
+ * shut down server for OOM on a Thrift thread (CASSANDRA-2269)
 
 
 >>>>>>> .merge-right.r1081840

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Tue Mar 15 20:12:03 2011
@@ -148,6 +148,11 @@ concurrent_writes: 32
 # By default this will be set to the amount of data directories defined.
 #memtable_flush_writers: 1
 
+# the number of full memtables to allow pending flush, that is,
+# waiting for a writer thread.  At a minimum, this should be set to
+# the maximum number of secondary indexes created on a single CF.
+memtable_flush_queue_size: 4
+
 # Buffer size to use when performing contiguous column slices. 
 # Increase this to the size of the column slices you typically perform
 sliced_buffer_size_in_kb: 64

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1081897
+/cassandra/branches/cassandra-0.7/contrib:1026516-1081924
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1081924
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1081924
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1081924
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1081924
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 15 20:12:03 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777,1076891
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1081897
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1081924
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Tue Mar 15 20:12:03 2011
@@ -26,59 +26,74 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * This class incorporates some Executor best practices for Cassandra.  Most of the executors in the system
+ * should use or extend this.  There are two main improvements over a vanilla TPE:
+ *
+ * - If a task throws an exception, the default uncaught exception handler will be invoked; if there is
+ *   no such handler, the exception will be logged.
+ * - MaximumPoolSize is not supported.  Here is what that means (quoting TPE javadoc):
+ *
+ *     If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
+ *     If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
+ *     If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
+ *
+ *   We don't want this last stage of creating new threads if the queue is full; it makes it needlessly difficult to
+ *   reason about the system's behavior.  In other words, if DebuggableTPE has allocated our maximum number of (core)
+ *   threads and the queue is full, we want the enqueuer to block.  But to allow the number of threads to drop if a
+ *   stage is less busy, core thread timeout is enabled.
+ */
 public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
 {
     protected static Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
 
     public DebuggableThreadPoolExecutor(String threadPoolName, int priority)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName, priority));
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName, priority));
     }
 
-    public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+    public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
     {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        allowCoreThreadTimeOut(true);
 
-        if (maximumPoolSize > 1)
+        // preserve task serialization.  this is more complicated than it needs to be,
+        // since TPE rejects if queue.offer reports a full queue.  we'll just
+        // override this with a handler that retries until it gets in.  ugly, but effective.
+        // (there is an extensive analysis of the options here at
+        //  http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
+        this.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
-            // clearly strict serialization is not a requirement.  just make the calling
thread execute.
-            this.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-        }
-        else
-        {
-            // preserve task serialization.  this is more complicated than it needs to be,
-            // since TPE rejects if queue.offer reports a full queue.  we'll just
-            // override this with a handler that retries until it gets in.  ugly, but effective.
-            // (there is an extensive analysis of the options here at
-            //  http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
-            this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+            public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
             {
-                public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
+                BlockingQueue<Runnable> queue = executor.getQueue();
+                while (true)
                 {
-                    BlockingQueue<Runnable> queue = executor.getQueue();
-                    while (true)
+                    if (executor.isShutdown())
+                        throw new RejectedExecutionException("ThreadPoolExecutor has shut
down");
+                    try
+                    {
+                        if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+                            break;
+                    }
+                    catch (InterruptedException e)
                     {
-                        if (executor.isShutdown())
-                            throw new RejectedExecutionException("ThreadPoolExecutor has
shut down");
-                        try
-                        {
-                            if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
-                                break;
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError(e);    
-                        }
+                        throw new AssertionError(e);
                     }
                 }
-            });
-        }
+            }
+        });
     }
 
+    @Override
     public void afterExecute(Runnable r, Throwable t)
     {
         super.afterExecute(r,t);
+        logExceptionsAfterExecute(r, t);
+    }
 
+    public static void logExceptionsAfterExecute(Runnable r, Throwable t)
+    {
         // exceptions wrapped by FutureTask
         if (r instanceof FutureTask<?>)
         {
@@ -92,7 +107,9 @@ public class DebuggableThreadPoolExecuto
             }
             catch (ExecutionException e)
             {
-                if (Thread.getDefaultUncaughtExceptionHandler() != null)
+                if (Thread.getDefaultUncaughtExceptionHandler() == null)
+                    logger.error("Error in ThreadPoolExecutor", e.getCause());
+                else
                     Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
e.getCause());
             }
         }
@@ -100,7 +117,10 @@ public class DebuggableThreadPoolExecuto
         // exceptions for non-FutureTask runnables [i.e., added via execute() instead of
submit()]
         if (t != null)
         {
-            logger.error("Error in ThreadPoolExecutor", t);
+            if (Thread.getDefaultUncaughtExceptionHandler() == null)
+                logger.error("Error in ThreadPoolExecutor", t);
+            else
+                Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
t);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutor.java
Tue Mar 15 20:12:03 2011
@@ -25,14 +25,13 @@ public class JMXConfigurableThreadPoolEx
 {
 
     public JMXConfigurableThreadPoolExecutor(int corePoolSize,
-                                             int maximumPoolSize, 
-        	                                 long keepAliveTime, 
+        	                                 long keepAliveTime,
         	                                 TimeUnit unit,
                                              BlockingQueue<Runnable> workQueue, 
                                              NamedThreadFactory threadFactory,
                                              String jmxPath)
     {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
jmxPath);
+        super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory, jmxPath);
     }
     
 }
\ No newline at end of file

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXConfigurableThreadPoolExecutorMBean.java
Tue Mar 15 20:12:03 2011
@@ -20,9 +20,7 @@ package org.apache.cassandra.concurrent;
 
 public interface JMXConfigurableThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean
 {
-
     void setCorePoolSize(int n);
 
     int getCorePoolSize();
-    
 }
\ No newline at end of file

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
Tue Mar 15 20:12:03 2011
@@ -38,28 +38,27 @@ public class JMXEnabledThreadPoolExecuto
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName), "internal");
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName), "internal");
     }
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName, String jmxPath)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName), jmxPath);
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName), jmxPath);
     }
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName, int priority)
     {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName, priority), "internal");
+        this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName, priority), "internal");
     }
 
     public JMXEnabledThreadPoolExecutor(int corePoolSize,
-                                        int maximumPoolSize,
                                         long keepAliveTime,
                                         TimeUnit unit,
                                         BlockingQueue<Runnable> workQueue,
                                         NamedThreadFactory threadFactory,
                                         String jmxPath)
     {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
         super.prestartAllCoreThreads();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Tue Mar 15
20:12:03 2011
@@ -41,8 +41,8 @@ public class StageManager
     {
         stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
       
-        stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
-        stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
+        stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Runtime.getRuntime().availableProcessors()));
+        stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Runtime.getRuntime().availableProcessors()));
         stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE,
getConcurrentReplicators()));
         // the rest are all single-threaded
         stages.put(Stage.STREAM, new JMXEnabledThreadPoolExecutor(Stage.STREAM));
@@ -50,17 +50,12 @@ public class StageManager
         stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
-        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Math.max(2, Runtime.getRuntime().availableProcessors())));
+        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
     }
 
     private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
     {
-        // avoid running afoul of requirement in DebuggableThreadPoolExecutor that single-threaded
executors
-        // must have unbounded queues
-        assert numThreads > 1 : "multi-threaded stages must have at least 2 threads";
-
         return new JMXEnabledThreadPoolExecutor(numThreads,
-                                                numThreads,
                                                 KEEPALIVE,
                                                 TimeUnit.SECONDS,
                                                 new LinkedBlockingQueue<Runnable>(),
@@ -70,10 +65,7 @@ public class StageManager
     
     private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
     {
-        assert numThreads > 1 : "multi-threaded stages must have at least 2 threads";
-        
         return new JMXConfigurableThreadPoolExecutor(numThreads,
-                                                     numThreads,
                                                      KEEPALIVE,
                                                      TimeUnit.SECONDS,
                                                      new LinkedBlockingQueue<Runnable>(),

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Tue Mar 15 20:12:03 2011
@@ -115,6 +115,7 @@ public class Config
     public boolean compaction_preheat_key_cache = true;
 
     public boolean incremental_backups = false;
+    public int memtable_flush_queue_size = 4;
 
     public static enum CommitLogSync {
         periodic,

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Mar 15
20:12:03 2011
@@ -1248,4 +1248,9 @@ public class DatabaseDescriptor
     {
         return conf.incremental_backups;
     }
+
+    public static int getFlushQueueSize()
+    {
+        return conf.memtable_flush_queue_size;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Mar 15 20:12:03
2011
@@ -79,19 +79,17 @@ public class ColumnFamilyStore implement
      * called, all data up to the given context has been persisted to SSTables.
      */
     private static final ExecutorService flushSorter
-            = new JMXEnabledThreadPoolExecutor(1,
-                                               Runtime.getRuntime().availableProcessors(),
+            = new JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
                                                new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
                                                new NamedThreadFactory("FlushSorter"),
                                                "internal");
     private static final ExecutorService flushWriter
-            = new JMXEnabledThreadPoolExecutor(1,
-                                               DatabaseDescriptor.getFlushWriters(),
+            = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                StageManager.KEEPALIVE,
                                                TimeUnit.SECONDS,
-                                               new SynchronousQueue<Runnable>(),
+                                               new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
                                                new NamedThreadFactory("FlushWriter"),
                                                "internal");
     public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Tue
Mar 15 20:12:03 2011
@@ -33,6 +33,7 @@ import org.apache.log4j.PropertyConfigur
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -363,6 +364,7 @@ public abstract class AbstractCassandraD
         protected void afterExecute(Runnable r, Throwable t)
         {
             super.afterExecute(r, t);
+            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
             state.get().logout();
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Tue Mar
15 20:12:03 2011
@@ -74,36 +74,23 @@ public class WriteResponseHandler extend
 
     protected int determineBlockFor(String table)
     {
-        int blockFor = 0;
         switch (consistencyLevel)
         {
             case ONE:
-                blockFor = 1;
-                break;
+                return 1;
             case ANY:
-                blockFor = 1;
-                break;
+                return 1;
             case TWO:
-                blockFor = 2;
-                break;
+                return 2;
             case THREE:
-                blockFor = 3;
-                break;
+                return 3;
             case QUORUM:
-                blockFor = (writeEndpoints.size() / 2) + 1;
-                break;
+                return (writeEndpoints.size() / 2) + 1;
             case ALL:
-                blockFor = writeEndpoints.size();
-                break;
+                return writeEndpoints.size();
             default:
                 throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel.toString());
         }
-        // at most one node per range can bootstrap at a time, and these will be added to
the write until
-        // bootstrap finishes (at which point we no longer need to write to the old ones).
-        assert 1 <= blockFor && blockFor <= 2 * Table.open(table).getReplicationStrategy().getReplicationFactor()
-            : String.format("invalid response count %d for replication factor %d",
-                            blockFor, Table.open(table).getReplicationStrategy().getReplicationFactor());
-        return blockFor;
     }
 
     public void assureSufficientLiveNodes() throws UnavailableException

Modified: cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java?rev=1081925&r1=1081924&r2=1081925&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutorTest.java
Tue Mar 15 20:12:03 2011
@@ -35,7 +35,6 @@ public class DebuggableThreadPoolExecuto
     {
         LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(1);
         DebuggableThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(1,
-                                                                                 1,
                                                                                  Integer.MAX_VALUE,
                                                                                  TimeUnit.MILLISECONDS,
                                                                                  q,



Mime
View raw message