cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/2] git commit: Make StreamSession#closeSession() idempotent
Date Mon, 02 Jun 2014 07:00:37 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 1c33ea367 -> e398c6bb8


Make StreamSession#closeSession() idempotent

Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-7262


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/709b9fc3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/709b9fc3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/709b9fc3

Branch: refs/heads/cassandra-2.1
Commit: 709b9fc319f669ee07338a842f36a9d77e8c46a1
Parents: e68ac31
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Mon Jun 2 08:53:32 2014 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Mon Jun 2 08:58:11 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      | 32 ++++++++++++--------
 2 files changed, 20 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/709b9fc3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1d1030..bc95a8d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288)
  * Add authentication support to shuffle (CASSANDRA-6484)
  * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
+ * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
 Merged from 1.2:
  * Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
  * Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/709b9fc3/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 30e3fa2..79ad487 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.*;
 import org.slf4j.Logger;
@@ -132,6 +133,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
     private int retries;
 
+    private AtomicBoolean isAborted = new AtomicBoolean(false);
+
     public static enum State
     {
         INITIALIZED,
@@ -329,23 +332,26 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         }
     }
 
-    private void closeSession(State finalState)
+    private synchronized void closeSession(State finalState)
     {
-        state(finalState);
-
-        if (finalState == State.FAILED)
+        if (isAborted.compareAndSet(false, true))
         {
-            for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
-                task.abort();
-        }
+            state(finalState);
 
-        // Note that we shouldn't block on this close because this method is called on the handler
-        // incoming thread (so we would deadlock).
-        handler.close();
+            if (finalState == State.FAILED)
+            {
+                for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
+                    task.abort();
+            }
+
+            // Note that we shouldn't block on this close because this method is called on the handler
+            // incoming thread (so we would deadlock).
+            handler.close();
 
-        Gossiper.instance.unregister(this);
-        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        streamResult.handleSessionComplete(this);
+            Gossiper.instance.unregister(this);
+            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+            streamResult.handleSessionComplete(this);
+        }
     }
 
     /**


Mime
View raw message