cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r943344 - in /cassandra/trunk/src/java/org/apache/cassandra: cache/ICacheExpungeHook.java service/ConsistencyChecker.java utils/ExpiringMap.java
Date Wed, 12 May 2010 00:11:46 GMT
Author: jbellis
Date: Wed May 12 00:11:45 2010
New Revision: 943344

URL: http://svn.apache.org/viewvc?rev=943344&view=rev
Log:
replace ExpiringMap in ConsistencyChecker with ScheduledExecutorService
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1077


Removed:
    cassandra/trunk/src/java/org/apache/cassandra/cache/ICacheExpungeHook.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=943344&r1=943343&r2=943344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Wed May
12 00:11:45 2010
@@ -26,10 +26,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.cassandra.cache.ICacheExpungeHook;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
@@ -40,9 +42,9 @@ import org.apache.cassandra.io.util.Data
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ExpiringMap;
 import org.apache.cassandra.utils.FBUtilities;
 
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +52,8 @@ import org.slf4j.LoggerFactory;
 class ConsistencyChecker implements Runnable
 {
     private static Logger logger_ = LoggerFactory.getLogger(ConsistencyChecker.class);
-    private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String,
String>(DatabaseDescriptor.getRpcTimeout());
+
+    private static ScheduledExecutorService executor_ = new ScheduledThreadPoolExecutor(1);
// TODO add JMX
 
     private final String table_;
     private final Row row_;
@@ -140,7 +143,7 @@ class ConsistencyChecker implements Runn
         }
     }
 
-	static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
+	static class DataRepairHandler implements IAsyncCallback
 	{
 		private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
 		private final IResponseResolver<Row> readResponseResolver_;
@@ -172,20 +175,15 @@ class ConsistencyChecker implements Runn
 			responses_.add(message);
             if (responses_.size() == majority_)
             {
-                String messageId = message.getMessageId();
-                readRepairTable_.put(messageId, messageId, this);
-            }
-        }
-
-		public void callMe(String key, String value)
-		{
-            try
-			{
-				readResponseResolver_.resolve(responses_);
-            }
-            catch (Exception ex)
-            {
-                throw new RuntimeException(ex);
+                Runnable runnable = new WrappedRunnable()
+                {
+                    public void runMayThrow() throws IOException, DigestMismatchException
+                    {
+                        readResponseResolver_.resolve(responses_);
+                    }
+                };
+                // give remaining replicas until timeout to reply and get added to responses_
+                executor_.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=943344&r1=943343&r2=943344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Wed May 12 00:11:45
2010
@@ -19,15 +19,14 @@
 package org.apache.cassandra.utils;
 
 import java.util.*;
-import java.util.Map.Entry;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.ICacheExpungeHook;
-
 public class ExpiringMap<K, V>
 {
+    private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+
     private class CacheableObject
     {
         private V value_;
@@ -74,7 +73,6 @@ public class ExpiringMap<K, V>
         @Override
         public void run()
         {
-            Map<K, V> expungedValues = new HashMap<K, V>();
             synchronized (cache_)
             {
                 Enumeration<K> e = cache_.keys();
@@ -84,37 +82,16 @@ public class ExpiringMap<K, V>
                     CacheableObject co = cache_.get(key);
                     if (co != null && co.isReadyToDie(expiration_))
                     {
-                        V v = co.getValue();
-                        if (null != v)
-                        {
-                            expungedValues.put(key, v);
-                        }
                         cache_.remove(key);
                     }
                 }
             }
-
-            /* Calling the hooks on the keys that have been expunged */
-            for (Entry<K, V> entry : expungedValues.entrySet())
-            {
-                K key = entry.getKey();
-                V value = entry.getValue();
-                
-                ICacheExpungeHook<K, V> hook = hooks_.remove(key);
-                if (hook != null)
-                {
-                    hook.callMe(key, value);
-                }
-            }
-            expungedValues.clear();
         }
     }
 
     private Hashtable<K, CacheableObject> cache_;
-    private Map<K, ICacheExpungeHook<K, V>> hooks_;
     private Timer timer_;
     private static int counter_ = 0;
-    private static final Logger LOGGER = LoggerFactory.getLogger(ExpiringMap.class);
 
     private void init(long expiration)
     {
@@ -124,7 +101,6 @@ public class ExpiringMap<K, V>
         }
 
         cache_ = new Hashtable<K, CacheableObject>();
-        hooks_ = new Hashtable<K, ICacheExpungeHook<K, V>>();
         timer_ = new Timer("CACHETABLE-TIMER-" + (++counter_), true);
         timer_.schedule(new CacheMonitor(expiration), expiration, expiration);
     }
@@ -148,12 +124,6 @@ public class ExpiringMap<K, V>
         cache_.put(key, new CacheableObject(value));
     }
 
-    public void put(K key, V value, ICacheExpungeHook<K, V> hook)
-    {
-        put(key, value);
-        hooks_.put(key, hook);
-    }
-
     public V get(K key)
     {
         V result = null;



Mime
View raw message