cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1221467 - in /cassandra/branches/cassandra-1.0: ./ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/service/
Date Tue, 20 Dec 2011 20:08:20 GMT
Author: jbellis
Date: Tue Dec 20 20:08:20 2011
New Revision: 1221467

URL: http://svn.apache.org/viewvc?rev=1221467&view=rev
Log:
stop thrift service in shutdown hook so we can quiesce MessagingService
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-3335

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/ExpiringMap.java
    cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/RemoveTest.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1221467&r1=1221466&r2=1221467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Tue Dec 20 20:08:20 2011
@@ -3,6 +3,8 @@
  * more efficient allocation of small bloom filters (CASSANDRA-3618)
  * CLibrary.createHardLinkWithExec() to check for errors (CASSANDRA-3101)
  * Avoid creating empty and non cleaned writer during compaction (CASSANDRA-3616)
+ * stop thrift service in shutdown hook so we can quiesce MessagingService
+   (CASSANDRA-3335)
 Merged from 0.8:
  * prevent new nodes from thinking down nodes are up forever (CASSANDRA-3626)
 

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1221467&r1=1221466&r2=1221467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Tue Dec 20 20:08:20 2011
@@ -473,25 +473,20 @@ public final class MessagingService impl
         callbacks.clear();
     }
 
-    public void shutdown()
+    /**
+     * There isn't a good way to shut down the MessagingService. One problem (but not the only one)
+     * is that StorageProxy has no way to communicate back to clients, "I'm nominally alive, but I can't
+     * send that request to the nodes with your data."  Neither TimedOut nor Unavailable is appropriate
+     * to return in that situation.
+     *
+     * So instead of shutting down MS and letting StorageProxy/clients cope somehow, we shut down
+     * the Thrift service and then wait for all the outstanding requests to finish or timeout.
+     */
+    public void waitForCallbacks()
     {
-        logger_.info("Shutting down MessageService...");
+        logger_.info("Waiting for messaging service to quiesce");
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
-
-        try
-        {
-            for (SocketThread th : socketThreads)
-                th.close();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-
-        streamExecutor_.shutdown();
-
-        logger_.info("Waiting for in-progress requests to complete");
         callbacks.shutdown();
     }
 

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java?rev=1221467&r1=1221466&r2=1221467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java Tue Dec 20 20:08:20 2011
@@ -347,7 +347,7 @@ public class StorageService implements I
         Gossiper.instance.unregister(migrationManager);
         Gossiper.instance.unregister(this);
         Gossiper.instance.stop();
-        MessagingService.instance().shutdown();
+        MessagingService.instance().waitForCallbacks();
         // give it a second so that task accepted before the MessagingService shutdown gets submitted to the stage (to avoid RejectedExecutionException)
         try { Thread.sleep(1000L); } catch (InterruptedException e) {}
         StageManager.shutdownNow();
@@ -443,13 +443,15 @@ public class StorageService implements I
                 if (mutationStage.isShutdown())
                     return; // drained already
 
+                stopRPCServer();
                 optionalTasks.shutdown();
                 Gossiper.instance.stop();
-                MessagingService.instance().shutdown();
 
+                // In-progress writes originating here could generate hints to be written, so shut down MessagingService
+                // before mutation stage, so we can get all the hints saved before shutting down
+                MessagingService.instance().waitForCallbacks();
                 mutationStage.shutdown();
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-
                 StorageProxy.instance.verifyNoHintsInProgress();
 
                 List<Future<?>> flushes = new ArrayList<Future<?>>();
@@ -2106,7 +2108,7 @@ public class StorageService implements I
             public void run()
             {
                 Gossiper.instance.stop();
-                MessagingService.instance().shutdown();
+                MessagingService.instance().waitForCallbacks();
                 StageManager.shutdownNow();
                 setMode(Mode.DECOMMISSIONED, true);
                 // let op be responsible for killing the process
@@ -2498,10 +2500,12 @@ public class StorageService implements I
             return;
         }
         setMode(Mode.DRAINING, "starting drain process", true);
+        stopRPCServer();
         optionalTasks.shutdown();
         Gossiper.instance.stop();
+
         setMode(Mode.DRAINING, "shutting down MessageService", false);
-        MessagingService.instance().shutdown();
+        MessagingService.instance().waitForCallbacks();
         setMode(Mode.DRAINING, "waiting for streaming", false);
         MessagingService.instance().waitForStreaming();
 

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1221467&r1=1221466&r2=1221467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Tue Dec 20 20:08:20 2011
@@ -19,21 +19,21 @@
 
 package org.apache.cassandra.thrift;
 
+import java.net.SocketTimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.*;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 
 
 /**
@@ -96,7 +96,6 @@ public class CustomTThreadPoolServer ext
                 }
             }
 
-            int failureCount = 0;
             try
             {
                 TTransport client = serverTransport_.accept();
@@ -106,9 +105,11 @@ public class CustomTThreadPoolServer ext
             }
             catch (TTransportException ttx)
             {
+                if (ttx.getCause() instanceof SocketTimeoutException) // thrift sucks
+                    continue;
+
                 if (!stopped_)
                 {
-                    ++failureCount;
                     LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
                 }
             }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java?rev=1221467&r1=1221466&r2=1221467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java Tue Dec 20 20:08:20 2011
@@ -149,7 +149,7 @@ public class TCustomServerSocket extends
         {
             try
             {
-                serverSocket_.setSoTimeout(0);
+                serverSocket_.setSoTimeout(100);
             }
             catch (SocketException sx)
             {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1221467&r1=1221466&r2=1221467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue Dec 20 20:08:20 2011
@@ -22,14 +22,19 @@ import java.util.*;
 
 import com.google.common.base.Function;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ExpiringMap<K, V>
 {
+    private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
+
     private static class CacheableObject<T>
     {
         private final T value;
-        private final long age;
+        private final long createdAt;
         private final long expiration;
 
         CacheableObject(T o, long e)
@@ -37,7 +42,7 @@ public class ExpiringMap<K, V>
             assert o != null;
             value = o;
             expiration = e;
-            age = System.currentTimeMillis();
+            createdAt = System.currentTimeMillis();
         }
 
         T getValue()
@@ -45,31 +50,31 @@ public class ExpiringMap<K, V>
             return value;
         }
 
-        boolean isReadyToDie(long start)
+        boolean isReadyToDieAt(long time)
         {
-            return ((start - age) > expiration);
+            return ((time - createdAt) > expiration);
         }
     }
 
     private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>();
     private final Timer timer;
     private static int counter = 0;
-    private final long expiration;
+    private final long defaultExpiration;
 
-    public ExpiringMap(long expiration)
+    public ExpiringMap(long defaultExpiration)
     {
-        this(expiration, null);
+        this(defaultExpiration, null);
     }
 
     /**
      *
-     * @param expiration the TTL for objects in the cache in milliseconds
+     * @param defaultExpiration the TTL for objects in the cache in milliseconds
      */
-    public ExpiringMap(long expiration, final Function<Pair<K,V>, ?> postExpireHook)
+    public ExpiringMap(long defaultExpiration, final Function<Pair<K,V>, ?> postExpireHook)
     {
-        this.expiration = expiration;
+        this.defaultExpiration = defaultExpiration;
 
-        if (expiration <= 0)
+        if (defaultExpiration <= 0)
         {
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
@@ -80,24 +85,28 @@ public class ExpiringMap<K, V>
             public void run()
             {
                 long start = System.currentTimeMillis();
+                int n = 0;
                 for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
                 {
-                    if (entry.getValue().isReadyToDie(start))
+                    if (entry.getValue().isReadyToDieAt(start))
                     {
                         cache.remove(entry.getKey());
+                        n++;
                         if (postExpireHook != null)
                             postExpireHook.apply(new Pair<K, V>(entry.getKey(), entry.getValue().getValue()));
                     }
                 }
+                logger.trace("Expired {} entries", n);
             }
         };
-        timer.schedule(task, expiration / 2, expiration / 2);
+        timer.schedule(task, defaultExpiration / 2, defaultExpiration / 2);
     }
 
     public void shutdown()
     {
         while (!cache.isEmpty())
         {
+            logger.trace("Waiting for {} entries before shutting down ExpiringMap", cache.size());
             try
             {
                 Thread.sleep(100);
@@ -117,7 +126,7 @@ public class ExpiringMap<K, V>
 
     public V put(K key, V value)
     {
-        return put(key, value, this.expiration);
+        return put(key, value, this.defaultExpiration);
     }
 
     public V put(K key, V value, long timeout)
@@ -141,7 +150,7 @@ public class ExpiringMap<K, V>
     public long getAge(K key)
     {
         CacheableObject<V> co = cache.get(key);
-        return co == null ? 0 : co.age;
+        return co == null ? 0 : co.createdAt;
     }
 
     public int size()

Modified: cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1221467&r1=1221466&r2=1221467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/service/RemoveTest.java Tue Dec 20 20:08:20 2011
@@ -31,14 +31,11 @@ import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -88,7 +85,7 @@ public class RemoveTest extends CleanupH
     {
         SinkManager.clear();
         MessagingService.instance().clearCallbacksUnsafe();
-        MessagingService.instance().shutdown();
+        MessagingService.instance().waitForCallbacks();
         ss.setPartitionerUnsafe(oldPartitioner);
     }
 



Mime
View raw message