cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobr...@apache.org
Subject git commit: sstables from stalled repair sessions become live after a reboot and can resurrect deleted data patch by jasobrown, reviewed by yukim for CASSANDRA-6503
Date Thu, 30 Jan 2014 17:52:50 GMT
Updated Branches:
  refs/heads/cassandra-1.2 852e27f2e -> 6ad995e8f


sstables from stalled repair sessions become live after a reboot and can resurrect deleted data
patch by jasobrown, reviewed by yukim for CASSANDRA-6503


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6ad995e8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6ad995e8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6ad995e8

Branch: refs/heads/cassandra-1.2
Commit: 6ad995e8fa7703e082ea9ce67dc4c1ed0b1fd18a
Parents: 852e27f
Author: Jason Brown <jasedbrown@gmail.com>
Authored: Thu Jan 30 09:51:42 2014 -0800
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Thu Jan 30 09:51:42 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                            |  1 +
 .../cassandra/streaming/IncomingStreamReader.java      |  8 ++++----
 .../apache/cassandra/streaming/StreamInSession.java    | 13 +++++++++----
 3 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad995e8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 110bf50..d85d3a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@
  * Fix preparing with batch and delete from collection (CASSANDRA-6607)
  * Fix ABSC reverse iterator's remove() method (CASSANDRA-6629)
  * Handle host ID conflicts properly (CASSANDRA-6615)
+ * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
 
 
 1.2.13

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad995e8/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 0b058fc..940f8de 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -119,8 +119,8 @@ public class IncomingStreamReader
             DataInput dis = new DataInputStream(underliningStream);
             try
             {
-                SSTableReader reader = streamIn(dis, localFile, remoteFile);
-                session.finished(remoteFile, reader);
+                SSTableWriter writer = streamIn(dis, localFile, remoteFile);
+                session.finished(remoteFile, writer);
             }
             catch (IOException ex)
             {
@@ -141,7 +141,7 @@ public class IncomingStreamReader
     /**
      * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
      */
-    private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
+    private SSTableWriter streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
     {
         ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
         DecoratedKey key;
@@ -197,7 +197,7 @@ public class IncomingStreamReader
             }
             StreamingMetrics.totalIncomingBytes.inc(totalBytesRead);
             metrics.incomingBytes.inc(totalBytesRead);
-            return writer.closeAndOpenReader();
+            return writer;
         }
         catch (Throwable e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad995e8/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 96c31da..e83a5b6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -24,6 +24,7 @@ import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
@@ -47,7 +48,7 @@ public class StreamInSession extends AbstractStreamSession
     private static final ConcurrentMap<UUID, StreamInSession> sessions = new NonBlockingHashMap<UUID, StreamInSession>();
 
     private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
-    private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
+    private final List<SSTableWriter> writers = new ArrayList<SSTableWriter>();
     private PendingFile current;
     private Socket socket;
     private volatile int retries;
@@ -106,13 +107,13 @@ public class StreamInSession extends AbstractStreamSession
         }
     }
 
-    public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException
+    public void finished(PendingFile remoteFile, SSTableWriter writer) throws IOException
     {
         if (logger.isDebugEnabled())
             logger.debug("Finished {} (from {}). Sending ack to {}", new Object[] {remoteFile, getHost(), this});
 
-        assert reader != null;
-        readers.add(reader);
+        assert writer != null;
+        writers.add(writer);
         files.remove(remoteFile);
         if (remoteFile.equals(current))
             current = null;
@@ -163,6 +164,10 @@ public class StreamInSession extends AbstractStreamSession
             HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
             try
             {
+                List<SSTableReader> readers = new ArrayList<SSTableReader>();
+                for(SSTableWriter writer : writers)
+                    readers.add(writer.closeAndOpenReader());
+
                 for (SSTableReader sstable : readers)
                 {
                     assert sstable.getTableName().equals(table);


Mime
View raw message