cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject git commit: Fix TimeoutException when there is a firewall issue. patch by Vijay; reviewed by jbellis for CASSANDRA-3533
Date Wed, 10 Apr 2013 17:22:57 GMT
Updated Branches:
  refs/heads/trunk b31f48d30 -> 576efcd81


Fix TimeoutException when there is a firewall issue.
patch by Vijay; reviewed by jbellis for CASSANDRA-3533


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/576efcd8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/576efcd8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/576efcd8

Branch: refs/heads/trunk
Commit: 576efcd8121e70e7d550fdce9432be43690a4b1d
Parents: b31f48d
Author: Vijay Parthasarathy <vijay2win@gmail.com>
Authored: Wed Apr 10 10:21:51 2013 -0700
Committer: Vijay Parthasarathy <vijay2win@gmail.com>
Committed: Wed Apr 10 10:21:51 2013 -0700

----------------------------------------------------------------------
 src/java/org/apache/cassandra/gms/EchoMessage.java |   29 ++++++++++
 src/java/org/apache/cassandra/gms/Gossiper.java    |   44 ++++++++++-----
 .../org/apache/cassandra/net/MessagingService.java |    4 +
 .../apache/cassandra/service/EchoVerbHandler.java  |   21 +++++++
 .../apache/cassandra/service/StorageService.java   |    1 +
 .../apache/cassandra/io/CompactSerializerTest.java |    1 +
 6 files changed, 86 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/576efcd8/src/java/org/apache/cassandra/gms/EchoMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EchoMessage.java b/src/java/org/apache/cassandra/gms/EchoMessage.java
new file mode 100644
index 0000000..3f5f566
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/EchoMessage.java
@@ -0,0 +1,29 @@
+package org.apache.cassandra.gms;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+
+public class EchoMessage
+{
+    public static IVersionedSerializer<EchoMessage> serializer = new EchoMessageSerializer();
+
+    public static class EchoMessageSerializer implements IVersionedSerializer<EchoMessage>
+    {
+        public void serialize(EchoMessage t, DataOutput out, int version) throws IOException
+        {
+        }
+
+        public EchoMessage deserialize(DataInput in, int version) throws IOException
+        {
+            return new EchoMessage();
+        }
+
+        public long serializedSize(EchoMessage t, int version)
+        {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/576efcd8/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index ae920e1..04ece7a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -759,21 +761,35 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     }
 
-    private void markAlive(InetAddress addr, EndpointState localState)
+    private void markAlive(final InetAddress addr, final EndpointState localState)
     {
-        if (logger.isTraceEnabled())
-            logger.trace("marking as alive {}", addr);
-        localState.markAlive();
-        localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
-        liveEndpoints.add(addr);
-        unreachableEndpoints.remove(addr);
-        expireTimeEndpointMap.remove(addr);
-        logger.debug("removing expire time for endpoint : " + addr);
-        logger.info("InetAddress {} is now UP", addr);
-        for (IEndpointStateChangeSubscriber subscriber : subscribers)
-            subscriber.onAlive(addr, localState);
-        if (logger.isTraceEnabled())
-            logger.trace("Notified " + subscribers);
+        MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer);
+        logger.trace("Sending a EchoMessage to {}", addr);
+        IAsyncCallback echoHandler = new IAsyncCallback()
+        {
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+
+            public void response(MessageIn msg)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("marking as alive {}", addr);
+                localState.markAlive();
+                localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
+                liveEndpoints.add(addr);
+                unreachableEndpoints.remove(addr);
+                expireTimeEndpointMap.remove(addr);
+                logger.debug("removing expire time for endpoint : " + addr);
+                logger.info("InetAddress {} is now UP", addr);
+                for (IEndpointStateChangeSubscriber subscriber : subscribers)
+                    subscriber.onAlive(addr, localState);
+                if (logger.isTraceEnabled())
+                    logger.trace("Notified " + subscribers);
+            }
+        };
+        MessagingService.instance().sendRR(echoMessage, addr, echoHandler);
     }
 
     private void markDead(InetAddress addr, EndpointState localState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/576efcd8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0a15260..a32500d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.EchoMessage;
 import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
 import org.apache.cassandra.gms.GossipDigestSyn;
@@ -116,6 +117,7 @@ public final class MessagingService implements MessagingServiceMBean
         MIGRATION_REQUEST,
         GOSSIP_SHUTDOWN,
         _TRACE, // dummy verb so we can use MS.droppedMessages
+        ECHO,
         // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
         UNUSED_1,
         UNUSED_2,
@@ -152,6 +154,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
         put(Verb.SNAPSHOT, Stage.MISC);
+        put(Verb.ECHO, Stage.GOSSIP);
         put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
@@ -190,6 +193,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INDEX_SCAN, IndexScanCommand.serializer);
         put(Verb.REPLICATION_FINISHED, null);
         put(Verb.COUNTER_MUTATION, CounterMutation.serializer);
+        put(Verb.ECHO, EchoMessage.serializer);
     }};
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/576efcd8/src/java/org/apache/cassandra/service/EchoVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
new file mode 100644
index 0000000..f0cb66d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
@@ -0,0 +1,21 @@
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.gms.EchoMessage;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EchoVerbHandler implements IVerbHandler<EchoMessage>
+{
+    private static final Logger logger = LoggerFactory.getLogger(EchoVerbHandler.class);
+
+    public void doVerb(MessageIn<EchoMessage> message, int id)
+    {
+        MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, new EchoMessage(), EchoMessage.serializer);
+        logger.trace("Sending a EchoMessage reply {}", message.from);
+        MessagingService.instance().sendReply(echoMessage, id, message.from);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/576efcd8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 36724b9..eed16b9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -256,6 +256,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler());
 
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
 
         // spin up the streaming service so it is available for jmx tools.
         if (StreamingService.instance == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/576efcd8/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/CompactSerializerTest.java b/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
index 2532e0e..a436702 100644
--- a/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
@@ -83,6 +83,7 @@ public class CompactSerializerTest extends SchemaLoader
         expectedClassNames.add("ColumnFamilySerializer");
         expectedClassNames.add("CompressionInfoSerializer");
         expectedClassNames.add("ChunkSerializer");
+        expectedClassNames.add("EchoMessageSerializer");
 
         discoveredClassNames = new ArrayList<String>();
         String cp = System.getProperty("java.class.path");


Mime
View raw message