cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/2] git commit: SSTable metadata(Stats.db) format change
Date Fri, 13 Dec 2013 22:52:55 GMT
SSTable metadata(Stats.db) format change

patch by yukim; reviewed by thobbs for CASSANDRA-6356


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74bf5aa1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74bf5aa1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74bf5aa1

Branch: refs/heads/trunk
Commit: 74bf5aa16e7080360febca1745307a4d7ced32dc
Parents: 84d85ee
Author: Yuki Morishita <yukim@apache.org>
Authored: Fri Dec 13 16:33:26 2013 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Fri Dec 13 16:33:26 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  14 +-
 .../org/apache/cassandra/db/DataTracker.java    |   9 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   4 +-
 .../cassandra/db/commitlog/ReplayPosition.java  |   4 +-
 .../db/compaction/CompactionManager.java        |   3 +-
 .../cassandra/db/compaction/CompactionTask.java |   3 +-
 .../db/compaction/LeveledManifest.java          |  31 +-
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../io/compress/CompressedSequentialWriter.java |   9 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   3 +-
 .../apache/cassandra/io/sstable/Descriptor.java |  50 +-
 .../cassandra/io/sstable/IndexSummary.java      |   9 +-
 .../cassandra/io/sstable/SSTableMetadata.java   | 518 -------------------
 .../cassandra/io/sstable/SSTableReader.java     | 183 ++++---
 .../cassandra/io/sstable/SSTableWriter.java     |  32 +-
 .../io/sstable/metadata/CompactionMetadata.java |  93 ++++
 .../metadata/IMetadataComponentSerializer.java  |  58 +++
 .../sstable/metadata/IMetadataSerializer.java   |  68 +++
 .../metadata/LegacyMetadataSerializer.java      | 156 ++++++
 .../io/sstable/metadata/MetadataCollector.java  | 220 ++++++++
 .../io/sstable/metadata/MetadataComponent.java  |  34 ++
 .../io/sstable/metadata/MetadataSerializer.java | 144 ++++++
 .../io/sstable/metadata/MetadataType.java       |  38 ++
 .../io/sstable/metadata/StatsMetadata.java      | 253 +++++++++
 .../io/sstable/metadata/ValidationMetadata.java |  91 ++++
 .../cassandra/metrics/ColumnFamilyMetrics.java  |   6 +-
 .../cassandra/tools/SSTableLevelResetter.java   |  11 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |  34 +-
 .../cassandra/utils/EstimatedHistogram.java     |  14 +-
 .../cassandra/utils/StreamingHistogram.java     |  20 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   3 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../CompressedRandomAccessReaderTest.java       |   6 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |   6 +-
 .../sstable/SSTableMetadataSerializerTest.java  |  90 ----
 .../metadata/MetadataSerializerTest.java        |  88 ++++
 .../compress/CompressedInputStreamTest.java     |   4 +-
 38 files changed, 1505 insertions(+), 810 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e00201..4c74ea9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@
  * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
  * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
  * Secondary index support for collections (CASSANDRA-4511)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
 
 
 2.0.4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4e54af0..a98e30b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -62,6 +62,8 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 import org.apache.cassandra.service.CacheService;
@@ -486,7 +488,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Directories directories = Directories.create(keyspace, columnfamily);
 
         // sanity-check unfinishedGenerations
-        Set<Integer> allGenerations = new HashSet<Integer>();
+        Set<Integer> allGenerations = new HashSet<>();
         for (Descriptor desc : directories.sstableLister().list().keySet())
             allGenerations.add(desc.generation);
         if (!allGenerations.containsAll(unfinishedGenerations))
@@ -497,7 +499,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // remove new sstables from compactions that didn't complete, and compute
         // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<Integer>();
+        Set<Integer> completedAncestors = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
@@ -506,7 +508,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             Set<Integer> ancestors;
             try
             {
-                ancestors = SSTableMetadata.serializer.deserialize(desc).right;
+                CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);
+                ancestors = compactionMetadata.ancestors;
             }
             catch (IOException e)
             {
@@ -595,10 +598,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 if (new File(descriptor.filenameFor(Component.STATS)).exists())
-                {
-                    Pair<SSTableMetadata, Set<Integer>> oldMetadata = SSTableMetadata.serializer.deserialize(descriptor);
-                    LeveledManifest.mutateLevel(oldMetadata, descriptor, descriptor.filenameFor(Component.STATS), 0);
-                }
+                    descriptor.getMetadataSerializer().mutateLevel(descriptor, 0);
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 64b088d..692c5c5 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StorageMetrics;
@@ -460,11 +459,7 @@ public class DataTracker
             allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds());
             allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count();
         }
-        if (allColumns > 0)
-        {
-            return allDroppable / allColumns;
-        }
-        return 0;
+        return allColumns > 0 ? allDroppable / allColumns : 0;
     }
 
     public void notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType)
@@ -630,4 +625,4 @@ public class DataTracker
             return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", memtablesPendingFlush.size(), sstables, compacting);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 064851a..785f0c2 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -35,9 +35,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.utils.Allocator;
 import org.github.jamm.MemoryMeter;
@@ -381,7 +381,7 @@ public class Memtable
 
         public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
         {
-            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(cfs.metadata.comparator).replayPosition(context.get());
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context.get());
             return new SSTableWriter(filename,
                                      rows.size(),
                                      cfs.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 354444b..fb78ed3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -130,9 +130,9 @@ public class ReplayPosition implements Comparable<ReplayPosition>
             return new ReplayPosition(in.readLong(), in.readInt());
         }
 
-        public long serializedSize(ReplayPosition object, TypeSizes typeSizes)
+        public long serializedSize(ReplayPosition rp, TypeSizes typeSizes)
         {
-            throw new UnsupportedOperationException();
+            return typeSizes.sizeof(rp.segment) + typeSizes.sizeof(rp.position);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5bcaca9..2090b6f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
@@ -738,7 +739,7 @@ public class CompactionManager implements CompactionManagerMBean
                                  expectedBloomFilterSize,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 SSTableMetadata.createCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
+                                 new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f4cc500..59f2f2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public class CompactionTask extends AbstractCompactionTask
@@ -281,7 +282,7 @@ public class CompactionTask extends AbstractCompactionTask
                                  keysPerSSTable,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
+                                 new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
     }
 
     protected int getLevel()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 232d1f7..2ec42e4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -188,7 +188,7 @@ public class LeveledManifest
         String metaDataFile = sstable.descriptor.filenameFor(Component.STATS);
         try
         {
-            mutateLevel(Pair.create(sstable.getSSTableMetadata(), sstable.getAncestors()), sstable.descriptor, metaDataFile, 0);
+            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
             sstable.reloadSSTableMetadata();
             add(sstable);
         }
@@ -571,33 +571,4 @@ public class LeveledManifest
         return newLevel;
 
     }
-
-    /**
-     * Scary method mutating existing sstable component
-     *
-     * Tries to do it safely by moving the new file on top of the old one
-     *
-     * Caller needs to reload the sstable metadata (sstableReader.reloadSSTableMetadata())
-     *
-     * @see org.apache.cassandra.io.sstable.SSTableReader#reloadSSTableMetadata()
-     *
-     * @param oldMetadata
-     * @param descriptor
-     * @param filename
-     * @param level
-     * @throws IOException
-     */
-    public static synchronized void mutateLevel(Pair<SSTableMetadata, Set<Integer>> oldMetadata, Descriptor descriptor, String filename, int level) throws IOException
-    {
-        logger.debug("Mutating {} to level {}", descriptor.filenameFor(Component.STATS), level);
-        SSTableMetadata metadata = SSTableMetadata.copyWithNewSSTableLevel(oldMetadata.left, level);
-        DataOutputStream out = new DataOutputStream(new FileOutputStream(filename + "-tmp"));
-        SSTableMetadata.serializer.legacySerialize(metadata, oldMetadata.right, descriptor, out);
-        out.flush();
-        out.close();
-        // we cant move a file on top of another file in windows:
-        if (!FBUtilities.isUnix())
-            FileUtils.delete(filename);
-        FileUtils.renameWithConfirm(filename + "-tmp", filename);
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 383ff00..ef881c4 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.OutputHandler;
 
@@ -62,7 +63,7 @@ public class Upgrader
 
     private SSTableWriter createCompactionWriter()
     {
-        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(cfs.getComparator());
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
 
         // Get the max timestamp of the precompacted sstables
         // and adds generation of live ancestors

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 54b990f..99bbd85 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -21,13 +21,12 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.zip.Adler32;
-import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 
@@ -37,7 +36,7 @@ public class CompressedSequentialWriter extends SequentialWriter
                                         String indexFilePath,
                                         boolean skipIOCache,
                                         CompressionParameters parameters,
-                                        Collector sstableMetadataCollector)
+                                        MetadataCollector sstableMetadataCollector)
     {
         return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
     }
@@ -60,13 +59,13 @@ public class CompressedSequentialWriter extends SequentialWriter
 
     private long originalSize = 0, compressedSize = 0;
 
-    private final Collector sstableMetadataCollector;
+    private final MetadataCollector sstableMetadataCollector;
 
     public CompressedSequentialWriter(File file,
                                       String indexFilePath,
                                       boolean skipIOCache,
                                       CompressionParameters parameters,
-                                      Collector sstableMetadataCollector)
+                                      MetadataCollector sstableMetadataCollector)
     {
         super(file, parameters.chunkLength(), skipIOCache);
         this.compressor = parameters.sstableCompressor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 0059fda..6018369 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.Pair;
 
@@ -56,7 +57,7 @@ public abstract class AbstractSSTableSimpleWriter
             0, // We don't care about the bloom filter
             metadata,
             DatabaseDescriptor.getPartitioner(),
-            SSTableMetadata.createCollector(metadata.comparator));
+            new MetadataCollector(metadata.comparator));
     }
 
     // find available generation and pick up filename from that

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index fef6a1e..6f84296 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -22,6 +22,9 @@ import java.util.StringTokenizer;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
+import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
+import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
 import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.io.sstable.Component.separator;
@@ -43,8 +46,8 @@ public class Descriptor
     // we always incremented the major version.
     public static class Version
     {
-        // This needs to be at the beginning for initialization sake
-        public static final String current_version = "jc";
+        // This needs to be at the begining for initialization sake
+        public static final String current_version = "ka";
 
         // ic (1.2.5): omits per-row bloom filter of column names
         // ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
@@ -57,7 +60,8 @@ public class Descriptor
         //             tracks max/min column values (according to comparator)
         // jb (2.0.1): switch from crc32 to adler32 for compression checksums
         //             checksum the compressed data
-        // jc (2.1.0): index summaries can be downsampled and the sampling level is persisted
+        // ka (2.1.0): new Statistics.db file format
+        //             index summaries can be downsampled and the sampling level is persisted
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -72,6 +76,7 @@ public class Descriptor
         public final boolean tracksMaxMinColumnNames;
         public final boolean hasPostCompressionAdlerChecksums;
         public final boolean hasSamplingLevel;
+        public final boolean newStatsFile;
 
         public Version(String version)
         {
@@ -84,7 +89,8 @@ public class Descriptor
             hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
             tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
             hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
-            hasSamplingLevel = version.compareTo("jc") >= 0;
+            hasSamplingLevel = version.compareTo("ka") >= 0;
+            newStatsFile = version.compareTo("ka") >= 0;
         }
 
         /**
@@ -102,11 +108,6 @@ public class Descriptor
             return version.compareTo("ic") >= 0 && version.charAt(0) <= CURRENT.version.charAt(0);
         }
 
-        public boolean isStreamCompatible()
-        {
-            return isCompatible() && version.charAt(0) >= 'j';
-        }
-
         @Override
         public String toString()
         {
@@ -116,11 +117,7 @@ public class Descriptor
         @Override
         public boolean equals(Object o)
         {
-            if (o == this)
-                return true;
-            if (!(o instanceof Version))
-                return false;
-            return version.equals(((Version)o).version);
+            return o == this || o instanceof Version && version.equals(((Version) o).version);
         }
 
         @Override
@@ -256,23 +253,20 @@ public class Descriptor
         return new Descriptor(version, directory, ksname, cfname, generation, temporary);
     }
 
-    /**
-     * @return true if the current Cassandra version can read the given sstable version
-     */
-    public boolean isCompatible()
+    public IMetadataSerializer getMetadataSerializer()
     {
-        return version.isCompatible();
+        if (version.newStatsFile)
+            return new MetadataSerializer();
+        else
+            return new LegacyMetadataSerializer();
     }
 
     /**
-     * @return true if the current Cassandra version can stream the given sstable version
-     * from another node.  This is stricter than opening it locally [isCompatible] because
-     * streaming needs to rebuild all the non-data components, and it only knows how to write
-     * the latest version.
+     * @return true if the current Cassandra version can read the given sstable version
      */
-    public boolean isStreamCompatible()
+    public boolean isCompatible()
     {
-        return version.isStreamCompatible();
+        return version.isCompatible();
     }
 
     @Override
@@ -289,7 +283,11 @@ public class Descriptor
         if (!(o instanceof Descriptor))
             return false;
         Descriptor that = (Descriptor)o;
-        return that.directory.equals(this.directory) && that.generation == this.generation && that.ksname.equals(this.ksname) && that.cfname.equals(this.cfname) && that.temporary == this.temporary;
+        return that.directory.equals(this.directory)
+                       && that.generation == this.generation
+                       && that.ksname.equals(this.ksname)
+                       && that.cfname.equals(this.cfname)
+                       && that.temporary == this.temporary;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 4fc4737..6d8712a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -202,9 +202,14 @@ public class IndexSummary implements Closeable
             FBUtilities.copy(new MemoryInputStream(t.bytes), out, t.bytes.size());
         }
 
-        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException
+        public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedIndexInterval) throws IOException
         {
             int indexInterval = in.readInt();
+            if (indexInterval != expectedIndexInterval)
+            {
+                throw new IOException(String.format("Cannot read index summary because Index Interval changed from %d to %d.",
+                                                           indexInterval, expectedIndexInterval));
+            }
             int summarySize = in.readInt();
             long offheapSize = in.readLong();
             int samplingLevel, fullSamplingSummarySize;
@@ -229,4 +234,4 @@ public class IndexSummary implements Closeable
     {
         bytes.free();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
deleted file mode 100644
index 8ddfdd7..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.StreamingHistogram;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.EstimatedHistogram;
-
-/**
- * Metadata for a SSTable.
- * Metadata includes:
- *  - estimated row size histogram
- *  - estimated column count histogram
- *  - replay position
- *  - max column timestamp
- *  - max local deletion time
- *  - bloom filter fp chance
- *  - compression ratio
- *  - partitioner
- *  - generations of sstables from which this sstable was compacted, if any
- *  - tombstone drop time histogram
- *
- * An SSTableMetadata should be instantiated via the Collector, openFromDescriptor()
- * or createDefaultInstance()
- */
-public class SSTableMetadata
-{
-    public static final double NO_BLOOM_FLITER_FP_CHANCE = -1.0;
-    public static final double NO_COMPRESSION_RATIO = -1.0;
-    public static final SSTableMetadataSerializer serializer = new SSTableMetadataSerializer();
-
-    public final EstimatedHistogram estimatedRowSize;
-    public final EstimatedHistogram estimatedColumnCount;
-    public final ReplayPosition replayPosition;
-    public final long minTimestamp;
-    public final long maxTimestamp;
-    public final int maxLocalDeletionTime;
-    public final double bloomFilterFPChance;
-    public final double compressionRatio;
-    public final String partitioner;
-    public final StreamingHistogram estimatedTombstoneDropTime;
-    public final int sstableLevel;
-    public final List<ByteBuffer> maxColumnNames;
-    public final List<ByteBuffer> minColumnNames;
-
-    private SSTableMetadata()
-    {
-        this(defaultRowSizeHistogram(),
-             defaultColumnCountHistogram(),
-             ReplayPosition.NONE,
-             Long.MIN_VALUE,
-             Long.MAX_VALUE,
-             Integer.MAX_VALUE,
-             NO_BLOOM_FLITER_FP_CHANCE,
-             NO_COMPRESSION_RATIO,
-             null,
-             defaultTombstoneDropTimeHistogram(),
-             0,
-             Collections.<ByteBuffer>emptyList(),
-             Collections.<ByteBuffer>emptyList());
-    }
-
-    private SSTableMetadata(EstimatedHistogram rowSizes,
-                            EstimatedHistogram columnCounts,
-                            ReplayPosition replayPosition,
-                            long minTimestamp,
-                            long maxTimestamp,
-                            int maxLocalDeletionTime,
-                            double bloomFilterFPChance,
-                            double compressionRatio,
-                            String partitioner,
-                            StreamingHistogram estimatedTombstoneDropTime,
-                            int sstableLevel,
-                            List<ByteBuffer> minColumnNames,
-                            List<ByteBuffer> maxColumnNames)
-    {
-        this.estimatedRowSize = rowSizes;
-        this.estimatedColumnCount = columnCounts;
-        this.replayPosition = replayPosition;
-        this.minTimestamp = minTimestamp;
-        this.maxTimestamp = maxTimestamp;
-        this.maxLocalDeletionTime = maxLocalDeletionTime;
-        this.bloomFilterFPChance = bloomFilterFPChance;
-        this.compressionRatio = compressionRatio;
-        this.partitioner = partitioner;
-        this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
-        this.sstableLevel = sstableLevel;
-        this.minColumnNames = minColumnNames;
-        this.maxColumnNames = maxColumnNames;
-    }
-
-    public static Collector createCollector(AbstractType<?> columnNameComparator)
-    {
-        return new Collector(columnNameComparator);
-    }
-
-    public static Collector createCollector(Collection<SSTableReader> sstables, AbstractType<?> columnNameComparator, int level)
-    {
-        Collector collector = new Collector(columnNameComparator);
-
-        collector.replayPosition(ReplayPosition.getReplayPosition(sstables));
-        collector.sstableLevel(level);
-        // Get the max timestamp of the precompacted sstables
-        // and adds generation of live ancestors
-        for (SSTableReader sstable : sstables)
-        {
-            collector.addAncestor(sstable.descriptor.generation);
-            for (Integer i : sstable.getAncestors())
-            {
-                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
-                    collector.addAncestor(i);
-            }
-        }
-
-        return collector;
-    }
-
-    /**
-     * Used when updating sstablemetadata files with an sstable level
-     * @param metadata
-     * @param sstableLevel
-     * @return
-     */
-    @Deprecated
-    public static SSTableMetadata copyWithNewSSTableLevel(SSTableMetadata metadata, int sstableLevel)
-    {
-        return new SSTableMetadata(metadata.estimatedRowSize,
-                                   metadata.estimatedColumnCount,
-                                   metadata.replayPosition,
-                                   metadata.minTimestamp,
-                                   metadata.maxTimestamp,
-                                   metadata.maxLocalDeletionTime,
-                                   metadata.bloomFilterFPChance,
-                                   metadata.compressionRatio,
-                                   metadata.partitioner,
-                                   metadata.estimatedTombstoneDropTime,
-                                   sstableLevel,
-                                   metadata.minColumnNames,
-                                   metadata.maxColumnNames);
-
-    }
-
-    static EstimatedHistogram defaultColumnCountHistogram()
-    {
-        // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
-        return new EstimatedHistogram(114);
-    }
-
-    static EstimatedHistogram defaultRowSizeHistogram()
-    {
-        // EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
-        return new EstimatedHistogram(150);
-    }
-
-    static StreamingHistogram defaultTombstoneDropTimeHistogram()
-    {
-        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
-    }
-
-    /**
-     * @param gcBefore
-     * @return estimated droppable tombstone ratio at given gcBefore time.
-     */
-    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
-    {
-        long estimatedColumnCount = this.estimatedColumnCount.mean() * this.estimatedColumnCount.count();
-        if (estimatedColumnCount > 0)
-        {
-            double droppable = getDroppableTombstonesBefore(gcBefore);
-            return droppable / estimatedColumnCount;
-        }
-        return 0.0f;
-    }
-
-    /**
-     * Get the amount of droppable tombstones
-     * @param gcBefore the gc time
-     * @return amount of droppable tombstones
-     */
-    public double getDroppableTombstonesBefore(int gcBefore)
-    {
-        return estimatedTombstoneDropTime.sum(gcBefore);
-    }
-
-    public static class Collector
-    {
-        protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
-        protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
-        protected ReplayPosition replayPosition = ReplayPosition.NONE;
-        protected long minTimestamp = Long.MAX_VALUE;
-        protected long maxTimestamp = Long.MIN_VALUE;
-        protected int maxLocalDeletionTime = Integer.MIN_VALUE;
-        protected double compressionRatio = NO_COMPRESSION_RATIO;
-        protected Set<Integer> ancestors = new HashSet<Integer>();
-        protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
-        protected int sstableLevel;
-        protected List<ByteBuffer> minColumnNames = Collections.emptyList();
-        protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
-        private final AbstractType<?> columnNameComparator;
-
-        private Collector(AbstractType<?> columnNameComparator)
-        {
-            this.columnNameComparator = columnNameComparator;
-        }
-        public void addRowSize(long rowSize)
-        {
-            estimatedRowSize.add(rowSize);
-        }
-
-        public void addColumnCount(long columnCount)
-        {
-            estimatedColumnCount.add(columnCount);
-        }
-
-        public void mergeTombstoneHistogram(StreamingHistogram histogram)
-        {
-            estimatedTombstoneDropTime.merge(histogram);
-        }
-
-        /**
-         * Ratio is compressed/uncompressed and it is
-         * if you have 1.x then compression isn't helping
-         */
-        public void addCompressionRatio(long compressed, long uncompressed)
-        {
-            compressionRatio = (double) compressed/uncompressed;
-        }
-
-        public void updateMinTimestamp(long potentialMin)
-        {
-            minTimestamp = Math.min(minTimestamp, potentialMin);
-        }
-
-        public void updateMaxTimestamp(long potentialMax)
-        {
-            maxTimestamp = Math.max(maxTimestamp, potentialMax);
-        }
-
-        public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
-        {
-            this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
-        }
-
-        public SSTableMetadata finalizeMetadata(String partitioner, double bloomFilterFPChance)
-        {
-            return new SSTableMetadata(estimatedRowSize,
-                                       estimatedColumnCount,
-                                       replayPosition,
-                                       minTimestamp,
-                                       maxTimestamp,
-                                       maxLocalDeletionTime,
-                                       bloomFilterFPChance,
-                                       compressionRatio,
-                                       partitioner,
-                                       estimatedTombstoneDropTime,
-                                       sstableLevel,
-                                       minColumnNames,
-                                       maxColumnNames);
-        }
-
-        public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
-        {
-            this.estimatedRowSize = estimatedRowSize;
-            return this;
-        }
-
-        public Collector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
-        {
-            this.estimatedColumnCount = estimatedColumnCount;
-            return this;
-        }
-
-        public Collector replayPosition(ReplayPosition replayPosition)
-        {
-            this.replayPosition = replayPosition;
-            return this;
-        }
-
-        public Collector addAncestor(int generation)
-        {
-            this.ancestors.add(generation);
-            return this;
-        }
-
-        void update(long size, ColumnStats stats)
-        {
-            updateMinTimestamp(stats.minTimestamp);
-            /*
-             * The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE),
-             * to avoid deserializing an EchoedRow.
-             * This is the reason why it is collected first when calling ColumnFamilyStore.createCompactionWriter
-             * However, for old sstables without timestamp, we still want to update the timestamp (and we know
-             * that in this case we will not use EchoedRow, since CompactionControler.needsDeserialize() will be true).
-            */
-            updateMaxTimestamp(stats.maxTimestamp);
-            updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
-            addRowSize(size);
-            addColumnCount(stats.columnCount);
-            mergeTombstoneHistogram(stats.tombstoneHistogram);
-            updateMinColumnNames(stats.minColumnNames);
-            updateMaxColumnNames(stats.maxColumnNames);
-        }
-
-        public Collector sstableLevel(int sstableLevel)
-        {
-            this.sstableLevel = sstableLevel;
-            return this;
-        }
-
-        public Collector updateMinColumnNames(List<ByteBuffer> minColumnNames)
-        {
-            if (minColumnNames.size() > 0)
-                this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
-            return this;
-        }
-
-        public Collector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
-        {
-            if (maxColumnNames.size() > 0)
-                this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
-            return this;
-        }
-    }
-
-    public static class SSTableMetadataSerializer
-    {
-        private static final Logger logger = LoggerFactory.getLogger(SSTableMetadataSerializer.class);
-
-        public void serialize(SSTableMetadata sstableStats, Set<Integer> ancestors, DataOutput out) throws IOException
-        {
-            assert sstableStats.partitioner != null;
-
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, out);
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, out);
-            ReplayPosition.serializer.serialize(sstableStats.replayPosition, out);
-            out.writeLong(sstableStats.minTimestamp);
-            out.writeLong(sstableStats.maxTimestamp);
-            out.writeInt(sstableStats.maxLocalDeletionTime);
-            out.writeDouble(sstableStats.bloomFilterFPChance);
-            out.writeDouble(sstableStats.compressionRatio);
-            out.writeUTF(sstableStats.partitioner);
-            out.writeInt(ancestors.size());
-            for (Integer g : ancestors)
-                out.writeInt(g);
-            StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
-            out.writeInt(sstableStats.sstableLevel);
-            serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
-        }
-
-        private void serializeMinMaxColumnNames(List<ByteBuffer> minColNames, List<ByteBuffer> maxColNames, DataOutput out) throws IOException
-        {
-            out.writeInt(minColNames.size());
-            for (ByteBuffer columnName : minColNames)
-                ByteBufferUtil.writeWithShortLength(columnName, out);
-            out.writeInt(maxColNames.size());
-            for (ByteBuffer columnName : maxColNames)
-                ByteBufferUtil.writeWithShortLength(columnName, out);
-        }
-        /**
-         * Used to serialize to an old version - needed to be able to update sstable level without a full compaction.
-         *
-         * @deprecated will be removed when it is assumed that the minimum upgrade-from-version is the version that this
-         * patch made it into
-         *
-         * @param sstableStats
-         * @param legacyDesc
-         * @param out
-         * @throws IOException
-         */
-        @Deprecated
-        public void legacySerialize(SSTableMetadata sstableStats, Set<Integer> ancestors, Descriptor legacyDesc, DataOutput out) throws IOException
-        {
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, out);
-            EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, out);
-            ReplayPosition.serializer.serialize(sstableStats.replayPosition, out);
-            out.writeLong(sstableStats.minTimestamp);
-            out.writeLong(sstableStats.maxTimestamp);
-            if (legacyDesc.version.tracksMaxLocalDeletionTime)
-                out.writeInt(sstableStats.maxLocalDeletionTime);
-            if (legacyDesc.version.hasBloomFilterFPChance)
-                out.writeDouble(sstableStats.bloomFilterFPChance);
-            out.writeDouble(sstableStats.compressionRatio);
-            out.writeUTF(sstableStats.partitioner);
-            out.writeInt(ancestors.size());
-            for (Integer g : ancestors)
-                out.writeInt(g);
-            StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
-            out.writeInt(sstableStats.sstableLevel);
-            if (legacyDesc.version.tracksMaxMinColumnNames)
-                serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
-        }
-
-        /**
-         * deserializes the metadata
-         *
-         * returns a pair containing the part of the metadata meant to be kept-in memory and the part
-         * that should not.
-         *
-         * @param descriptor the descriptor
-         * @return a pair containing data that needs to be in memory and data that is potentially big and is not needed
-         *         in memory
-         * @throws IOException
-         */
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor) throws IOException
-        {
-            return deserialize(descriptor, true);
-        }
-
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor, boolean loadSSTableLevel) throws IOException
-        {
-            logger.debug("Load metadata for {}", descriptor);
-            File statsFile = new File(descriptor.filenameFor(Component.STATS));
-            if (!statsFile.exists())
-            {
-                logger.debug("No sstable stats for {}", descriptor);
-                return Pair.create(new SSTableMetadata(), Collections.<Integer>emptySet());
-            }
-
-            DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
-            try
-            {
-                return deserialize(in, descriptor, loadSSTableLevel);
-            }
-            finally
-            {
-                FileUtils.closeQuietly(in);
-            }
-        }
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(DataInputStream in, Descriptor desc) throws IOException
-        {
-            return deserialize(in, desc, true);
-        }
-
-        public Pair<SSTableMetadata, Set<Integer>> deserialize(DataInputStream in, Descriptor desc, boolean loadSSTableLevel) throws IOException
-        {
-            EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
-            EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-            ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
-            long minTimestamp = in.readLong();
-            long maxTimestamp = in.readLong();
-            int maxLocalDeletionTime = desc.version.tracksMaxLocalDeletionTime ? in.readInt() : Integer.MAX_VALUE;
-            double bloomFilterFPChance = desc.version.hasBloomFilterFPChance ? in.readDouble() : NO_BLOOM_FLITER_FP_CHANCE;
-            double compressionRatio = in.readDouble();
-            String partitioner = in.readUTF();
-            int nbAncestors = in.readInt();
-            Set<Integer> ancestors = new HashSet<Integer>(nbAncestors);
-            for (int i = 0; i < nbAncestors; i++)
-                ancestors.add(in.readInt());
-            StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
-            int sstableLevel = 0;
-
-            if (loadSSTableLevel && in.available() > 0)
-                sstableLevel = in.readInt();
-
-            List<ByteBuffer> minColumnNames;
-            List<ByteBuffer> maxColumnNames;
-            if (desc.version.tracksMaxMinColumnNames)
-            {
-                int colCount = in.readInt();
-                minColumnNames = new ArrayList<ByteBuffer>(colCount);
-                for (int i = 0; i < colCount; i++)
-                {
-                    minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
-                }
-                colCount = in.readInt();
-                maxColumnNames = new ArrayList<ByteBuffer>(colCount);
-                for (int i = 0; i < colCount; i++)
-                {
-                    maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
-                }
-            }
-            else
-            {
-                minColumnNames = Collections.emptyList();
-                maxColumnNames = Collections.emptyList();
-            }
-            return Pair.create(new SSTableMetadata(rowSizes,
-                                       columnCounts,
-                                       replayPosition,
-                                       minTimestamp,
-                                       maxTimestamp,
-                                       maxLocalDeletionTime,
-                                       bloomFilterFPChance,
-                                       compressionRatio,
-                                       partitioner,
-                                       tombstoneHistogram,
-                                       sstableLevel,
-                                       minColumnNames,
-                                       maxColumnNames), ancestors);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f82af1f..055f4b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -46,9 +46,11 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.service.CacheService;
@@ -123,7 +125,7 @@ public class SSTableReader extends SSTable implements Closeable
 
     private final SSTableDeletingTask deletingTask;
     // not final since we need to be able to change level on a file.
-    private volatile SSTableMetadata sstableMetadata;
+    private volatile StatsMetadata sstableMetadata;
 
     private final AtomicLong keyCacheHit = new AtomicLong(0);
     private final AtomicLong keyCacheRequest = new AtomicLong(0);
@@ -173,38 +175,68 @@ public class SSTableReader extends SSTable implements Closeable
         return open(desc, componentsFor(desc), metadata, p);
     }
 
+    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+    {
+        return open(descriptor, components, metadata, partitioner, true);
+    }
+
     public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
     {
         return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
     }
 
+    /**
+     * Open SSTable reader to be used in batch mode(such as sstableloader).
+     *
+     * @param descriptor
+     * @param components
+     * @param metadata
+     * @param partitioner
+     * @return opened SSTableReader
+     * @throws IOException
+     */
     public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
     {
-        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+        // Minimum components without which we can't do anything
+        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+                                                                                                               EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+        // Check if sstable is created using same partitioner.
+        // Partitioner can be null, which indicates older version of sstable or no stats available.
+        // In that case, we skip the check.
+        String partitionerName = partitioner.getClass().getCanonicalName();
+        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+        {
+            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                              descriptor, validationMetadata.partitioner, partitionerName));
+            System.exit(1);
+        }
+
+        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
         SSTableReader sstable = new SSTableReader(descriptor,
                                                   components,
                                                   metadata,
                                                   partitioner,
                                                   System.currentTimeMillis(),
-                                                  sstableMetadata);
+                                                  statsMetadata);
 
         // special implementation of load to use non-pooled SegmentedFile builders
         SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
         SegmentedFile.Builder dbuilder = sstable.compression
                                        ? new CompressedSegmentedFile.Builder()
                                        : new BufferedSegmentedFile.Builder();
-        if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata))
+        if (!sstable.loadSummary(ibuilder, dbuilder))
             sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
         sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
         sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
-
         sstable.bf = FilterFactory.AlwaysPresent;
-        return sstable;
-    }
 
-    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
-    {
-        return open(descriptor, components, metadata, partitioner, true);
+        return sstable;
     }
 
     private static SSTableReader open(Descriptor descriptor,
@@ -213,53 +245,48 @@ public class SSTableReader extends SSTable implements Closeable
                                       IPartitioner partitioner,
                                       boolean validate) throws IOException
     {
-        long start = System.nanoTime();
-        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+        // Minimum components without which we can't do anything
+        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
+        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+                                                                                                               EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+        // Check if sstable is created using same partitioner.
+        // Partitioner can be null, which indicates older version of sstable or no stats available.
+        // In that case, we skip the check.
+        String partitionerName = partitioner.getClass().getCanonicalName();
+        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+        {
+            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+                                              descriptor, validationMetadata.partitioner, partitionerName));
+            System.exit(1);
+        }
 
+        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
         SSTableReader sstable = new SSTableReader(descriptor,
                                                   components,
                                                   metadata,
                                                   partitioner,
                                                   System.currentTimeMillis(),
-                                                  sstableMetadata);
+                                                  statsMetadata);
 
-        sstable.load();
+        // load index and filter
+        long start = System.nanoTime();
+        sstable.load(validationMetadata);
+        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
         if (validate)
             sstable.validate();
 
-        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
         if (sstable.getKeyCache() != null)
             logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 
         return sstable;
     }
 
-    private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
-    {
-        assert partitioner != null;
-        // Minimum components without which we can't do anything
-        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
-        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
-        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
-
-        SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
-
-        // Check if sstable is created using same partitioner.
-        // Partitioner can be null, which indicates older version of sstable or no stats available.
-        // In that case, we skip the check.
-        String partitionerName = partitioner.getClass().getCanonicalName();
-        if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
-        {
-            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
-                                       descriptor, sstableMetadata.partitioner, partitionerName));
-            System.exit(1);
-        }
-        return sstableMetadata;
-    }
-
     public static void logOpenException(Descriptor descriptor, IOException e)
     {
         if (e instanceof FileNotFoundException)
@@ -323,7 +350,7 @@ public class SSTableReader extends SSTable implements Closeable
                                       IndexSummary isummary,
                                       IFilter bf,
                                       long maxDataAge,
-                                      SSTableMetadata sstableMetadata)
+                                      StatsMetadata sstableMetadata)
     {
         assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
         return new SSTableReader(desc,
@@ -343,7 +370,7 @@ public class SSTableReader extends SSTable implements Closeable
                           CFMetaData metadata,
                           IPartitioner partitioner,
                           long maxDataAge,
-                          SSTableMetadata sstableMetadata)
+                          StatsMetadata sstableMetadata)
     {
         super(desc, components, metadata, partitioner);
         this.sstableMetadata = sstableMetadata;
@@ -384,7 +411,7 @@ public class SSTableReader extends SSTable implements Closeable
                           IndexSummary indexSummary,
                           IFilter bloomFilter,
                           long maxDataAge,
-                          SSTableMetadata sstableMetadata)
+                          StatsMetadata sstableMetadata)
     {
         this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
 
@@ -436,7 +463,7 @@ public class SSTableReader extends SSTable implements Closeable
         keyCache = CacheService.instance.keyCache;
     }
 
-    private void load() throws IOException
+    private void load(ValidationMetadata validation) throws IOException
     {
         if (metadata.getBloomFilterFpChance() == 1.0)
         {
@@ -444,12 +471,12 @@ public class SSTableReader extends SSTable implements Closeable
             load(false, true);
             bf = FilterFactory.AlwaysPresent;
         }
-        else if (!components.contains(Component.FILTER))
+        else if (!components.contains(Component.FILTER) || validation == null)
         {
             // bf is enabled, but filter component is missing.
             load(true, true);
         }
-        else if (descriptor.version.hasBloomFilterFPChance && sstableMetadata.bloomFilterFPChance != metadata.getBloomFilterFpChance())
+        else if (descriptor.version.hasBloomFilterFPChance && validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
         {
             // bf fp chance in sstable metadata and it has changed since compaction.
             load(true, true);
@@ -462,7 +489,12 @@ public class SSTableReader extends SSTable implements Closeable
         }
     }
 
-    void loadBloomFilter() throws IOException
+    /**
+     * Load bloom filter from Filter.db file.
+     *
+     * @throws IOException
+     */
+    private void loadBloomFilter() throws IOException
     {
         DataInputStream stream = null;
         try
@@ -488,7 +520,7 @@ public class SSTableReader extends SSTable implements Closeable
                                          ? SegmentedFile.getCompressedBuilder()
                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
-        boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata);
+        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
         if (recreateBloomFilter || !summaryLoaded)
             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 
@@ -498,6 +530,15 @@ public class SSTableReader extends SSTable implements Closeable
             saveSummary(ibuilder, dbuilder);
     }
 
+    /**
+     * Build index summary(and optionally bloom filter) by reading through Index.db file.
+     *
+     * @param recreateBloomFilter true if recreate bloom filter
+     * @param ibuilder
+     * @param dbuilder
+     * @param summaryLoaded true if index summary is already loaded and not need to build again
+     * @throws IOException
+     */
     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
     {
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
@@ -552,7 +593,17 @@ public class SSTableReader extends SSTable implements Closeable
         last = getMinimalKey(last);
     }
 
-    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
+    /**
+     * Load index summary from Summary.db file if it exists.
+     *
+     * if loaded index summary has different index interval from current value stored in schema,
+     * then Summary.db file will be deleted and this returns false to rebuild summary.
+     *
+     * @param ibuilder
+     * @param dbuilder
+     * @return true if index summary is loaded successfully from Summary.db file.
+     */
+    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
     {
         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
         if (!descriptor.version.offHeapSummaries || !summariesFile.exists())
@@ -562,15 +613,7 @@ public class SSTableReader extends SSTable implements Closeable
         try
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
-            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel);
-            if (indexSummary.getIndexInterval() != metadata.getIndexInterval())
-            {
-                iStream.close();
-                logger.debug("Cannot read the saved summary for {} because Index Interval changed from {} to {}.",
-                             toString(), indexSummary.getIndexInterval(), metadata.getIndexInterval());
-                FileUtils.deleteWithConfirm(summariesFile);
-                return false;
-            }
+            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel, metadata.getIndexInterval());
             first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
@@ -578,9 +621,10 @@ public class SSTableReader extends SSTable implements Closeable
         }
         catch (IOException e)
         {
-            logger.debug("Cannot deserialize SSTable Summary: ", e);
+            logger.debug("Cannot deserialize SSTable {} Summary: {}", toString(), e.getMessage());
             // corrupted; delete it and fall back to creating a new summary
             FileUtils.closeQuietly(iStream);
+            // delete it and fall back to creating a new summary
             FileUtils.deleteWithConfirm(summariesFile);
             return false;
         }
@@ -592,6 +636,12 @@ public class SSTableReader extends SSTable implements Closeable
         return true;
     }
 
+    /**
+     * Save index summary to Summary.db file.
+     *
+     * @param ibuilder
+     * @param dbuilder
+     */
     public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
     {
         saveSummary(ibuilder, dbuilder, indexSummary);
@@ -830,7 +880,7 @@ public class SSTableReader extends SSTable implements Closeable
     private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
-        List<Pair<Integer,Integer>> positions = new ArrayList<Pair<Integer,Integer>>();
+        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 
         for (Range<Token> range : Range.normalize(ranges))
         {
@@ -864,7 +914,7 @@ public class SSTableReader extends SSTable implements Closeable
             if (left > right)
                 // empty range
                 continue;
-            positions.add(Pair.create(Integer.valueOf(left), Integer.valueOf(right)));
+            positions.add(Pair.create(left, right));
         }
         return positions;
     }
@@ -924,7 +974,7 @@ public class SSTableReader extends SSTable implements Closeable
     public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
-        List<Pair<Long,Long>> positions = new ArrayList<Pair<Long,Long>>();
+        List<Pair<Long,Long>> positions = new ArrayList<>();
         for (Range<Token> range : Range.normalize(ranges))
         {
             AbstractBounds<RowPosition> keyRange = range.toRowBounds();
@@ -941,7 +991,7 @@ public class SSTableReader extends SSTable implements Closeable
             if (left == right)
                 // empty range
                 continue;
-            positions.add(Pair.create(Long.valueOf(left), Long.valueOf(right)));
+            positions.add(Pair.create(left, right));
         }
         return positions;
     }
@@ -1481,7 +1531,8 @@ public class SSTableReader extends SSTable implements Closeable
     {
         try
         {
-            return SSTableMetadata.serializer.deserialize(descriptor).right;
+            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
+            return compactionMetadata.ancestors;
         }
         catch (IOException e)
         {
@@ -1506,10 +1557,10 @@ public class SSTableReader extends SSTable implements Closeable
      */
     public void reloadSSTableMetadata() throws IOException
     {
-        this.sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
+        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
     }
 
-    public SSTableMetadata getSSTableMetadata()
+    public StatsMetadata getSSTableMetadata()
     {
         return sstableMetadata;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 5b02d37..60bb8d1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -33,6 +33,10 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -53,7 +57,7 @@ public class SSTableWriter extends SSTable
     private final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
-    private final SSTableMetadata.Collector sstableMetadataCollector;
+    private final MetadataCollector sstableMetadataCollector;
 
     public SSTableWriter(String filename, long keyCount)
     {
@@ -61,7 +65,7 @@ public class SSTableWriter extends SSTable
              keyCount,
              Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
              StorageService.getPartitioner(),
-             SSTableMetadata.createCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
+             new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
     }
 
     private static Set<Component> components(CFMetaData metadata)
@@ -93,7 +97,7 @@ public class SSTableWriter extends SSTable
                          long keyCount,
                          CFMetaData metadata,
                          IPartitioner<?> partitioner,
-                         SSTableMetadata.Collector sstableMetadataCollector)
+                         MetadataCollector sstableMetadataCollector)
     {
         super(Descriptor.fromFilename(filename),
               components(metadata),
@@ -308,9 +312,9 @@ public class SSTableWriter extends SSTable
 
     public SSTableReader closeAndOpenReader(long maxDataAge)
     {
-        Pair<Descriptor, SSTableMetadata> p = close();
+        Pair<Descriptor, StatsMetadata> p = close();
         Descriptor newdesc = p.left;
-        SSTableMetadata sstableMetadata = p.right;
+        StatsMetadata sstableMetadata = p.right;
 
         // finalize in-memory state for the reader
         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
@@ -335,36 +339,40 @@ public class SSTableWriter extends SSTable
     }
 
     // Close the writer and return the descriptor to the new sstable and it's metadata
-    public Pair<Descriptor, SSTableMetadata> close()
+    public Pair<Descriptor, StatsMetadata> close()
     {
         // index and filter
         iwriter.close();
         // main data, close will truncate if necessary
         dataFile.close();
         // write sstable statistics
-        SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+        Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
+                                                                                    partitioner.getClass().getCanonicalName(),
                                                                                     metadata.getBloomFilterFpChance());
-        writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors);
+        writeMetadata(descriptor, metadataComponents);
 
         // save the table of components
         SSTable.appendTOC(descriptor, components);
 
         // remove the 'tmp' marker from all components
-        return Pair.create(rename(descriptor, components), sstableMetadata);
+        return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
     }
 
-    private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata,  Set<Integer> ancestors)
+    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
     {
         SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
         try
         {
-            SSTableMetadata.serializer.serialize(sstableMetadata, ancestors, out.stream);
+            desc.getMetadataSerializer().serialize(components, out.stream);
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, out.getPath());
         }
-        out.close();
+        finally
+        {
+            out.close();
+        }
     }
 
     static Descriptor rename(Descriptor tmpdesc, Set<Component> components)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
new file mode 100644
index 0000000..fd0e626
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Compaction related SSTable metadata.
+ *
+ * Only loaded for <b>compacting</b> SSTables at the time of compaction.
+ */
+public class CompactionMetadata extends MetadataComponent
+{
+    public static final IMetadataComponentSerializer serializer = new CompactionMetadataSerializer();
+
+    public final Set<Integer> ancestors;
+
+    public CompactionMetadata(Set<Integer> ancestors)
+    {
+        this.ancestors = ancestors;
+    }
+
+    public MetadataType getType()
+    {
+        return MetadataType.COMPACTION;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        CompactionMetadata that = (CompactionMetadata) o;
+        return ancestors == null ? that.ancestors == null : ancestors.equals(that.ancestors);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return ancestors != null ? ancestors.hashCode() : 0;
+    }
+
+    public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
+    {
+        public int serializedSize(CompactionMetadata component) throws IOException
+        {
+            int size = 0;
+            size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
+            for (int g : component.ancestors)
+                size += TypeSizes.NATIVE.sizeof(g);
+            return size;
+        }
+
+        public void serialize(CompactionMetadata component, DataOutput out) throws IOException
+        {
+            out.writeInt(component.ancestors.size());
+            for (int g : component.ancestors)
+                out.writeInt(g);
+        }
+
+        public CompactionMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        {
+            int nbAncestors = in.readInt();
+            Set<Integer> ancestors = new HashSet<>(nbAncestors);
+            for (int i = 0; i < nbAncestors; i++)
+                ancestors.add(in.readInt());
+            return new CompactionMetadata(ancestors);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
new file mode 100644
index 0000000..53ca138
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Metadata component serializer
+ */
+public interface IMetadataComponentSerializer<T extends MetadataComponent>
+{
+    /**
+     * Calculate and return serialized size.
+     *
+     * @param component MetadataComponent to calculate serialized size
+     * @return serialized size of this component
+     * @throws IOException
+     */
+    int serializedSize(T component) throws IOException;
+
+    /**
+     * Serialize metadata component to given output.
+     *
+     * @param component MetadataComponent to serialize
+     * @param out  serialize destination
+     * @throws IOException
+     */
+    void serialize(T component, DataOutput out) throws IOException;
+
+    /**
+     * Deserialize metadata component from given input.
+     *
+     * @param version serialize version
+     * @param in deserialize source
+     * @return Deserialized component
+     * @throws IOException
+     */
+    T deserialize(Descriptor.Version version, DataInput in) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
new file mode 100644
index 0000000..0875e5d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Interface for SSTable metadata serializer
+ */
+public interface IMetadataSerializer
+{
+    /**
+     * Serialize given metadata components
+     *
+     * @param components Metadata components to serialize
+     * @throws IOException
+     */
+    void serialize(Map<MetadataType, MetadataComponent> components, DataOutput out) throws IOException;
+
+    /**
+     * Deserialize specified metadata components from given descriptor.
+     *
+     * @param descriptor SSTable descriptor
+     * @return Deserialized metadata components, in deserialized order.
+     * @throws IOException
+     */
+    Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException;
+
+    /**
+     * Deserialized only metadata component specified from given descriptor.
+     *
+     * @param descriptor SSTable descriptor
+     * @param type Metadata component type to deserialize
+     * @return Deserialized metadata component. Can be null if specified type does not exist.
+     * @throws IOException
+     */
+    MetadataComponent deserialize(Descriptor descriptor, MetadataType type) throws IOException;
+
+    /**
+     * Mutate SSTable level
+     *
+     * @param descriptor SSTable descriptor
+     * @param newLevel new SSTable level
+     * @throws IOException
+     */
+    void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
+}


Mime
View raw message