cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1161983 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/scheduler/ src/java/org/apache/cassandra/thrift/
Date Fri, 26 Aug 2011 03:54:33 GMT
Author: jbellis
Date: Fri Aug 26 03:54:32 2011
New Revision: 1161983

URL: http://svn.apache.org/viewvc?rev=1161983&view=rev
Log:
Add timeouts to client request schedulers
patch by Stu Hood; reviewed by Melvin Wang for CASSANDRA-3079

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Aug 26 03:54:32 2011
@@ -42,6 +42,7 @@
  * Add "install" command to cassandra.bat (CASSANDRA-292)
  * clean up KSMetadata, CFMetadata from unnecessary
    Thrift<->Avro conversion methods (CASSANDRA-3032)
+ * Add timeouts to client request schedulers (CASSANDRA-3079)
 
 
 0.8.5

Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java Fri Aug 26 03:54:32 2011
@@ -20,6 +20,8 @@ package org.apache.cassandra.scheduler;
  * 
  */
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * Implementors of IRequestScheduler must provide a constructor taking a RequestSchedulerOptions object.
  */
@@ -30,8 +32,9 @@ public interface IRequestScheduler
      * 
      * @param t Thread handing the request
      * @param id    Scheduling parameter, an id to distinguish profiles (users/keyspace)
+     * @param timeout   The max time in milliseconds to spend blocking for a slot
      */
-    public void queue(Thread t, String id);
+    public void queue(Thread t, String id, long timeoutMS) throws TimeoutException;
 
     /**
      * A convenience method for indicating when a particular request has completed

Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java Fri Aug 26 03:54:32 2011
@@ -34,7 +34,7 @@ public class NoScheduler implements IReq
 
     public NoScheduler() {}
 
-    public void queue(Thread t, String id) {}
+    public void queue(Thread t, String id, long timeoutMS) {}
 
     public void release() {}
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java Fri Aug 26 03:54:32 2011
@@ -23,6 +23,7 @@ package org.apache.cassandra.scheduler;
 
 import java.util.Map;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,14 +79,14 @@ public class RoundRobinScheduler impleme
         logger.info("Started the RoundRobin Request Scheduler");
     }
 
-    public void queue(Thread t, String id)
+    public void queue(Thread t, String id, long timeoutMS) throws TimeoutException
     {
         WeightedQueue weightedQueue = getWeightedQueue(id);
 
         try
         {
             queueSize.release();
-            weightedQueue.put(t);
+            weightedQueue.put(t, timeoutMS);
         }
         catch (InterruptedException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java Fri Aug 26 03:54:32 2011
@@ -22,6 +22,8 @@ package org.apache.cassandra.scheduler;
  */
 
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -56,9 +58,9 @@ class WeightedQueue implements WeightedQ
         }
     }
 
-    public void put(Thread t) throws InterruptedException
+    public void put(Thread t, long timeoutMS) throws InterruptedException, TimeoutException
     {
-        queue.put(new WeightedQueue.Entry(t));
+        queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS);
     }
 
     public Thread poll()

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Aug 26 03:54:32 2011
@@ -122,9 +122,9 @@ public class CassandraServer implements 
         List<Row> rows;
         try
         {
+            schedule(DatabaseDescriptor.getRpcTimeout());
             try
             {
-                schedule();
                 rows = StorageProxy.read(commands, consistency_level);
             }
             finally
@@ -625,23 +625,23 @@ public class CassandraServer implements 
     private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException
     {
         ThriftValidation.validateConsistencyLevel(state().getKeyspace(), consistency_level);
+        if (mutations.isEmpty())
+            return;
         try
         {
-            schedule();
-
+            schedule(DatabaseDescriptor.getRpcTimeout());
             try
             {
-                if (!mutations.isEmpty())
-                    StorageProxy.mutate(mutations, consistency_level);
+                StorageProxy.mutate(mutations, consistency_level);
             }
-            catch (TimeoutException e)
+            finally
             {
-                throw new TimedOutException();
+                release();
             }
         }
-        finally
+        catch (TimeoutException e)
         {
-            release();
+            throw new TimedOutException();
         }
     }
 
@@ -686,9 +686,9 @@ public class CassandraServer implements 
             {
                 bounds = new Bounds(p.getToken(range.start_key), p.getToken(range.end_key));
             }
+            schedule(DatabaseDescriptor.getRpcTimeout());
             try
             {
-                schedule();
                 rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
             }
             finally
@@ -829,9 +829,9 @@ public class CassandraServer implements 
     /**
      * Schedule the current thread for access to the required services
      */
-    private void schedule()
+    private void schedule(long timeoutMS) throws TimeoutException
     {
-        requestScheduler.queue(Thread.currentThread(), state().getSchedulingValue());
+        requestScheduler.queue(Thread.currentThread(), state().getSchedulingValue(), timeoutMS);
     }
 
     /**
@@ -1085,8 +1085,15 @@ public class CassandraServer implements 
         state().hasColumnFamilyAccess(cfname, Permission.WRITE);
         try
         {
-            schedule();
-            StorageProxy.truncateBlocking(state().getKeyspace(), cfname);
+            schedule(DatabaseDescriptor.getRpcTimeout());
+            try
+            {
+                StorageProxy.truncateBlocking(state().getKeyspace(), cfname);
+            }
+            finally
+            {
+                release();
+            }
         }
         catch (TimeoutException e)
         {
@@ -1096,10 +1103,6 @@ public class CassandraServer implements 
         {
             throw (UnavailableException) new UnavailableException().initCause(e);
         }
-        finally
-        {
-            release();
-        }
     }
 
     public void set_keyspace(String keyspace) throws InvalidRequestException, TException



Mime
View raw message