cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject git commit: Keep sstable level when bootstrapping
Date Mon, 13 Oct 2014 09:27:06 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e473769fb -> 0de0b8c03


Keep sstable level when bootstrapping

Patch by marcuse; reviewed by iamaleksey for CASSANDRA-7460


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0de0b8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0de0b8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0de0b8c0

Branch: refs/heads/trunk
Commit: 0de0b8c0372e825e834b1ffd9685d3db87d21378
Parents: e473769
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue Oct 7 07:35:53 2014 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Mon Oct 13 11:24:00 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                         |  1 +
 .../cassandra/db/compaction/LeveledManifest.java    | 14 ++++++++++++++
 .../org/apache/cassandra/dht/RangeStreamer.java     |  8 ++------
 .../apache/cassandra/io/sstable/SSTableLoader.java  |  2 +-
 .../apache/cassandra/io/sstable/SSTableWriter.java  | 10 ++++++++--
 .../cassandra/net/IncomingStreamingConnection.java  |  2 +-
 .../org/apache/cassandra/repair/LocalSyncTask.java  |  2 +-
 .../cassandra/repair/StreamingRepairTask.java       |  2 +-
 .../cassandra/streaming/ConnectionHandler.java      |  3 ++-
 .../cassandra/streaming/StreamCoordinator.java      |  8 +++++---
 .../org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++---
 .../apache/cassandra/streaming/StreamReader.java    |  7 ++++---
 .../cassandra/streaming/StreamResultFuture.java     |  9 +++++----
 .../apache/cassandra/streaming/StreamSession.java   |  9 ++++++++-
 .../cassandra/streaming/StreamTransferTask.java     |  2 +-
 .../streaming/messages/FileMessageHeader.java       | 11 +++++++++--
 .../streaming/messages/OutgoingFileMessage.java     |  5 +++--
 .../streaming/messages/StreamInitMessage.java       |  9 +++++++--
 .../cassandra/streaming/messages/StreamMessage.java |  2 +-
 .../org/apache/cassandra/tools/SSTableImport.java   |  4 ++--
 .../apache/cassandra/io/sstable/SSTableUtils.java   |  2 +-
 .../cassandra/streaming/StreamTransferTaskTest.java |  2 +-
 .../apache/cassandra/tools/SSTableExportTest.java   | 16 ++++++++--------
 23 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f602c0e..b6a3766 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
  * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
  * Support for scripting languages in user-defined functions (CASSANDRA-7526)
  * Support for aggregation functions (CASSANDRA-4914)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a0836a8..6d3bf69 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 
 public class LeveledManifest
@@ -330,6 +331,19 @@ public class LeveledManifest
                 return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE);
             }
         }
+
+        // during bootstrap we only do size tiering in L0 to make sure
+        // the streamed files can be placed in their original levels
+        if (StorageService.instance.isBootstrapMode())
+        {
+            List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
+            if (!mostInteresting.isEmpty())
+            {
+                logger.info("Bootstrapping - doing STCS in L0");
+                return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE);
+            }
+            return null;
+        }
         // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
         // compacts the level with the highest score. But this falls apart spectacularly once you
         // get behind.  Consider this set of levels:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index be58d77..388834f 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -109,16 +109,12 @@ public class RangeStreamer
         this.tokens = tokens;
         this.address = address;
         this.description = description;
-        this.streamPlan = new StreamPlan(description);
+        this.streamPlan = new StreamPlan(description, true);
     }
 
     public RangeStreamer(TokenMetadata metadata, InetAddress address, String description)
     {
-        this.metadata = metadata;
-        this.tokens = null;
-        this.address = address;
-        this.description = description;
-        this.streamPlan = new StreamPlan(description);
+        this(metadata, null, address, description);
     }
 
     public void addSourceFilter(ISourceFilter filter)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index fbd583c..991fa1d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -157,7 +157,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b5e7d02..ef8cd51 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -62,6 +62,7 @@ import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FilterFactory;
@@ -84,14 +85,19 @@ public class SSTableWriter extends SSTable
     private final MetadataCollector sstableMetadataCollector;
     private final long repairedAt;
 
-    public SSTableWriter(String filename, long keyCount, long repairedAt)
+    public SSTableWriter(String filename, long keyCount, long repairedAt, int sstableLevel)
     {
         this(filename,
              keyCount,
              repairedAt,
              Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
              StorageService.getPartitioner(),
-             new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
+             new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator).sstableLevel(sstableLevel));
+    }
+
+    public SSTableWriter(String filename, long keyCount)
+    {
+        this(filename, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
     }
 
     private static Set<Component> components(CFMetaData metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 003bbf9..de18d50 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -62,7 +62,7 @@ public class IncomingStreamingConnection extends Thread
             // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
             // parallelize said streams and the socket is blocking, so we might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 38f63ce..b34c508 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -57,7 +57,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
         InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
 
         logger.info(String.format("[repair #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, differences.size(), dst));
-        new StreamPlan("Repair", repairedAt, 1).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
                                             .requestRanges(dst, desc.keyspace, differences, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index f30eb6f..1472720 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -50,7 +50,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
     public void run()
     {
         logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
-        new StreamPlan("Repair", repairedAt, 1).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
                                             .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 6092046..7a7ccbf 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -179,7 +179,8 @@ public class ConnectionHandler
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing);
+                    isForOutgoing,
+                    session.keepSSTableLevel());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
             getWriteChannel(socket).write(messageBuf);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 71a853c..130bd45 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -45,11 +45,13 @@ public class StreamCoordinator
     private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
+    private final boolean keepSSTableLevel;
 
-    public StreamCoordinator(int connectionsPerHost, StreamConnectionFactory factory)
+    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
+        this.keepSSTableLevel = keepSSTableLevel;
     }
 
     public void setConnectionFactory(StreamConnectionFactory factory)
@@ -233,7 +235,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, factory, streamSessions.size());
+                StreamSession session = new StreamSession(peer, factory, streamSessions.size(), keepSSTableLevel);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -265,7 +267,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, factory, id);
+                session = new StreamSession(peer, factory, id, keepSSTableLevel);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index ca448a3..5aa1bc6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -47,14 +47,19 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false);
     }
 
-    public StreamPlan(String description, long repairedAt, int connectionsPerHost)
+    public StreamPlan(String description, boolean keepSSTableLevels)
+    {
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels);
+    }
+
+    public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels)
     {
         this.description = description;
         this.repairedAt = repairedAt;
-        this.coordinator = new StreamCoordinator(connectionsPerHost, new DefaultConnectionFactory());
+        this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index b6e1aaf..34cbf02 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -58,6 +58,7 @@ public class StreamReader
     protected final StreamSession session;
     protected final Descriptor.Version inputVersion;
     protected final long repairedAt;
+    protected final int sstableLevel;
 
     protected Descriptor desc;
 
@@ -69,6 +70,7 @@ public class StreamReader
         this.sections = header.sections;
         this.inputVersion = new Descriptor.Version(header.version);
         this.repairedAt = header.repairedAt;
+        this.sstableLevel = header.sstableLevel;
     }
 
     /**
@@ -78,7 +80,7 @@ public class StreamReader
      */
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
+        logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@ -119,8 +121,7 @@ public class StreamReader
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
         desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
-
-        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt);
+        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt, sstableLevel);
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index f28a937..b8a5234 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -69,9 +69,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, String description)
+    private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels)
     {
-        this(planId, description, new StreamCoordinator(0, new DefaultConnectionFactory()));
+        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory()));
     }
 
     static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
@@ -101,7 +101,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                                                                     InetAddress from,
                                                                     Socket socket,
                                                                     boolean isForOutgoing,
-                                                                    int version) throws IOException
+                                                                    int version,
+                                                                    boolean keepSSTableLevel) throws IOException
     {
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -109,7 +110,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, description);
+            future = new StreamResultFuture(planId, description, keepSSTableLevel);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 2efa00d..560a9fa 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -133,6 +133,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     private int retries;
 
     private AtomicBoolean isAborted = new AtomicBoolean(false);
+    private final boolean keepSSTableLevel;
 
     public static enum State
     {
@@ -153,13 +154,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      * @param peer Address of streaming peer
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, StreamConnectionFactory factory, int index)
+    public StreamSession(InetAddress peer, StreamConnectionFactory factory, int index, boolean keepSSTableLevel)
     {
         this.peer = peer;
         this.index = index;
         this.factory = factory;
         this.handler = new ConnectionHandler(this);
         this.metrics = StreamingMetrics.get(peer);
+        this.keepSSTableLevel = keepSSTableLevel;
     }
 
     public UUID planId()
@@ -177,6 +179,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return streamResult == null ? null : streamResult.description;
     }
 
+    public boolean keepSSTableLevel()
+    {
+        return keepSSTableLevel;
+    }
+
     /**
      * Bind this session to report to specific {@link StreamResultFuture} and
      * perform pre-streaming initialization.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index b840ee5..18058c1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -50,7 +50,7 @@ public class StreamTransferTask extends StreamTask
     public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
         assert sstable != null && cfId.equals(sstable.metadata.cfId);
-        OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
+        OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 284820e..5e378bc 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -47,6 +47,7 @@ public class FileMessageHeader
     public final List<Pair<Long, Long>> sections;
     public final CompressionInfo compressionInfo;
     public final long repairedAt;
+    public final int sstableLevel;
 
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
@@ -54,7 +55,8 @@ public class FileMessageHeader
                              long estimatedKeys,
                              List<Pair<Long, Long>> sections,
                              CompressionInfo compressionInfo,
-                             long repairedAt)
+                             long repairedAt,
+                             int sstableLevel)
     {
         this.cfId = cfId;
         this.sequenceNumber = sequenceNumber;
@@ -63,6 +65,7 @@ public class FileMessageHeader
         this.sections = sections;
         this.compressionInfo = compressionInfo;
         this.repairedAt = repairedAt;
+        this.sstableLevel = sstableLevel;
     }
 
     /**
@@ -96,6 +99,7 @@ public class FileMessageHeader
         sb.append(", transfer size: ").append(size());
         sb.append(", compressed?: ").append(compressionInfo != null);
         sb.append(", repairedAt: ").append(repairedAt);
+        sb.append(", level: ").append(sstableLevel);
         sb.append(')');
         return sb.toString();
     }
@@ -134,6 +138,7 @@ public class FileMessageHeader
             }
             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
             out.writeLong(header.repairedAt);
+            out.writeInt(header.sstableLevel);
         }
 
         public FileMessageHeader deserialize(DataInput in, int version) throws IOException
@@ -148,7 +153,8 @@ public class FileMessageHeader
                 sections.add(Pair.create(in.readLong(), in.readLong()));
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
             long repairedAt = in.readLong();
-            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt);
+            int sstableLevel = in.readInt();
+            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel);
         }
 
         public long serializedSize(FileMessageHeader header, int version)
@@ -165,6 +171,7 @@ public class FileMessageHeader
                 size += TypeSizes.NATIVE.sizeof(section.right);
             }
             size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
+            size += TypeSizes.NATIVE.sizeof(header.sstableLevel);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 466e2cb..13af987 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -60,7 +60,7 @@ public class OutgoingFileMessage extends StreamMessage
     public FileMessageHeader header;
     public SSTableReader sstable;
 
-    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
     {
         super(Type.FILE);
         this.sstable = sstable;
@@ -77,7 +77,8 @@ public class OutgoingFileMessage extends StreamMessage
                                             estimatedKeys,
                                             sections,
                                             compressionInfo,
-                                            repairedAt);
+                                            repairedAt,
+                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index a9ec4ae..0937f71 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -46,14 +46,16 @@ public class StreamInitMessage
 
     // true if this init message is to connect for outgoing message on receiving side
     public final boolean isForOutgoing;
+    public final boolean keepSSTableLevel;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
         this.planId = planId;
         this.description = description;
         this.isForOutgoing = isForOutgoing;
+        this.keepSSTableLevel = keepSSTableLevel;
     }
 
     /**
@@ -105,6 +107,7 @@ public class StreamInitMessage
             UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
             out.writeUTF(message.description);
             out.writeBoolean(message.isForOutgoing);
+            out.writeBoolean(message.keepSSTableLevel);
         }
 
         public StreamInitMessage deserialize(DataInput in, int version) throws IOException
@@ -114,7 +117,8 @@ public class StreamInitMessage
             UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             String description = in.readUTF();
             boolean sentByInitiator = in.readBoolean();
-            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator);
+            boolean keepSSTableLevel = in.readBoolean();
+            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -124,6 +128,7 @@ public class StreamInitMessage
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
             size += TypeSizes.NATIVE.sizeof(message.description);
             size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
+            size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index e527db1..372fdd3 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.streaming.StreamSession;
 public abstract class StreamMessage
 {
     /** Streaming protocol version */
-    public static final int CURRENT_VERSION = 2;
+    public static final int CURRENT_VERSION = 3;
 
     public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index e678aaa..05b9dcb 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -302,7 +302,7 @@ public class SSTableImport
         Object[] data = parser.readValueAs(new TypeReference<Object[]>(){});
 
         keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
 
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
@@ -375,7 +375,7 @@ public class SSTableImport
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
         parser = getParser(jsonFile); // renewing parser
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
 
         int lineNumber = 1;
         DecoratedKey prevStoredKey = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 157f89b..57c9477 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -211,7 +211,7 @@ public class SSTableUtils
         public SSTableReader write(int expectedSize, Appender appender) throws IOException
         {
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
-            SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE);
+            SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize);
             while (appender.append(writer)) { /* pass */ }
             SSTableReader reader = writer.closeAndOpenReader();
             // mark all components for removal

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 16fa77b..d84f9b7 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -63,7 +63,7 @@ public class StreamTransferTaskTest
         String ks = KEYSPACE1;
         String cf = "Standard1";
 
-        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0);
+        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0, true);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0de0b8c0/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 2009c0c..f93e168 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -97,7 +97,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("colA"), ByteBufferUtil.bytes("valA"), System.currentTimeMillis());
@@ -134,7 +134,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         int nowInSec = (int)(System.currentTimeMillis() / 1000) + 42; //live for 42 seconds
         // Add rowA
@@ -191,7 +191,7 @@ public class SSTableExportTest
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("name"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
@@ -231,7 +231,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Counter1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Counter1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(BufferCounterCell.createLocal(Util.cellname("colA"), 42, System.currentTimeMillis(), Long.MIN_VALUE));
@@ -263,7 +263,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "ValuesWithQuotes");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "ValuesWithQuotes");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(new BufferCell(Util.cellname("data"), UTF8Type.instance.fromString("{\"foo\":\"bar\"}")));
@@ -295,7 +295,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("colName"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
@@ -357,7 +357,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "UUIDKeys");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "UUIDKeys");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
 
         // Add a row
         cfamily.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L));
@@ -387,7 +387,7 @@ public class SSTableExportTest
     {
         File tempSS = tempSSTableFile(KEYSPACE1, "AsciiKeys");
         ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "AsciiKeys");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
 
         // Add a row
         cfamily.addColumn(column("column", "value", 1L));


Mime
View raw message