cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/8] Extend Descriptor to include a format value and refactor reader/writer apis
Date Thu, 23 Oct 2014 15:11:49 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
new file mode 100644
index 0000000..07d867d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableSliceIterator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileDataInput;
+
+/**
+ * Iterates the on-disk atoms stored under a single key in an SSTable, restricted to the requested column slices.
+ */
+class SSTableSliceIterator implements OnDiskAtomIterator
+{
+    private final OnDiskAtomIterator reader; // delegate that does the actual read; null when the key lookup below finds no index entry
+    private final DecoratedKey key;
+
+    public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ColumnSlice[] slices, boolean reversed)
+    {
+        this.key = key;
+        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ); // exact-match (EQ) lookup of the key
+        this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, slices, reversed); // no entry -> iterator stays empty
+    }
+
+    /**
+     * An iterator for a slice within an SSTable
+     * @param sstable the SSTable we are reading from
+     * @param file Optional parameter that input is read from.  If null is passed, this class creates an appropriate one automatically.
+     * If this class creates one, it will close the underlying file when #close() is called.
+     * If a caller passes a non-null argument, this class will NOT close the underlying file when the iterator is closed (i.e. the caller is responsible for closing the file)
+     * In all cases the caller should explicitly #close() this iterator.
+     * @param key The key the requested slice resides under
+     * @param slices the column slices
+     * @param reversed Results are returned in reverse order iff reversed is true.
+     * @param indexEntry position of the row
+     */
+    public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry)
+    {
+        this.key = key;
+        reader = createReader(sstable, indexEntry, file, slices, reversed); // indexEntry is used as given; it is not null-checked here
+    }
+
+    private static OnDiskAtomIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ColumnSlice[] slices, boolean reversed)
+    {
+        return slices.length == 1 && slices[0].start.isEmpty() && !reversed // one forward slice with an empty start bound
+             ? new SimpleSliceReader(sstable, indexEntry, file, slices[0].finish) // only the finish bound is handed over
+             : new IndexedSliceReader(sstable, indexEntry, file, slices, reversed); // every other slice shape
+    }
+
+    public DecoratedKey getKey()
+    {
+        return key;
+    }
+
+    public ColumnFamily getColumnFamily()
+    {
+        return reader == null ? null : reader.getColumnFamily(); // null when the key was not found
+    }
+
+    public boolean hasNext()
+    {
+        return reader != null && reader.hasNext();
+    }
+
+    public OnDiskAtom next()
+    {
+        return reader.next(); // NOTE(review): NullPointerException when reader is null (key not found); call hasNext() first
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException
+    {
+        if (reader != null)
+            reader.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
new file mode 100644
index 0000000..9fec303
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SimpleSliceReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+class SimpleSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+{ // Returns atoms from the on-disk iterator in order, ending at the first atom that sorts after finishColumn
+    private static final Logger logger = LoggerFactory.getLogger(SimpleSliceReader.class);
+
+    private final FileDataInput file;
+    private final boolean needsClosing; // true only when this reader opened the file itself (input == null)
+    private final Composite finishColumn; // inclusive upper bound on atom names; empty means no bound
+    private final CellNameType comparator;
+    private final ColumnFamily emptyColumnFamily; // created empty, then given the DeletionTime deserialized from the file
+    private final Iterator<OnDiskAtom> atomIterator;
+
+    SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, Composite finishColumn)
+    {
+        Tracing.trace("Seeking to partition beginning in data file");
+        this.finishColumn = finishColumn;
+        this.comparator = sstable.metadata.comparator;
+        try
+        {
+            if (input == null)
+            {
+                this.file = sstable.getFileDataInput(indexEntry.position); // opened here, so close() must release it
+                this.needsClosing = true;
+            }
+            else
+            {
+                this.file = input;
+                input.seek(indexEntry.position); // caller-supplied input is repositioned; close() leaves it open
+                this.needsClosing = false;
+            }
+
+            // Skip key and data size
+            ByteBufferUtil.skipShortLength(file);
+
+            emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
+            emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
+            atomIterator = emptyColumnFamily.metadata().getOnDiskIterator(file, sstable.descriptor.version);
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect(); // an IOException during the setup above is reported as sstable corruption
+            throw new CorruptSSTableException(e, sstable.getFilename());
+        }
+    }
+
+    protected OnDiskAtom computeNext()
+    {
+        if (!atomIterator.hasNext())
+            return endOfData();
+
+        OnDiskAtom column = atomIterator.next();
+        if (!finishColumn.isEmpty() && comparator.compare(column.name(), finishColumn) > 0) // past the bound: stop, discarding this atom
+            return endOfData();
+
+        return column;
+    }
+
+    public ColumnFamily getColumnFamily()
+    {
+        return emptyColumnFamily;
+    }
+
+    public void close() throws IOException
+    {
+        if (needsClosing) // a caller-supplied input is left for the caller to close
+            file.close();
+    }
+
+    public DecoratedKey getKey()
+    {
+        throw new UnsupportedOperationException(); // the constructor is never given the key, so it cannot be returned
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
index f801dac..dd879c4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -27,6 +27,7 @@ import com.clearspring.analytics.stream.cardinality.ICardinality;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -91,7 +92,7 @@ public class CompactionMetadata extends MetadataComponent
             ByteBufferUtil.writeWithLength(component.cardinalityEstimator.getBytes(), out);
         }
 
-        public CompactionMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        public CompactionMetadata deserialize(Version version, DataInput in) throws IOException
         {
             int nbAncestors = in.readInt();
             Set<Integer> ancestors = new HashSet<>(nbAncestors);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
index 49ae378..018d4a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -55,5 +56,5 @@ public interface IMetadataComponentSerializer<T extends MetadataComponent>
      * @return Deserialized component
      * @throws IOException
      */
-    T deserialize(Descriptor.Version version, DataInput in) throws IOException;
+    T deserialize(Version version, DataInput in) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 7ba2895..152614d 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.MurmurHash;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index a557b88..a501518 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
@@ -235,7 +236,7 @@ public class StatsMetadata extends MetadataComponent
             out.writeBoolean(component.hasLegacyCounterShards);
         }
 
-        public StatsMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        public StatsMetadata deserialize(Version version, DataInput in) throws IOException
         {
             EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
@@ -247,7 +248,7 @@ public class StatsMetadata extends MetadataComponent
             StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
             int sstableLevel = in.readInt();
             long repairedAt = 0;
-            if (version.hasRepairedAt)
+            if (version.hasRepairedAt())
                 repairedAt = in.readLong();
 
             int colCount = in.readInt();
@@ -261,7 +262,7 @@ public class StatsMetadata extends MetadataComponent
                 maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
 
             boolean hasLegacyCounterShards = true;
-            if (version.tracksLegacyCounterShards)
+            if (version.tracksLegacyCounterShards())
                 hasLegacyCounterShards = in.readBoolean();
 
             return new StatsMetadata(rowSizes,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
index e00c55c..aed6820 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -82,7 +83,7 @@ public class ValidationMetadata extends MetadataComponent
             out.writeDouble(component.bloomFilterFPChance);
         }
 
-        public ValidationMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        public ValidationMetadata deserialize(Version version, DataInput in) throws IOException
         {
 
             return new ValidationMetadata(in.readUTF(), in.readDouble());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
index 2815260..588540d 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
@@ -21,9 +21,9 @@ import java.io.*;
 
 public abstract class AbstractDataInput extends InputStream implements DataInput
 {
-    protected abstract void seek(long position) throws IOException;
-    protected abstract long getPosition();
-    protected abstract long getPositionLimit();
+    public abstract void seek(long position) throws IOException;
+    public abstract long getPosition();
+    public abstract long getPositionLimit();
 
     public int skipBytes(int n) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 797b964..4140c95 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -51,7 +51,7 @@ public class DataIntegrityMetadata
         public ChecksumValidator(Descriptor descriptor) throws IOException
         {
             this.descriptor = descriptor;
-            checksum = descriptor.version.hasAllAdlerChecksums ? new Adler32() : new PureJavaCrc32();
+            checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new PureJavaCrc32();
             reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
             chunkSize = reader.readInt();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/FileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java
index d94075c..55809ad 100644
--- a/src/java/org/apache/cassandra/io/util/FileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
 import java.io.Closeable;
 import java.io.DataInput;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 public interface FileDataInput extends DataInput, Closeable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index e590918..3567465 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -17,13 +17,7 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.file.AtomicMoveNotSupportedException;
@@ -442,4 +436,25 @@ public class FileUtils
         }
         return length;
     }
+
+
+    public static void copyTo(DataInput in, OutputStream out, int length) throws IOException
+    { // Copies exactly length bytes from in to out; length <= 0 copies nothing; readFully throws if in runs out early
+        byte[] buffer = new byte[64 * 1024]; // 64 KiB transfer buffer, reused for every chunk
+        int copiedBytes = 0;
+
+        while (length - copiedBytes > buffer.length) // compare the remainder: copiedBytes + buffer.length overflows int near Integer.MAX_VALUE
+        {
+            in.readFully(buffer);
+            out.write(buffer);
+            copiedBytes += buffer.length;
+        }
+
+        if (copiedBytes < length) // final chunk of 1..buffer.length bytes (also covers an exact multiple of the buffer size)
+        {
+            int left = length - copiedBytes;
+            in.readFully(buffer, 0, left);
+            out.write(buffer, 0, left);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
index 0479256..574a7fb 100644
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
@@ -65,12 +65,12 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
         return segmentOffset + position;
     }
 
-    protected long getPosition()
+    public long getPosition()
     {
         return segmentOffset + position;
     }
 
-    protected long getPositionLimit()
+    public long getPositionLimit()
     {
         return segmentOffset + buffer.capacity();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
index 73ccc1b..45261e0 100644
--- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
@@ -41,17 +41,17 @@ public class MemoryInputStream extends AbstractDataInput implements DataInput
         position += count;
     }
 
-    protected void seek(long pos)
+    public void seek(long pos)
     {
         position = (int) pos;
     }
 
-    protected long getPosition()
+    public long getPosition()
     {
         return position;
     }
 
-    protected long getPositionLimit()
+    public long getPositionLimit()
     {
         return mem.size();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index 8ab432e..4e6f856 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.EstimatedHistogram;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index de3c125..72d8d6e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -74,7 +74,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_12 = 6;
     public static final int VERSION_20 = 7;
     public static final int VERSION_21 = 8;
-    public static final int current_version = VERSION_21;
+    public static final int VERSION_30 = 9;
+    public static final int current_version = VERSION_30;
 
     public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
     public static final byte[] ONE_BYTE = new byte[1];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
index e4aff96..15230ea 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.notifications;
 
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 public class SSTableAddedNotification implements INotification
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java b/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java
index 8b0f597..dcaa3b5 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableDeletingNotification.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.notifications;
 
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 /**
  * Fired right before removing an SSTable.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
index c36583c..7ca574b 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
@@ -19,9 +19,8 @@ package org.apache.cassandra.notifications;
 
 import java.util.Collection;
 
-import org.apache.cassandra.io.sstable.SSTableReader;
-
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 public class SSTableListChangedNotification implements INotification
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
index a473a43..d1398bc 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.notifications;
 
 import java.util.Collection;
 
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 public class SSTableRepairStatusChanged implements INotification
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 04a27af..2ad8dc2 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Predicate;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +36,6 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 670aa0b..fa354e6 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,8 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +43,6 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 1b93c2c..2ffd954 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -33,6 +33,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.util.concurrent.Futures;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +49,6 @@ import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -472,7 +472,8 @@ public class CacheService implements CacheServiceMBean
             out.writeInt(desc.generation);
             out.writeBoolean(true);
             CFMetaData cfm = Schema.instance.getCFMetaData(key.desc.ksname, key.desc.cfname);
-            cfm.comparator.rowIndexEntrySerializer().serialize(entry, out);
+
+            key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, out);
         }
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
@@ -492,7 +493,7 @@ public class CacheService implements CacheServiceMBean
                 RowIndexEntry.Serializer.skipPromotedIndex(input);
                 return null;
             }
-            RowIndexEntry entry = reader.metadata.comparator.rowIndexEntrySerializer().deserialize(input, reader.descriptor.version);
+            RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamLockfile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
index d00842a..a0cf5fc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamLockfile.java
+++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
@@ -28,12 +28,12 @@ import java.util.List;
 import java.util.UUID;
 
 import com.google.common.base.Charsets;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 34cbf02..8c4efcd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -17,16 +17,18 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
 import java.util.UUID;
 
 import com.google.common.base.Throwables;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.FileUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,15 +39,14 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
+
 /**
  * StreamReader reads from stream and writes to SSTable.
  */
@@ -56,8 +57,9 @@ public class StreamReader
     protected final long estimatedKeys;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamSession session;
-    protected final Descriptor.Version inputVersion;
+    protected final Version inputVersion;
     protected final long repairedAt;
+    protected final SSTableFormat.Type format;
     protected final int sstableLevel;
 
     protected Descriptor desc;
@@ -68,8 +70,9 @@ public class StreamReader
         this.cfId = header.cfId;
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
-        this.inputVersion = new Descriptor.Version(header.version);
+        this.inputVersion = header.format.info.getVersion(header.version);
         this.repairedAt = header.repairedAt;
+        this.format = header.format;
         this.sstableLevel = header.sstableLevel;
     }
 
@@ -91,7 +94,8 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
+        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
+
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         try
@@ -99,12 +103,12 @@ public class StreamReader
             while (in.getBytesRead() < totalSize)
             {
                 writeRow(writer, in, cfs);
+
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
             return writer;
-        }
-        catch (Throwable e)
+        } catch (Throwable e)
         {
             writer.abort();
             drain(dis, in.getBytesRead());
@@ -115,13 +119,14 @@ public class StreamReader
         }
     }
 
-    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException
+    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
     {
         Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
-        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
-        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt, sstableLevel);
+        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
+
+        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index f26e439..a6db58c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -28,8 +28,10 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 48b88c4..8bb34a6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -25,6 +25,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.messages.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 18058c1..a3dd10f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -23,7 +23,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.Pair;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 43bc26a..93903a7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -27,7 +27,7 @@ import java.util.Collection;
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
 import org.apache.cassandra.io.util.FileUtils;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0595e0c 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -18,12 +18,15 @@
 package org.apache.cassandra.streaming.compress;
 
 import java.io.DataInputStream;
+
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +34,6 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
@@ -72,21 +74,25 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
+        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
-        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
+        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums());
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         try
         {
             for (Pair<Long, Long> section : sections)
             {
-                long length = section.right - section.left;
+                assert in.getBytesRead() < totalSize;
+                int sectionLength = (int) (section.right - section.left);
+
                 // skip to beginning of section inside chunk
                 cis.position(section.left);
                 in.reset(0);
-                while (in.getBytesRead() < length)
+
+                while (in.getBytesRead() < sectionLength)
                 {
                     writeRow(writer, in, cfs);
+
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                     session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 001c927..786ff23 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -25,7 +25,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.ProgressInfo;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 5e378bc..5266e45 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
@@ -43,6 +44,9 @@ public class FileMessageHeader
     public final int sequenceNumber;
     /** SSTable version */
     public final String version;
+
+    /** SSTable format */
+    public final SSTableFormat.Type format;
     public final long estimatedKeys;
     public final List<Pair<Long, Long>> sections;
     public final CompressionInfo compressionInfo;
@@ -52,6 +56,7 @@ public class FileMessageHeader
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
                              String version,
+                             SSTableFormat.Type format,
                              long estimatedKeys,
                              List<Pair<Long, Long>> sections,
                              CompressionInfo compressionInfo,
@@ -61,6 +66,7 @@ public class FileMessageHeader
         this.cfId = cfId;
         this.sequenceNumber = sequenceNumber;
         this.version = version;
+        this.format = format;
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
         this.compressionInfo = compressionInfo;
@@ -95,6 +101,7 @@ public class FileMessageHeader
         sb.append("cfId: ").append(cfId);
         sb.append(", #").append(sequenceNumber);
         sb.append(", version: ").append(version);
+        sb.append(", format: ").append(format);
         sb.append(", estimated keys: ").append(estimatedKeys);
         sb.append(", transfer size: ").append(size());
         sb.append(", compressed?: ").append(compressionInfo != null);
@@ -128,8 +135,15 @@ public class FileMessageHeader
             UUIDSerializer.serializer.serialize(header.cfId, out, version);
             out.writeInt(header.sequenceNumber);
             out.writeUTF(header.version);
-            out.writeLong(header.estimatedKeys);
 
+            // Peers on a pre-3.0 streaming protocol only understand the legacy/big sstable format, so refuse to stream anything else to them
+            if (version < StreamMessage.VERSION_30 && header.format != SSTableFormat.Type.LEGACY && header.format != SSTableFormat.Type.BIG)
+                throw new UnsupportedOperationException("Can't stream non-legacy sstables to nodes < 3.0");
+
+            if (version >= StreamMessage.VERSION_30)
+                out.writeUTF(header.format.name);
+
+            out.writeLong(header.estimatedKeys);
             out.writeInt(header.sections.size());
             for (Pair<Long, Long> section : header.sections)
             {
@@ -146,6 +160,11 @@ public class FileMessageHeader
             UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             int sequenceNumber = in.readInt();
             String sstableVersion = in.readUTF();
+
+            SSTableFormat.Type format = SSTableFormat.Type.LEGACY;
+            if (version >= StreamMessage.VERSION_30)
+                format = SSTableFormat.Type.validate(in.readUTF());
+
             long estimatedKeys = in.readLong();
             int count = in.readInt();
             List<Pair<Long, Long>> sections = new ArrayList<>(count);
@@ -154,7 +173,7 @@ public class FileMessageHeader
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
             long repairedAt = in.readLong();
             int sstableLevel = in.readInt();
-            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel);
+            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, sstableLevel);
         }
 
         public long serializedSize(FileMessageHeader header, int version)
@@ -162,6 +181,10 @@ public class FileMessageHeader
             long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
             size += TypeSizes.NATIVE.sizeof(header.sequenceNumber);
             size += TypeSizes.NATIVE.sizeof(header.version);
+
+            if (version >= StreamMessage.VERSION_30)
+                size += TypeSizes.NATIVE.sizeof(header.format.name);
+
             size += TypeSizes.NATIVE.sizeof(header.estimatedKeys);
 
             size += TypeSizes.NATIVE.sizeof(header.sections.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 8569b88..237fb70 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 13af987..ed4c4ce 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamWriter;
@@ -74,6 +74,7 @@ public class OutgoingFileMessage extends StreamMessage
         this.header = new FileMessageHeader(sstable.metadata.cfId,
                                             sequenceNumber,
                                             sstable.descriptor.version.toString(),
+                                            sstable.descriptor.formatType,
                                             estimatedKeys,
                                             sections,
                                             compressionInfo,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 372fdd3..20490db 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -32,7 +32,9 @@ import org.apache.cassandra.streaming.StreamSession;
 public abstract class StreamMessage
 {
     /** Streaming protocol version */
-    public static final int CURRENT_VERSION = 3;
+    public static final int VERSION_20 = 2;
+    public static final int VERSION_30 = 3;
+    public static final int CURRENT_VERSION = VERSION_30;
 
     public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 1febbe8..1ab56ce 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.*;
 
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -115,7 +117,7 @@ public class SSTableExport
     }
 
     /**
-     * Serialize a given cell to a List of Objects that jsonMapper knows how to turn into strings.  Format is
+     * Serialize a given cell to a List of Objects that jsonMapper knows how to turn into strings.  Format is
      *
      * human_readable_name, value, timestamp, [flag, [options]]
      *
@@ -318,10 +320,10 @@ public class SSTableExport
         Set<String> excludeSet = new HashSet<String>();
 
         if (excludes != null)
-            excludeSet = new HashSet<String>(Arrays.asList(excludes));
+            excludeSet = new HashSet<>(Arrays.asList(excludes));
 
         SSTableIdentityIterator row;
-        SSTableScanner scanner = reader.getScanner();
+        ICompactionScanner scanner = reader.getScanner();
         try
         {
             outs.println("[");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 05b9dcb..fc4470e 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -26,6 +26,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Option;
@@ -42,7 +44,6 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -302,7 +303,7 @@ public class SSTableImport
         Object[] data = parser.readValueAs(new TypeReference<Object[]>(){});
 
         keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
 
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
@@ -375,7 +376,7 @@ public class SSTableImport
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
         parser = getParser(jsonFile); // renewing parser
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         int lineNumber = 1;
         DecoratedKey prevStoredKey = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
index b37d3b4..1f596ad 100644
--- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
@@ -77,7 +77,7 @@ public class SSTableRepairedAtSetter
         for (String fname: fileNames)
         {
             Descriptor descriptor = Descriptor.fromFilename(fname);
-            if (descriptor.version.hasRepairedAt)
+            if (descriptor.version.hasRepairedAt())
             {
                 if (setIsRepaired)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 4dffa74..2c92e60 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tools;
 import java.io.File;
 import java.util.*;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -66,7 +67,7 @@ public class StandaloneScrubber
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
             Directories.SSTableLister lister = cfs.directories.sstableLister().skipTemporary(true);
 
-            List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+            List<SSTableReader> sstables = new ArrayList<>();
 
             // Scrub sstables
             for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index e078c3b..8c7e704 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tools;
 import java.io.File;
 import java.util.*;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -109,7 +110,7 @@ public class StandaloneSplitter
 
             String snapshotName = "pre-split-" + System.currentTimeMillis();
 
-            List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+            List<SSTableReader> sstables = new ArrayList<>();
             for (Map.Entry<Descriptor, Set<Component>> fn : parsedFilenames.entrySet())
             {
                 try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 1a8a89f..9aa02f2 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools;
 
 import java.util.*;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -61,7 +62,7 @@ public class StandaloneUpgrader
             else
                 lister.includeBackups(false);
 
-            Collection<SSTableReader> readers = new ArrayList<SSTableReader>();
+            Collection<SSTableReader> readers = new ArrayList<>();
 
             // Upgrade sstables
             for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
@@ -73,7 +74,7 @@ public class StandaloneUpgrader
                 try
                 {
                     SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs.metadata);
-                    if (sstable.descriptor.version.equals(Descriptor.Version.CURRENT))
+                    if (sstable.descriptor.version.equals(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion()))
                         continue;
                     readers.add(sstable);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
index 6385e5c..bee8ab0 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
@@ -47,17 +47,17 @@ public class EncodedDataInputStream extends AbstractDataInput implements DataInp
         return input.readByte() & 0xFF;
     }
 
-    protected void seek(long position)
+    public void seek(long position)
     {
         throw new UnsupportedOperationException();
     }
 
-    protected long getPosition()
+    public long getPosition()
     {
         throw new UnsupportedOperationException();
     }
 
-    protected long getPositionLimit()
+    public long getPositionLimit()
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 7bc8ef5..28ec975 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
@@ -32,7 +33,6 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -92,7 +92,7 @@ public class LongCompactionsTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
 
-        ArrayList<SSTableReader> sstables = new ArrayList<SSTableReader>();
+        ArrayList<SSTableReader> sstables = new ArrayList<>();
         for (int k = 0; k < sstableCount; k++)
         {
             SortedMap<String,ColumnFamily> rows = new TreeMap<String,ColumnFamily>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 159b641..0530d84 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -29,7 +30,6 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.FBUtilities;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index fd53170..2d5e6fc 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -49,7 +49,7 @@ import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
index 3da4555..c1869b9 100644
--- a/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
+++ b/test/unit/org/apache/cassandra/cache/AutoSavingCacheTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cache;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -29,7 +30,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.ByteBufferUtil;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 213c5b8..21de96f 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 386be01..be64a6f 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -43,6 +43,9 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.BeforeClass;
@@ -74,12 +77,6 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.IncludingExcludingBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableDeletingTask;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableSimpleWriter;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.SimpleStrategy;
@@ -1780,12 +1777,12 @@ public class ColumnFamilyStoreTest
             {
                 MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
                 collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
-                return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
-                                         0,
-                                         ActiveRepairService.UNREPAIRED_SSTABLE,
-                                         metadata,
-                                         StorageService.getPartitioner(),
-                                         collector);
+                return SSTableWriter.create(Descriptor.fromFilename(makeFilename(directory, metadata.ksName, metadata.cfName, DatabaseDescriptor.getSSTableFormat())),
+                        0L,
+                        ActiveRepairService.UNREPAIRED_SSTABLE,
+                        metadata,
+                        DatabaseDescriptor.getPartitioner(),
+                        collector);
             }
         };
         writer.newRow(key);
@@ -1837,12 +1834,12 @@ public class ColumnFamilyStoreTest
                 for (int ancestor : ancestors)
                     collector.addAncestor(ancestor);
                 String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA);
-                return new SSTableWriter(file,
-                                         0,
-                                         ActiveRepairService.UNREPAIRED_SSTABLE,
-                                         metadata,
-                                         StorageService.getPartitioner(),
-                                         collector);
+                return SSTableWriter.create(Descriptor.fromFilename(file),
+                        0L,
+                        ActiveRepairService.UNREPAIRED_SSTABLE,
+                        metadata,
+                        StorageService.getPartitioner(),
+                        collector);
             }
         };
         writer.newRow(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index ad3a6bc..629b414 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -25,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,7 +37,6 @@ import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
@@ -179,7 +178,7 @@ public class KeyCacheTest
         for (SSTableReader reader : readers)
             reader.releaseReference();
 
-        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);;
+        Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
         while (StorageService.tasks.getActiveCount() + StorageService.tasks.getQueue().size() > 0);
 
         // after releasing the reference this should drop to 2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/KeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index 27a4e20..01038c4 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -26,6 +26,7 @@ import java.util.*;
 import java.io.IOException;
 
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -49,7 +50,6 @@ import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.Util.expiringColumn;
 import static org.apache.cassandra.Util.cellname;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index 2a6b952..d2f63cc 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -52,12 +53,10 @@ import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 import static org.apache.cassandra.Util.dk;
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 3282b0a..e880d95 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -39,10 +40,10 @@ public class RowIndexEntryTest extends SchemaLoader
     @Test
     public void testSerializedSize() throws IOException
     {
-        final RowIndexEntry simple = new RowIndexEntry(123);
+        final RowIndexEntry<IndexHelper.IndexInfo> simple = new RowIndexEntry<>(123);
 
         DataOutputBuffer buffer = new DataOutputBuffer();
-        RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(new SimpleDenseCellNameType(UTF8Type.instance));
+        RowIndexEntry.Serializer serializer = new RowIndexEntry.Serializer(new IndexHelper.IndexInfo.Serializer(new SimpleDenseCellNameType(UTF8Type.instance)));
 
         serializer.serialize(simple, buffer);
 
@@ -70,7 +71,7 @@ public class RowIndexEntryTest extends SchemaLoader
 
         }}.build();
 
-        RowIndexEntry withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
+        RowIndexEntry<IndexHelper.IndexInfo> withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
 
         serializer.serialize(withIndex, buffer);
         Assert.assertEquals(buffer.getLength(), serializer.serializedSize(withIndex));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 24a6787..814b498 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.db.marshal.CounterColumnType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.lang3.StringUtils;
@@ -56,7 +58,6 @@ import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.cellname;
@@ -230,7 +231,7 @@ public class ScrubTest
         assert root != null;
         File rootDir = new File(root);
         assert rootDir.isDirectory();
-        Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL);
+        Descriptor desc = new Descriptor("jb", rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY);
         CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index cd5dc7f..e6e5d55 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,6 +29,14 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Test;
+
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.KSMetaData;
@@ -41,9 +49,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -110,7 +115,7 @@ public class AntiCompactionTest
         int nonRepairedKeys = 0;
         for (SSTableReader sstable : store.getSSTables())
         {
-            SSTableScanner scanner = sstable.getScanner();
+            ICompactionScanner scanner = sstable.getScanner();
             while (scanner.hasNext())
             {
                 SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
@@ -187,7 +192,7 @@ public class AntiCompactionTest
         int nonRepairedKeys = 0;
         for (SSTableReader sstable : store.getSSTables())
         {
-            SSTableScanner scanner = sstable.getScanner();
+            ICompactionScanner scanner = sstable.getScanner();
             while (scanner.hasNext())
             {
                 SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index 85f5330..1116c9e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -34,7 +35,6 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.ByteBufferUtil;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0368e97e/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 00923b1..5f87ff3 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,7 +36,6 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.Util;
 
 import static org.junit.Assert.assertEquals;


Mime
View raw message