cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1172665 - in /cassandra/branches/cassandra-1.0: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/streaming/
Date Mon, 19 Sep 2011 15:32:49 GMT
Author: slebresne
Date: Mon Sep 19 15:32:49 2011
New Revision: 1172665

URL: http://svn.apache.org/viewvc?rev=1172665&view=rev
Log:
merge from 1.0.0

Modified:
    cassandra/branches/cassandra-1.0/   (props changed)
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/contrib/   (props changed)
    cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
  (props changed)
    cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
  (props changed)
    cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
  (props changed)
    cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
  (props changed)
    cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
  (props changed)
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java

Propchange: cassandra/branches/cassandra-1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/branches/cassandra-1.0:1167106,1167185
-/cassandra/branches/cassandra-1.0.0:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0:1167104-1171098,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
 /cassandra/trunk:1167085-1167102,1169870

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1172665&r1=1172664&r2=1172665&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Mon Sep 19 15:32:49 2011
@@ -5,6 +5,9 @@
 1.0.0-rc1
  * Fix counting CFMetadata towards Memtable liveRatio (CASSANDRA-3023)
  * Log message when a full repair operation completes (CASSANDRA-3207)
+ * Fix streamOutSession keeping sstables references forever if the remote end
+   dies (CASSANDRA-3216)
+
 
 1.0.0-beta1
  * removed binarymemtable (CASSANDRA-2692)

Propchange: cassandra/branches/cassandra-1.0/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/branches/cassandra-1.0/contrib:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1171098,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
 /cassandra/trunk/contrib:1167085-1167102,1169870

Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1171098,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1167102,1169870

Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1171098,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1167102,1169870

Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1171098,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1167102,1169870

Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1171098,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1167102,1169870

Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 /cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1171098,1172597,1172663
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1167102,1169870

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1172665&r1=1172664&r2=1172665&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
(original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Mon Sep 19 15:32:49 2011
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -34,7 +37,7 @@ import org.cliffc.high_scale_lib.NonBloc
 /**
  * This class manages the streaming of multiple files one after the other.
 */
-public class StreamOutSession
+public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
 {
     private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class
);
 
@@ -70,12 +73,15 @@ public class StreamOutSession
     private final Pair<InetAddress, Long> context;
     private final Runnable callback;
     private volatile String currentFile;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
     private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable
callback)
     {
         this.table = table;
         this.context = context;
         this.callback = callback;
+        Gossiper.instance.register(this);
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
     public InetAddress getHost()
@@ -123,11 +129,30 @@ public class StreamOutSession
 
     public void close()
     {
-        // Release reference on last file
+        close(true);
+    }
+
+    private void close(boolean success)
+    {
+        // Though unlikely, it is possible for close to be called multiple
+        // time, if the endpoint die at the exact wrong time for instance.
+        if (!isClosed.compareAndSet(false, true))
+        {
+            logger.debug("StreamOutSession {} already closed", getSessionId());
+            return;
+        }
+
+        Gossiper.instance.unregister(this);
+        FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
+        // Release reference on last file (or any uncompleted ones)
         for (PendingFile file : files.values())
             file.sstable.releaseReference();
         streams.remove(context);
-        if (callback != null)
+        // Instead of just not calling the callback on failure, we could have
+        // allow to register a specific callback for failures, but we leave
+        // that to a future ticket (likely CASSANDRA-3112)
+        if (callback != null && success)
             callback.run();
     }
 
@@ -179,4 +204,32 @@ public class StreamOutSession
         logger.debug("Files are {}", StringUtils.join(files.values(), ","));
         MessagingService.instance().stream(header, getHost());
     }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
{}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!endpoint.equals(getHost()))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing
a streaming wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        logger.error("StreamOutSession {} failed because {} died or was restarted/removed",
endpoint);
+        close(false);
+    }
 }



Mime
View raw message