cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r899042 - in /incubator/cassandra/trunk: ./ interface/gen-java/org/apache/cassandra/service/ src/java/org/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/utils/ test/unit/org/ test/...
Date Thu, 14 Jan 2010 03:18:35 GMT
Author: jbellis
Date: Thu Jan 14 03:18:34 2010
New Revision: 899042

URL: http://svn.apache.org/viewvc?rev=899042&view=rev
Log:
merge from 0.5 branch

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/concurrent/
      - copied from r899039, incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/concurrent/
Modified:
    incubator/cassandra/trunk/   (props changed)
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
  (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
  (props changed)
    incubator/cassandra/trunk/src/java/org/   (props changed)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java
    incubator/cassandra/trunk/test/unit/org/   (props changed)

Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5:888872-898853
+/incubator/cassandra/branches/cassandra-0.5:888872-899039

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Thu Jan 14 03:18:34 2010
@@ -5,6 +5,8 @@
 0.5.0 final
  * avoid attempting to delete temporary bootstrap files twice (CASSANDRA-681)
  * fix bogus NaN in nodeprobe cfstats output (CASSANDRA-646)
+ * provide a policy for dealing with single thread executors w/ a full queue
+   (CASSANDRA-694)
 
 
 0.5.0 RC3

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-899039
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,5 +1,5 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-899039
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-899039
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-899039
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,5 +1,5 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-899039
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588

Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-899039
 /incubator/cassandra/trunk/src/java/org:749219-769885

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
Thu Jan 14 03:18:34 2010
@@ -1,71 +1,83 @@
-package org.apache.cassandra.concurrent;
-
-import java.util.concurrent.*;
-
-import org.apache.log4j.Logger;
-
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
-{
-    protected static Logger logger = Logger.getLogger(JMXEnabledThreadPoolExecutor.class);
-
-    public DebuggableThreadPoolExecutor(String threadPoolName)
-    {
-        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName));
-    }
-
-    public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
-    {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
-
-        if (maximumPoolSize > 1)
-        {
-            this.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-        }
-        else
-        {
-            // preserve task serialization.  this is more complicated than it needs to be,
-            // since TPE rejects if queue.offer reports a full queue.
-            // the easiest option (since most of TPE.execute deals with private members)
-            // appears to be to wrap the given queue class with one whose offer
-            // simply delegates to put().  this would be ugly, since it violates both
-            // the spirit and letter of queue.offer, but effective.
-            // so far, though, all our serialized executors use unbounded queues,
-            // so actually implementing this has not been necessary.
-            this.setRejectedExecutionHandler(new RejectedExecutionHandler()
-            {
-                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
-                {
-                    throw new AssertionError("Blocking serialized executor is not yet implemented");
-                }
-            });
-        }
-    }
-
-    public void afterExecute(Runnable r, Throwable t)
-    {
-        super.afterExecute(r,t);
-
-        // exceptions wrapped by FutureTask
-        if (r instanceof FutureTask)
-        {
-            try
-            {
-                ((FutureTask) r).get();
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-            catch (ExecutionException e)
-            {
-                logger.error("Error in executor futuretask", e);
-            }
-        }
-
-        // exceptions for non-FutureTask runnables [i.e., added via execute() instead of
submit()]
-        if (t != null)
-        {
-            logger.error("Error in ThreadPoolExecutor", t);
-        }
-    }
-}
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.log4j.Logger;
+
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+{
+    protected static Logger logger = Logger.getLogger(JMXEnabledThreadPoolExecutor.class);
+
+    public DebuggableThreadPoolExecutor(String threadPoolName)
+    {
+        this(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(threadPoolName));
+    }
+
+    public DebuggableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+    {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+
+        if (maximumPoolSize > 1)
+        {
+            // clearly strict serialization is not a requirement.  just make the calling
thread execute.
+            this.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        }
+        else
+        {
+            // preserve task serialization.  this is more complicated than it needs to be,
+            // since TPE rejects if queue.offer reports a full queue.  we'll just
+            // override this with a handler that retries until it gets in.  ugly, but effective.
+            // (there is an extensive analysis of the options here at
+            //  http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html)
+            this.setRejectedExecutionHandler(new RejectedExecutionHandler()
+            {
+                public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
+                {
+                    BlockingQueue<Runnable> queue = executor.getQueue();
+                    while (true)
+                    {
+                        if (executor.isShutdown())
+                            throw new RejectedExecutionException("ThreadPoolExecutor has
shut down");
+                        try
+                        {
+                            if (queue.offer(task, 1000, TimeUnit.MILLISECONDS))
+                                break;
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError(e);    
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    public void afterExecute(Runnable r, Throwable t)
+    {
+        super.afterExecute(r,t);
+
+        // exceptions wrapped by FutureTask
+        if (r instanceof FutureTask)
+        {
+            try
+            {
+                ((FutureTask) r).get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            catch (ExecutionException e)
+            {
+                logger.error("Error in executor futuretask", e);
+            }
+        }
+
+        // exceptions for non-FutureTask runnables [i.e., added via execute() instead of
submit()]
+        if (t != null)
+        {
+            logger.error("Error in ThreadPoolExecutor", t);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu
Jan 14 03:18:34 2010
@@ -80,7 +80,7 @@
                                                Runtime.getRuntime().availableProcessors(),
                                                Integer.MAX_VALUE,
                                                TimeUnit.SECONDS,
-                                               new LinkedBlockingQueue<Runnable>(2
* Runtime.getRuntime().availableProcessors()),
+                                               new LinkedBlockingQueue<Runnable>(1
+ 2 * Runtime.getRuntime().availableProcessors()),
                                                new NamedThreadFactory("FLUSH-SORTER-POOL"));
     private static ExecutorService flushWriter_
             = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java?rev=899042&r1=899041&r2=899042&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/WrappedRunnable.java Thu
Jan 14 03:18:34 2010
@@ -1,18 +1,39 @@
-package org.apache.cassandra.utils;
-
-public abstract class WrappedRunnable implements Runnable
-{
-    public final void run()
-    {
-        try
-        {
-            runMayThrow();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    abstract protected void runMayThrow() throws Exception;
-}
+package org.apache.cassandra.utils;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+public abstract class WrappedRunnable implements Runnable
+{
+    public final void run()
+    {
+        try
+        {
+            runMayThrow();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    abstract protected void runMayThrow() throws Exception;
+}

Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 14 03:18:34 2010
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-898853
+/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-899039
 /incubator/cassandra/trunk/test/unit/org:749219-768583



Mime
View raw message