cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/3] git commit: Fix duplicate SSTable reference when stream session failed; patch by yukim reviewed by Sylvain Lebresne for CASSANDRA-3306
Date Wed, 31 Oct 2012 16:11:33 GMT
Fix duplicate SSTable reference when stream session failed; patch by yukim reviewed by Sylvain
Lebresne for CASSANDRA-3306


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d401d9f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d401d9f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d401d9f1

Branch: refs/heads/trunk
Commit: d401d9f1deaad08ff0a76e3795ec4b1c0fa72421
Parents: 99b245d
Author: Yuki Morishita <yukim@apache.org>
Authored: Wed Oct 31 10:42:36 2012 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Oct 31 10:42:36 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/streaming/IncomingStreamReader.java  |   12 ++++++++++++
 .../cassandra/streaming/StreamInSession.java       |    6 ++++++
 3 files changed, 19 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69298f..5f5ea89 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * fix compositeType.{get/from}String methods (CASSANDRA-4842)
  * (CQL) fix CREATE COLUMNFAMILY permissions check (CASSANDRA-4864)
  * Fix DynamicCompositeType same type comparison (CASSANDRA-4711)
+ * Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306)
 
 
 1.1.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 915d3bc..bfb046f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -31,9 +31,11 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
@@ -60,6 +62,16 @@ public class IncomingStreamReader
         this.socket = socket;
         InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
                            : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+        if (header.pendingFiles.isEmpty() && header.file != null)
+        {
+            // StreamInSession should be created already when receiving 2nd and after files
+            if (!StreamInSession.hasSession(host, header.sessionId))
+            {
+                StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
+                OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(host)),
Long.toString(header.sessionId), new DataOutputStream(socket.getOutputStream()));
+                throw new IOException("Session " + header.sessionId + " already closed.");
+            }
+        }
         session = StreamInSession.get(host, header.sessionId);
         session.setSocket(socket);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d401d9f1/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index e123714..e1bbe41 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -102,6 +102,12 @@ public class StreamInSession extends AbstractStreamSession
         return session;
     }
 
+    public static boolean hasSession(InetAddress host, long sessionId)
+    {
+        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
+        return sessions.get(context) != null;
+    }
+
     public void setCurrentFile(PendingFile file)
     {
         this.current = file;


Mime
View raw message