cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r1065664 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/gms/Gossiper.java src/java/org/apache/cassandra/net/IncomingTcpConnection.java src/java/org/apache/cassandra/net/MessagingService.java
Date Mon, 31 Jan 2011 16:12:57 GMT
Author: gdusbabek
Date: Mon Jan 31 16:12:57 2011
New Revision: 1065664

URL: http://svn.apache.org/viewvc?rev=1065664&view=rev
Log:
ignore messages from the future. keep track of nodes in gossip regardless. patch by gdusbabek,
reviewed by jbellis. CASSANDRA-1970

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Jan 31 16:12:57 2011
@@ -49,7 +49,8 @@
  * fix math in RandomPartitioner.describeOwnership (CASSANDRA-2071)
  * fix deletion of sstable non-data components (CASSANDRA-2059)
  * avoid blocking gossip while deleting handoff hints (CASSANDRA-2073)
-
+ * ignore messages from newer versions, keep track of nodes in gossip 
+   regardless of version (CASSANDRA-1970)
 
 0.7.0-final
  * fix offsets to ByteBuffer.get (CASSANDRA-1939)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jan
31 16:12:57 2011
@@ -26,6 +26,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,6 +142,10 @@ public class Gossiper implements IFailur
      * after removal to prevent nodes from falsely reincarnating during the time when removal
      * gossip gets propagated to all nodes */
     Map<InetAddress, Long> justRemovedEndpoints_ = new ConcurrentHashMap<InetAddress,
Long>();
+    
+    // protocol versions of the other nodes in the cluster
+    private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress,
Integer>();
+    
 
     private Gossiper()
     {
@@ -169,6 +174,20 @@ public class Gossiper implements IFailur
     {
         subscribers_.remove(subscriber);
     }
+    
+    public void setVersion(InetAddress address, int version)
+    {
+        Integer old = versions.put(address, version);
+        EndpointState state = endpointStateMap_.get(address);
+        if (state == null)
+            addSavedEndpoint(address);
+    }
+    
+    public Integer getVersion(InetAddress address)
+    {
+        return versions.get(address);
+    }
+    
 
     public Set<InetAddress> getLiveMembers()
     {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
Mon Jan 31 16:12:57 2011
@@ -24,6 +24,7 @@ package org.apache.cassandra.net;
 import java.io.*;
 import java.net.Socket;
 
+import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +53,7 @@ public class IncomingTcpConnection exten
     {
         DataInputStream input;
         boolean isStream;
+        int version;
         try
         {
             // determine the connection type to decide whether to buffer
@@ -62,6 +64,8 @@ public class IncomingTcpConnection exten
             if (!isStream)
                 // we should buffer
                 input = new DataInputStream(new BufferedInputStream(socket.getInputStream(),
4096));
+            version = MessagingService.getBits(header, 15, 8);
+            Gossiper.instance.setVersion(socket.getInetAddress(), version);
         }
         catch (IOException e)
         {
@@ -74,6 +78,12 @@ public class IncomingTcpConnection exten
             {
                 if (isStream)
                 {
+                    if (version > MessagingService.version_)
+                    {
+                        logger.error("Received untranslated stream from newer protcol version.
Terminating connection!");
+                        close();
+                        return;
+                    }
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
@@ -87,12 +97,18 @@ public class IncomingTcpConnection exten
                     byte[] contentBytes = new byte[size];
                     input.readFully(contentBytes);
                     
-                    Message message = Message.serializer().deserialize(new DataInputStream(new
ByteArrayInputStream(contentBytes)));
-                    MessagingService.instance().receive(message);
+                    if (version > MessagingService.version_)
+                        logger.info("Received connection from newer protocol version. Ignorning
message.");
+                    else
+                    {
+                        Message message = Message.serializer().deserialize(new DataInputStream(new
ByteArrayInputStream(contentBytes)));
+                        MessagingService.instance().receive(message);
+                    }
                 }
                 // prepare to read the next message
                 MessagingService.validateMagic(input.readInt());
                 int header = input.readInt();
+                version = MessagingService.getBits(header, 15, 8);
                 assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections
cannot change type: " + isStream;
             }
             catch (EOFException e)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1065664&r1=1065663&r2=1065664&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Mon Jan 31 16:12:57 2011
@@ -61,7 +61,7 @@ import org.cliffc.high_scale_lib.NonBloc
 
 public final class MessagingService implements MessagingServiceMBean
 {
-    private static final int version_ = 1;
+    public static final int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
     private SerializerType serializerType_ = SerializerType.BINARY;
 



Mime
View raw message