cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [10/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:34 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index e766e34..bf07402 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -23,14 +23,14 @@ package org.apache.cassandra.service.paxos;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.UUID;
 
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PrepareResponse
@@ -49,7 +49,7 @@ public class PrepareResponse
 
     public PrepareResponse(boolean promised, Commit inProgressCommit, Commit mostRecentCommit)
     {
-        assert inProgressCommit.key == mostRecentCommit.key;
+        assert inProgressCommit.update.partitionKey().equals(mostRecentCommit.update.partitionKey());
         assert inProgressCommit.update.metadata() == mostRecentCommit.update.metadata();
 
         this.promised = promised;
@@ -68,38 +68,53 @@ public class PrepareResponse
         public void serialize(PrepareResponse response, DataOutputPlus out, int version) throws IOException
         {
             out.writeBoolean(response.promised);
-            ByteBufferUtil.writeWithShortLength(response.inProgressCommit.key, out);
-            UUIDSerializer.serializer.serialize(response.inProgressCommit.ballot, out, version);
-            ColumnFamily.serializer.serialize(response.inProgressCommit.update, out, version);
-            UUIDSerializer.serializer.serialize(response.mostRecentCommit.ballot, out, version);
-            ColumnFamily.serializer.serialize(response.mostRecentCommit.update, out, version);
+            Commit.serializer.serialize(response.inProgressCommit, out, version);
+
+            if (version < MessagingService.VERSION_30)
+            {
+                UUIDSerializer.serializer.serialize(response.mostRecentCommit.ballot, out, version);
+                PartitionUpdate.serializer.serialize(response.mostRecentCommit.update, out, version);
+            }
+            else
+            {
+                Commit.serializer.serialize(response.mostRecentCommit, out, version);
+            }
         }
 
         public PrepareResponse deserialize(DataInput in, int version) throws IOException
         {
             boolean success = in.readBoolean();
-            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-            return new PrepareResponse(success,
-                                       new Commit(key,
-                                                  UUIDSerializer.serializer.deserialize(in, version),
-                                                  ColumnFamily.serializer.deserialize(in,
-                                                                                      ArrayBackedSortedColumns.factory,
-                                                                                      ColumnSerializer.Flag.LOCAL,
-                                                                                      version)),
-                                       new Commit(key,
-                                                  UUIDSerializer.serializer.deserialize(in, version),
-                                                  ColumnFamily.serializer.deserialize(in,
-                                                                                      ArrayBackedSortedColumns.factory,
-                                                                                      ColumnSerializer.Flag.LOCAL,
-                                                                                      version)));
+            Commit inProgress = Commit.serializer.deserialize(in, version);
+            Commit mostRecent;
+            if (version < MessagingService.VERSION_30)
+            {
+                UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
+                PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, inProgress.update.partitionKey());
+                mostRecent = new Commit(ballot, update);
+            }
+            else
+            {
+                mostRecent = Commit.serializer.deserialize(in, version);
+            }
+            return new PrepareResponse(success, inProgress, mostRecent);
         }
 
         public long serializedSize(PrepareResponse response, int version)
         {
-            return 1
-                   + 2 + response.inProgressCommit.key.remaining()
-                   + UUIDSerializer.serializer.serializedSize(response.inProgressCommit.ballot, version)
-                   + ColumnFamily.serializer.serializedSize(response.inProgressCommit.update, version)
-                   + UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version)
-                   + ColumnFamily.serializer.serializedSize(response.mostRecentCommit.update, version);
+            TypeSizes sizes = TypeSizes.NATIVE;
+            long size = sizes.sizeof(response.promised)
+                      + Commit.serializer.serializedSize(response.inProgressCommit, version);
+
+            if (version < MessagingService.VERSION_30)
+            {
+                size += UUIDSerializer.serializer.serializedSize(response.mostRecentCommit.ballot, version);
+                size += PartitionUpdate.serializer.serializedSize(response.mostRecentCommit.update, version, sizes);
+            }
+            else
+            {
+                size += Commit.serializer.serializedSize(response.mostRecentCommit, version);
+            }
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d..66eb220 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -18,31 +18,35 @@
 package org.apache.cassandra.streaming;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
 import java.util.UUID;
 
 import com.google.common.base.Throwables;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.sstable.format.Version;
 
+import com.google.common.collect.UnmodifiableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.ning.compress.lzf.LZFInputStream;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 
@@ -60,6 +64,7 @@ public class StreamReader
     protected final long repairedAt;
     protected final SSTableFormat.Type format;
     protected final int sstableLevel;
+    protected final SerializationHeader.Component header;
 
     protected Descriptor desc;
 
@@ -69,10 +74,11 @@ public class StreamReader
         this.cfId = header.cfId;
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
-        this.inputVersion = header.format.info.getVersion(header.version);
+        this.inputVersion = header.version;
         this.repairedAt = header.repairedAt;
         this.format = header.format;
         this.sstableLevel = header.sstableLevel;
+        this.header = header.header;
     }
 
     /**
@@ -98,17 +104,18 @@ public class StreamReader
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
         try
         {
             while (in.getBytesRead() < totalSize)
             {
-                writeRow(writer, in, cfs);
-
+                writePartition(deserializer, writer, cfs);
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
             return writer;
-        } catch (Throwable e)
+        }
+        catch (Throwable e)
         {
             writer.abort();
             drain(dis, in.getBytesRead());
@@ -126,7 +133,7 @@ public class StreamReader
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
         desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 
-        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
+        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata));
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException
@@ -156,10 +163,141 @@ public class StreamReader
         return size;
     }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    protected void writePartition(StreamDeserializer deserializer, SSTableWriter writer, ColumnFamilyStore cfs) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
-        writer.appendFromStream(key, cfs.metadata, in, inputVersion);
-        cfs.invalidateCachedRow(key);
+        DecoratedKey key = deserializer.newPartition();
+        writer.append(deserializer);
+        deserializer.checkForExceptions();
+        cfs.invalidateCachedPartition(key);
+    }
+
+    public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
+    {
+        private final CFMetaData metadata;
+        private final DataInput in;
+        private final SerializationHeader header;
+        private final SerializationHelper helper;
+
+        private DecoratedKey key;
+        private DeletionTime partitionLevelDeletion;
+        private SSTableSimpleIterator iterator;
+        private Row staticRow;
+        private IOException exception;
+
+        private final CounterFilteredRow counterRow;
+
+        public StreamDeserializer(CFMetaData metadata, DataInput in, Version version, SerializationHeader header)
+        {
+            assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
+            this.metadata = metadata;
+            this.in = in;
+            this.helper = new SerializationHelper(version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
+            this.header = header;
+            this.counterRow = metadata.isCounter() ? new CounterFilteredRow() : null;
+        }
+
+        public DecoratedKey newPartition() throws IOException
+        {
+            key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+            partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
+            iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
+            staticRow = iterator.readStaticRow();
+            return key;
+        }
+
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+
+        public PartitionColumns columns()
+        {
+            // We don't know which columns we'll get so assume it can be all of them
+            return metadata.partitionColumns();
+        }
+
+        public boolean isReverseOrder()
+        {
+            return false;
+        }
+
+        public DecoratedKey partitionKey()
+        {
+            return key;
+        }
+
+        public DeletionTime partitionLevelDeletion()
+        {
+            return partitionLevelDeletion;
+        }
+
+        public Row staticRow()
+        {
+            return staticRow;
+        }
+
+        public RowStats stats()
+        {
+            return header.stats();
+        }
+
+        public boolean hasNext()
+        {
+            try
+            {
+                return iterator.hasNext();
+            }
+            catch (IOError e)
+            {
+                if (e.getCause() != null && e.getCause() instanceof IOException)
+                {
+                    exception = (IOException)e.getCause();
+                    return false;
+                }
+                throw e;
+            }
+        }
+
+        public Unfiltered next()
+        {
+            // Note that in practice we know that IOException will be thrown by hasNext(), because that's
+            // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily
+            // to what we do in hasNext)
+            Unfiltered unfiltered = iterator.next();
+            return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
+                 ? maybeMarkLocalToBeCleared((Row) unfiltered)
+                 : unfiltered;
+        }
+
+        private Row maybeMarkLocalToBeCleared(Row row)
+        {
+            return metadata.isCounter()
+                 ? counterRow.setTo(row)
+                 : row;
+        }
+
+        public void checkForExceptions() throws IOException
+        {
+            if (exception != null)
+                throw exception;
+        }
+
+        public void close()
+        {
+        }
+    }
+
+    private static class CounterFilteredRow extends WrappingRow
+    {
+        protected Cell filterCell(Cell cell)
+        {
+            if (!cell.isCounterCell())
+                return cell;
+
+            ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(cell.value());
+            return marked == cell.value()
+                 ? cell
+                 : Cells.create(cell.column(), true, marked, cell.livenessInfo(), cell.path());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 44522db..d27c4e2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -24,8 +24,6 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 
@@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -316,7 +314,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         {
             for (ColumnFamilyStore cfStore : stores)
             {
-                final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+                final List<AbstractBounds<PartitionPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(Range.makeRowRange(range));
                 refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
@@ -327,7 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                         Set<SSTableReader> sstables = Sets.newHashSet();
                         if (filteredSSTables != null)
                         {
-                            for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
+                            for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsList)
                             {
                                 // sstableInBounds may contain early opened sstables
                                 for (SSTableReader sstable : view.sstablesInBounds(rowBounds))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94..47832f0 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -79,6 +79,7 @@ public class CompressedStreamReader extends StreamReader
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
         try
         {
             for (Pair<Long, Long> section : sections)
@@ -92,8 +93,7 @@ public class CompressedStreamReader extends StreamReader
 
                 while (in.getBytesRead() < sectionLength)
                 {
-                    writeRow(writer, in, cfs);
-
+                    writePartition(deserializer, writer, cfs);
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                     session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index e9c99fe..b8e7979 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -23,11 +23,15 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
@@ -43,7 +47,7 @@ public class FileMessageHeader
     public final UUID cfId;
     public final int sequenceNumber;
     /** SSTable version */
-    public final String version;
+    public final Version version;
 
     /** SSTable format **/
     public final SSTableFormat.Type format;
@@ -52,16 +56,18 @@ public class FileMessageHeader
     public final CompressionInfo compressionInfo;
     public final long repairedAt;
     public final int sstableLevel;
+    public final SerializationHeader.Component header;
 
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
-                             String version,
+                             Version version,
                              SSTableFormat.Type format,
                              long estimatedKeys,
                              List<Pair<Long, Long>> sections,
                              CompressionInfo compressionInfo,
                              long repairedAt,
-                             int sstableLevel)
+                             int sstableLevel,
+                             SerializationHeader.Component header)
     {
         this.cfId = cfId;
         this.sequenceNumber = sequenceNumber;
@@ -72,6 +78,7 @@ public class FileMessageHeader
         this.compressionInfo = compressionInfo;
         this.repairedAt = repairedAt;
         this.sstableLevel = sstableLevel;
+        this.header = header;
     }
 
     /**
@@ -134,7 +141,7 @@ public class FileMessageHeader
         {
             UUIDSerializer.serializer.serialize(header.cfId, out, version);
             out.writeInt(header.sequenceNumber);
-            out.writeUTF(header.version);
+            out.writeUTF(header.version.toString());
 
             //We can't stream to a node that doesn't understand a new sstable format
             if (version < StreamMessage.VERSION_22 && header.format != SSTableFormat.Type.LEGACY && header.format != SSTableFormat.Type.BIG)
@@ -153,13 +160,16 @@ public class FileMessageHeader
             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
             out.writeLong(header.repairedAt);
             out.writeInt(header.sstableLevel);
+
+            if (version >= StreamMessage.VERSION_30)
+                SerializationHeader.serializer.serialize(header.header, out);
         }
 
         public FileMessageHeader deserialize(DataInput in, int version) throws IOException
         {
             UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             int sequenceNumber = in.readInt();
-            String sstableVersion = in.readUTF();
+            Version sstableVersion = DatabaseDescriptor.getSSTableFormat().info.getVersion(in.readUTF());
 
             SSTableFormat.Type format = SSTableFormat.Type.LEGACY;
             if (version >= StreamMessage.VERSION_22)
@@ -173,14 +183,18 @@ public class FileMessageHeader
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
             long repairedAt = in.readLong();
             int sstableLevel = in.readInt();
-            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel);
+            SerializationHeader.Component header = version >= StreamMessage.VERSION_30
+                                                 ? SerializationHeader.serializer.deserialize(sstableVersion, in)
+                                                 : null;
+
+            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel, header);
         }
 
         public long serializedSize(FileMessageHeader header, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
             size += TypeSizes.NATIVE.sizeof(header.sequenceNumber);
-            size += TypeSizes.NATIVE.sizeof(header.version);
+            size += TypeSizes.NATIVE.sizeof(header.version.toString());
 
             if (version >= StreamMessage.VERSION_22)
                 size += TypeSizes.NATIVE.sizeof(header.format.name);
@@ -195,6 +209,10 @@ public class FileMessageHeader
             }
             size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
             size += TypeSizes.NATIVE.sizeof(header.sstableLevel);
+
+            if (version >= StreamMessage.VERSION_30)
+                size += SerializationHeader.serializer.serializedSize(header.header);
+
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 5b34bd8..82e6620 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -70,13 +70,14 @@ public class OutgoingFileMessage extends StreamMessage
         }
         this.header = new FileMessageHeader(sstable.metadata.cfId,
                                             sequenceNumber,
-                                            sstable.descriptor.version.toString(),
+                                            sstable.descriptor.version,
                                             sstable.descriptor.formatType,
                                             estimatedKeys,
                                             sections,
                                             compressionInfo,
                                             repairedAt,
-                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0);
+                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0,
+                                            sstable.header == null ? null : sstable.header.toComponent());
     }
 
     public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index d4e8a81..3db2dbf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -34,7 +34,8 @@ public abstract class StreamMessage
     /** Streaming protocol version */
     public static final int VERSION_20 = 2;
     public static final int VERSION_22 = 3;
-    public static final int CURRENT_VERSION = VERSION_22;
+    public static final int VERSION_30 = 4;
+    public static final int CURRENT_VERSION = VERSION_30;
 
     public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
     {


Mime
View raw message