cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1163898 - in /cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java src/java/org/apache/cassandra/scheduler/WeightedQueue.java
Date Thu, 01 Sep 2011 03:14:12 GMT
Author: jbellis
Date: Thu Sep  1 03:14:12 2011
New Revision: 1163898

URL: http://svn.apache.org/viewvc?rev=1163898&view=rev
Log:
Properly throw timeouts, decrement the count of waiters on timeout, fix off-by-one in taskCount
patch by Stu Hood; reviewed by Ryan King for CASSANDRA-3096

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep  1 03:14:12 2011
@@ -42,7 +42,7 @@
  * Add "install" command to cassandra.bat (CASSANDRA-292)
  * clean up KSMetadata, CFMetadata from unnecessary
    Thrift<->Avro conversion methods (CASSANDRA-3032)
- * Add timeouts to client request schedulers (CASSANDRA-3079)
+ * Add timeouts to client request schedulers (CASSANDRA-3079, 3096)
  * Cli to use hashes rather than array of hashes for strategy options (CASSANDRA-3081)
  * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110, 3087)
  * Improvements of the CLI `describe` command (CASSANDRA-2630)

Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java Thu Sep  1 03:14:12 2011
@@ -43,7 +43,6 @@ public class RoundRobinScheduler impleme
 
     //Map of queue id to weighted queue
     private final NonBlockingHashMap<String, WeightedQueue> queues;
-    private static boolean started = false;
 
     private final Semaphore taskCount;
 
@@ -56,12 +55,12 @@ public class RoundRobinScheduler impleme
 
     public RoundRobinScheduler(RequestSchedulerOptions options)
     {
-        assert !started;
-
         defaultWeight = options.default_weight;
         weights = options.weights;
 
-        taskCount = new Semaphore(options.throttle_limit);
+        // the task count is acquired for the first time _after_ releasing a thread, so we pre-decrement
+        taskCount = new Semaphore(options.throttle_limit - 1);
+
         queues = new NonBlockingHashMap<String, WeightedQueue>();
         Runnable runnable = new Runnable()
         {
@@ -75,7 +74,6 @@ public class RoundRobinScheduler impleme
         };
         Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
         scheduler.start();
-        started = true;
         logger.info("Started the RoundRobin Request Scheduler");
     }
 
@@ -86,7 +84,21 @@ public class RoundRobinScheduler impleme
         try
         {
             queueSize.release();
-            weightedQueue.put(t, timeoutMS);
+            try
+            {
+                weightedQueue.put(t, timeoutMS);
+                // the scheduler will release us when a slot is available
+            }
+            catch (TimeoutException e)
+            {
+                queueSize.acquireUninterruptibly();
+                throw e;
+            }
+            catch (InterruptedException e)
+            {
+                queueSize.acquireUninterruptibly();
+                throw e;
+            }
         }
         catch (InterruptedException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java Thu Sep  1 03:14:12 2011
@@ -60,7 +60,8 @@ class WeightedQueue implements WeightedQ
 
     public void put(Thread t, long timeoutMS) throws InterruptedException, TimeoutException
     {
-        queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS);
+        if (!queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS))
+            throw new TimeoutException("Failed to acquire request scheduler slot for '" + key + "'");
     }
 
     public Thread poll()



Mime
View raw message