cassandra-commits mailing list archives

From yu...@apache.org
Subject [1/2] SSTable metadata(Stats.db) format change
Date Fri, 13 Dec 2013 22:52:54 GMT
Updated Branches:
  refs/heads/trunk 84d85ee70 -> 74bf5aa16


http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
new file mode 100644
index 0000000..a691591
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+/**
+ * Serializer for SSTable metadata from legacy versions
+ */
+@Deprecated
+public class LegacyMetadataSerializer extends MetadataSerializer
+{
+    public static final double NO_BLOOM_FILTER_FP_CHANCE = -1.0;
+
+    /**
+     * Legacy serialization is only used for SSTable level reset.
+     */
+    @Override
+    public void serialize(Map<MetadataType, MetadataComponent> components, DataOutput out) throws IOException
+    {
+        ValidationMetadata validation = (ValidationMetadata) components.get(MetadataType.VALIDATION);
+        StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS);
+        CompactionMetadata compaction = (CompactionMetadata) components.get(MetadataType.COMPACTION);
+
+        assert validation != null && stats != null && compaction != null && validation.partitioner != null;
+
+        EstimatedHistogram.serializer.serialize(stats.estimatedRowSize, out);
+        EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
+        ReplayPosition.serializer.serialize(stats.replayPosition, out);
+        out.writeLong(stats.minTimestamp);
+        out.writeLong(stats.maxTimestamp);
+        out.writeInt(stats.maxLocalDeletionTime);
+        out.writeDouble(validation.bloomFilterFPChance);
+        out.writeDouble(stats.compressionRatio);
+        out.writeUTF(validation.partitioner);
+        out.writeInt(compaction.ancestors.size());
+        for (Integer g : compaction.ancestors)
+            out.writeInt(g);
+        StreamingHistogram.serializer.serialize(stats.estimatedTombstoneDropTime, out);
+        out.writeInt(stats.sstableLevel);
+        out.writeInt(stats.minColumnNames.size());
+        for (ByteBuffer columnName : stats.minColumnNames)
+            ByteBufferUtil.writeWithShortLength(columnName, out);
+        out.writeInt(stats.maxColumnNames.size());
+        for (ByteBuffer columnName : stats.maxColumnNames)
+            ByteBufferUtil.writeWithShortLength(columnName, out);
+    }
+
+    /**
+ * The legacy serializer deserializes all components, no matter what types are specified.
+     */
+    @Override
+    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
+
+        File statsFile = new File(descriptor.filenameFor(Component.STATS));
+        if (!statsFile.exists() && types.contains(MetadataType.STATS))
+        {
+            components.put(MetadataType.STATS, MetadataCollector.defaultStatsMetadata());
+        }
+        else
+        {
+            try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile))))
+            {
+                EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+                EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
+                ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+                long minTimestamp = in.readLong();
+                long maxTimestamp = in.readLong();
+                int maxLocalDeletionTime = descriptor.version.tracksMaxLocalDeletionTime ? in.readInt() : Integer.MAX_VALUE;
+                double bloomFilterFPChance = descriptor.version.hasBloomFilterFPChance ? in.readDouble() : NO_BLOOM_FILTER_FP_CHANCE;
+                double compressionRatio = in.readDouble();
+                String partitioner = in.readUTF();
+                int nbAncestors = in.readInt();
+                Set<Integer> ancestors = new HashSet<>(nbAncestors);
+                for (int i = 0; i < nbAncestors; i++)
+                    ancestors.add(in.readInt());
+                StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
+                int sstableLevel = 0;
+                if (in.available() > 0)
+                    sstableLevel = in.readInt();
+
+                List<ByteBuffer> minColumnNames;
+                List<ByteBuffer> maxColumnNames;
+                if (descriptor.version.tracksMaxMinColumnNames)
+                {
+                    int colCount = in.readInt();
+                    minColumnNames = new ArrayList<>(colCount);
+                    for (int i = 0; i < colCount; i++)
+                    {
+                        minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                    }
+                    colCount = in.readInt();
+                    maxColumnNames = new ArrayList<>(colCount);
+                    for (int i = 0; i < colCount; i++)
+                    {
+                        maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                    }
+                }
+                else
+                {
+                    minColumnNames = Collections.emptyList();
+                    maxColumnNames = Collections.emptyList();
+                }
+
+                if (types.contains(MetadataType.VALIDATION))
+                    components.put(MetadataType.VALIDATION,
+                                   new ValidationMetadata(partitioner, bloomFilterFPChance));
+                if (types.contains(MetadataType.STATS))
+                    components.put(MetadataType.STATS,
+                                   new StatsMetadata(rowSizes,
+                                                     columnCounts,
+                                                     replayPosition,
+                                                     minTimestamp,
+                                                     maxTimestamp,
+                                                     maxLocalDeletionTime,
+                                                     compressionRatio,
+                                                     tombstoneHistogram,
+                                                     sstableLevel,
+                                                     minColumnNames,
+                                                     maxColumnNames));
+                if (types.contains(MetadataType.COMPACTION))
+                    components.put(MetadataType.COMPACTION,
+                                   new CompactionMetadata(ancestors));
+            }
+        }
+        return components;
+    }
+}
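
A quick orientation, not part of the patch itself: the legacy path reads the whole Stats.db in a fixed field order, so the only partial update it can support is rewriting the file wholesale (hence "only used for SSTable level reset"). A minimal sketch of reading legacy metadata, with a hypothetical filename and error handling omitted:

    Descriptor desc = Descriptor.fromFilename("Keyspace1-Standard1-ic-5-Data.db");
    LegacyMetadataSerializer serializer = new LegacyMetadataSerializer();
    Map<MetadataType, MetadataComponent> components =
        serializer.deserialize(desc, EnumSet.allOf(MetadataType.class));
    StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS);
    System.out.println("current level: " + stats.sstableLevel);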

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
new file mode 100644
index 0000000..4b9329f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+public class MetadataCollector
+{
+    public static final double NO_COMPRESSION_RATIO = -1.0;
+
+    static EstimatedHistogram defaultColumnCountHistogram()
+    {
+        // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
+        return new EstimatedHistogram(114);
+    }
+
+    static EstimatedHistogram defaultRowSizeHistogram()
+    {
+        // EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
+        return new EstimatedHistogram(150);
+    }
+
+    static StreamingHistogram defaultTombstoneDropTimeHistogram()
+    {
+        return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+    }
+
+    public static StatsMetadata defaultStatsMetadata()
+    {
+        return new StatsMetadata(defaultRowSizeHistogram(),
+                                 defaultColumnCountHistogram(),
+                                 ReplayPosition.NONE,
+                                 Long.MIN_VALUE,
+                                 Long.MAX_VALUE,
+                                 Integer.MAX_VALUE,
+                                 NO_COMPRESSION_RATIO,
+                                 defaultTombstoneDropTimeHistogram(),
+                                 0,
+                                 Collections.<ByteBuffer>emptyList(),
+                                 Collections.<ByteBuffer>emptyList());
+    }
+
+    protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
+    protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
+    protected ReplayPosition replayPosition = ReplayPosition.NONE;
+    protected long minTimestamp = Long.MAX_VALUE;
+    protected long maxTimestamp = Long.MIN_VALUE;
+    protected int maxLocalDeletionTime = Integer.MIN_VALUE;
+    protected double compressionRatio = NO_COMPRESSION_RATIO;
+    protected Set<Integer> ancestors = new HashSet<>();
+    protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
+    protected int sstableLevel;
+    protected List<ByteBuffer> minColumnNames = Collections.emptyList();
+    protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+    private final AbstractType<?> columnNameComparator;
+
+    public MetadataCollector(AbstractType<?> columnNameComparator)
+    {
+        this.columnNameComparator = columnNameComparator;
+    }
+
+    public MetadataCollector(Collection<SSTableReader> sstables, AbstractType<?> columnNameComparator, int level)
+    {
+        this(columnNameComparator);
+
+        replayPosition(ReplayPosition.getReplayPosition(sstables));
+        sstableLevel(level);
+        // Add the generation of each precompacted sstable, plus the
+        // generations of any of its ancestors still present on disk
+        for (SSTableReader sstable : sstables)
+        {
+            addAncestor(sstable.descriptor.generation);
+            for (Integer i : sstable.getAncestors())
+            {
+                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                    addAncestor(i);
+            }
+        }
+    }
+
+    public void addRowSize(long rowSize)
+    {
+        estimatedRowSize.add(rowSize);
+    }
+
+    public void addColumnCount(long columnCount)
+    {
+        estimatedColumnCount.add(columnCount);
+    }
+
+    public void mergeTombstoneHistogram(StreamingHistogram histogram)
+    {
+        estimatedTombstoneDropTime.merge(histogram);
+    }
+
+    /**
+     * Ratio is compressed/uncompressed; a ratio of 1.0 or
+     * higher means compression isn't helping.
+     */
+    public void addCompressionRatio(long compressed, long uncompressed)
+    {
+        compressionRatio = (double) compressed / uncompressed;
+    }
+
+    public void updateMinTimestamp(long potentialMin)
+    {
+        minTimestamp = Math.min(minTimestamp, potentialMin);
+    }
+
+    public void updateMaxTimestamp(long potentialMax)
+    {
+        maxTimestamp = Math.max(maxTimestamp, potentialMax);
+    }
+
+    public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+    {
+        this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
+    }
+
+    public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
+    {
+        this.estimatedRowSize = estimatedRowSize;
+        return this;
+    }
+
+    public MetadataCollector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
+    {
+        this.estimatedColumnCount = estimatedColumnCount;
+        return this;
+    }
+
+    public MetadataCollector replayPosition(ReplayPosition replayPosition)
+    {
+        this.replayPosition = replayPosition;
+        return this;
+    }
+
+    public MetadataCollector addAncestor(int generation)
+    {
+        this.ancestors.add(generation);
+        return this;
+    }
+
+    public void update(long size, ColumnStats stats)
+    {
+        updateMinTimestamp(stats.minTimestamp);
+        updateMaxTimestamp(stats.maxTimestamp);
+        updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
+        addRowSize(size);
+        addColumnCount(stats.columnCount);
+        mergeTombstoneHistogram(stats.tombstoneHistogram);
+        updateMinColumnNames(stats.minColumnNames);
+        updateMaxColumnNames(stats.maxColumnNames);
+    }
+
+    public MetadataCollector sstableLevel(int sstableLevel)
+    {
+        this.sstableLevel = sstableLevel;
+        return this;
+    }
+
+    public MetadataCollector updateMinColumnNames(List<ByteBuffer> minColumnNames)
+    {
+        if (minColumnNames.size() > 0)
+            this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
+        return this;
+    }
+
+    public MetadataCollector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
+    {
+        if (maxColumnNames.size() > 0)
+            this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
+        return this;
+    }
+
+    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance)
+    {
+        Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
+        components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
+        components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
+                                                             estimatedColumnCount,
+                                                             replayPosition,
+                                                             minTimestamp,
+                                                             maxTimestamp,
+                                                             maxLocalDeletionTime,
+                                                             compressionRatio,
+                                                             estimatedTombstoneDropTime,
+                                                             sstableLevel,
+                                                             minColumnNames,
+                                                             maxColumnNames));
+        components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors));
+        return components;
+    }
+
+}
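
For reviewers new to this class: MetadataCollector replaces the old SSTableMetadata.Collector (see the call-site changes further down). The sstable writer drives it, feeding in per-row statistics and asking for the final component map once at the end. A rough sketch of the call pattern, with made-up values:

    MetadataCollector collector = new MetadataCollector(BytesType.instance)
        .replayPosition(ReplayPosition.NONE)
        .sstableLevel(0);
    // per row, the writer does roughly:
    collector.updateMinTimestamp(1386975174000000L);
    collector.updateMaxTimestamp(1386975174000000L);
    collector.addRowSize(1024);
    collector.addColumnCount(3);
    // once, when the sstable is closed:
    Map<MetadataType, MetadataComponent> components =
        collector.finalizeMetadata("org.apache.cassandra.dht.Murmur3Partitioner", 0.01);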

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java
new file mode 100644
index 0000000..bf8a9af
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataComponent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+/**
+ * A MetadataComponent is one component of SSTable metadata, serialized to Stats.db.
+ */
+public abstract class MetadataComponent implements Comparable<MetadataComponent>
+{
+    /**
+     * @return Metadata component type
+     */
+    public abstract MetadataType getType();
+
+    public int compareTo(MetadataComponent o)
+    {
+        return this.getType().compareTo(o.getType());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
new file mode 100644
index 0000000..d7962de
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.*;
+import java.util.*;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Metadata serializer for SSTables version >= 'k'.
+ *
+ * <pre>
+ * File format := | number of components (4 bytes) | toc | component1 | component2 | ... |
+ * toc         := | component type (4 bytes) | position of component (4 bytes) |
+ * </pre>
+ *
+ * MetadataType's ordinal() defines the order of serialization.
+ */
+public class MetadataSerializer implements IMetadataSerializer
+{
+    private static final Logger logger = LoggerFactory.getLogger(MetadataSerializer.class);
+
+    public void serialize(Map<MetadataType, MetadataComponent> components, DataOutput out) throws IOException
+    {
+        // sort components by type
+        List<MetadataComponent> sortedComponents = Lists.newArrayList(components.values());
+        Collections.sort(sortedComponents);
+
+        // write number of components
+        out.writeInt(components.size());
+        // build and write toc
+        int lastPosition = 4 + (8 * sortedComponents.size());
+        for (MetadataComponent component : sortedComponents)
+        {
+            MetadataType type = component.getType();
+            // serialize type
+            out.writeInt(type.ordinal());
+            // serialize position
+            out.writeInt(lastPosition);
+            lastPosition += type.serializer.serializedSize(component);
+        }
+        // serialize components
+        for (MetadataComponent component : sortedComponents)
+        {
+            component.getType().serializer.serialize(component, out);
+        }
+    }
+
+    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> components;
+        logger.debug("Load metadata for {}", descriptor);
+        File statsFile = new File(descriptor.filenameFor(Component.STATS));
+        if (!statsFile.exists())
+        {
+            logger.debug("No sstable stats for {}", descriptor);
+            components = Maps.newHashMap();
+            components.put(MetadataType.STATS, MetadataCollector.defaultStatsMetadata());
+        }
+        else
+        {
+            try (RandomAccessReader r = RandomAccessReader.open(statsFile))
+            {
+                components = deserialize(descriptor, r, types);
+            }
+        }
+        return components;
+    }
+
+    public MetadataComponent deserialize(Descriptor descriptor, MetadataType type) throws IOException
+    {
+        return deserialize(descriptor, EnumSet.of(type)).get(type);
+    }
+
+    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, FileDataInput in, EnumSet<MetadataType> types) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
+        // read number of components
+        int numComponents = in.readInt();
+        // read toc
+        Map<MetadataType, Integer> toc = new HashMap<>(numComponents);
+        for (int i = 0; i < numComponents; i++)
+        {
+            toc.put(MetadataType.values()[in.readInt()], in.readInt());
+        }
+        for (MetadataType type : types)
+        {
+            MetadataComponent component = null;
+            if (toc.containsKey(type))
+            {
+                in.seek(toc.get(type));
+                component = type.serializer.deserialize(descriptor.version, in);
+            }
+            components.put(type, component);
+        }
+        return components;
+    }
+
+    public void mutateLevel(Descriptor descriptor, int newLevel) throws IOException
+    {
+        logger.debug("Mutating {} to level {}", descriptor.filenameFor(Component.STATS), newLevel);
+        Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+        StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+        // mutate level
+        currentComponents.put(MetadataType.STATS, stats.mutateLevel(newLevel));
+        Descriptor tmpDescriptor = descriptor.asTemporary(true);
+
+        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
+        {
+            serialize(currentComponents, out);
+            out.flush();
+        }
+        // we can't move a file on top of another file on Windows:
+        if (!FBUtilities.isUnix())
+            FileUtils.delete(descriptor.filenameFor(Component.STATS));
+        FileUtils.renameWithConfirm(tmpDescriptor.filenameFor(Component.STATS), descriptor.filenameFor(Component.STATS));
+    }
+}
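
The point of the toc is random access. With all three component types present, the header is 4 bytes for the count plus 3 * 8 bytes of toc entries, so the first component begins at byte 28, which is exactly what lastPosition = 4 + (8 * sortedComponents.size()) computes above. A caller can therefore pull a single component without deserializing the rest; a sketch:

    // Seeks straight to the STATS offset recorded in the toc; the other
    // components are never read.
    StatsMetadata stats = (StatsMetadata) descriptor.getMetadataSerializer()
        .deserialize(descriptor, MetadataType.STATS);
    int level = stats.sstableLevel;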

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
new file mode 100644
index 0000000..9717da1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+/**
+ * Defines Metadata component type.
+ */
+public enum MetadataType
+{
+    /** Metadata only used for SSTable validation */
+    VALIDATION(ValidationMetadata.serializer),
+    /** Metadata used only during compaction */
+    COMPACTION(CompactionMetadata.serializer),
+    /** Metadata always kept in memory */
+    STATS(StatsMetadata.serializer);
+
+    public final IMetadataComponentSerializer<MetadataComponent> serializer;
+
+    private MetadataType(IMetadataComponentSerializer<MetadataComponent> serializer)
+    {
+        this.serializer = serializer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
new file mode 100644
index 0000000..8055c77
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+/**
+ * SSTable metadata that always stays on heap.
+ */
+public class StatsMetadata extends MetadataComponent
+{
+    public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
+
+    public final EstimatedHistogram estimatedRowSize;
+    public final EstimatedHistogram estimatedColumnCount;
+    public final ReplayPosition replayPosition;
+    public final long minTimestamp;
+    public final long maxTimestamp;
+    public final int maxLocalDeletionTime;
+    public final double compressionRatio;
+    public final StreamingHistogram estimatedTombstoneDropTime;
+    public final int sstableLevel;
+    public final List<ByteBuffer> maxColumnNames;
+    public final List<ByteBuffer> minColumnNames;
+
+    public StatsMetadata(EstimatedHistogram estimatedRowSize,
+                         EstimatedHistogram estimatedColumnCount,
+                         ReplayPosition replayPosition,
+                         long minTimestamp,
+                         long maxTimestamp,
+                         int maxLocalDeletionTime,
+                         double compressionRatio,
+                         StreamingHistogram estimatedTombstoneDropTime,
+                         int sstableLevel,
+                         List<ByteBuffer> minColumnNames,
+                         List<ByteBuffer> maxColumnNames)
+    {
+        this.estimatedRowSize = estimatedRowSize;
+        this.estimatedColumnCount = estimatedColumnCount;
+        this.replayPosition = replayPosition;
+        this.minTimestamp = minTimestamp;
+        this.maxTimestamp = maxTimestamp;
+        this.maxLocalDeletionTime = maxLocalDeletionTime;
+        this.compressionRatio = compressionRatio;
+        this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
+        this.sstableLevel = sstableLevel;
+        this.minColumnNames = minColumnNames;
+        this.maxColumnNames = maxColumnNames;
+    }
+
+    public MetadataType getType()
+    {
+        return MetadataType.STATS;
+    }
+
+    /**
+     * @param gcBefore gc time in seconds
+     * @return estimated droppable tombstone ratio at given gcBefore time.
+     */
+    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
+    {
+        long estimatedColumnCount = this.estimatedColumnCount.mean() * this.estimatedColumnCount.count();
+        if (estimatedColumnCount > 0)
+        {
+            double droppable = getDroppableTombstonesBefore(gcBefore);
+            return droppable / estimatedColumnCount;
+        }
+        return 0.0;
+    }
+
+    /**
+     * @param gcBefore gc time in seconds
+     * @return estimated number of droppable tombstones
+     */
+    public double getDroppableTombstonesBefore(int gcBefore)
+    {
+        return estimatedTombstoneDropTime.sum(gcBefore);
+    }
+
+    public StatsMetadata mutateLevel(int newLevel)
+    {
+        return new StatsMetadata(estimatedRowSize,
+                                 estimatedColumnCount,
+                                 replayPosition,
+                                 minTimestamp,
+                                 maxTimestamp,
+                                 maxLocalDeletionTime,
+                                 compressionRatio,
+                                 estimatedTombstoneDropTime,
+                                 newLevel,
+                                 minColumnNames,
+                                 maxColumnNames);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        StatsMetadata that = (StatsMetadata) o;
+        return new EqualsBuilder()
+                       .append(estimatedRowSize, that.estimatedRowSize)
+                       .append(estimatedColumnCount, that.estimatedColumnCount)
+                       .append(replayPosition, that.replayPosition)
+                       .append(minTimestamp, that.minTimestamp)
+                       .append(maxTimestamp, that.maxTimestamp)
+                       .append(maxLocalDeletionTime, that.maxLocalDeletionTime)
+                       .append(compressionRatio, that.compressionRatio)
+                       .append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
+                       .append(sstableLevel, that.sstableLevel)
+                       .append(maxColumnNames, that.maxColumnNames)
+                       .append(minColumnNames, that.minColumnNames)
+                       .build();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return new HashCodeBuilder()
+                       .append(estimatedRowSize)
+                       .append(estimatedColumnCount)
+                       .append(replayPosition)
+                       .append(minTimestamp)
+                       .append(maxTimestamp)
+                       .append(maxLocalDeletionTime)
+                       .append(compressionRatio)
+                       .append(estimatedTombstoneDropTime)
+                       .append(sstableLevel)
+                       .append(maxColumnNames)
+                       .append(minColumnNames)
+                       .build();
+    }
+
+    public static class StatsMetadataSerializer implements IMetadataComponentSerializer<StatsMetadata>
+    {
+        public int serializedSize(StatsMetadata component) throws IOException
+        {
+            int size = 0;
+            size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
+            size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
+            size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
+            size += 8 + 8 + 4 + 8; // min/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double)
+            size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
+            size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
+            // min column names
+            size += 4;
+            for (ByteBuffer columnName : component.minColumnNames)
+                size += 2 + columnName.remaining(); // with short length
+            // max column names
+            size += 4;
+            for (ByteBuffer columnName : component.maxColumnNames)
+                size += 2 + columnName.remaining(); // with short length
+            return size;
+        }
+
+        public void serialize(StatsMetadata component, DataOutput out) throws IOException
+        {
+            EstimatedHistogram.serializer.serialize(component.estimatedRowSize, out);
+            EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
+            ReplayPosition.serializer.serialize(component.replayPosition, out);
+            out.writeLong(component.minTimestamp);
+            out.writeLong(component.maxTimestamp);
+            out.writeInt(component.maxLocalDeletionTime);
+            out.writeDouble(component.compressionRatio);
+            StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
+            out.writeInt(component.sstableLevel);
+            out.writeInt(component.minColumnNames.size());
+            for (ByteBuffer columnName : component.minColumnNames)
+                ByteBufferUtil.writeWithShortLength(columnName, out);
+            out.writeInt(component.maxColumnNames.size());
+            for (ByteBuffer columnName : component.maxColumnNames)
+                ByteBufferUtil.writeWithShortLength(columnName, out);
+        }
+
+        public StatsMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        {
+            EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+            EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
+            ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+            long minTimestamp = in.readLong();
+            long maxTimestamp = in.readLong();
+            int maxLocalDeletionTime = version.tracksMaxLocalDeletionTime ? in.readInt() : Integer.MAX_VALUE;
+            double compressionRatio = in.readDouble();
+            StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
+            int sstableLevel = in.readInt();
+            List<ByteBuffer> minColumnNames;
+            List<ByteBuffer> maxColumnNames;
+            if (version.tracksMaxMinColumnNames)
+            {
+                int colCount = in.readInt();
+                minColumnNames = new ArrayList<>(colCount);
+                for (int i = 0; i < colCount; i++)
+                {
+                    minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                }
+                colCount = in.readInt();
+                maxColumnNames = new ArrayList<>(colCount);
+                for (int i = 0; i < colCount; i++)
+                {
+                    maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                }
+            }
+            else
+            {
+                minColumnNames = Collections.emptyList();
+                maxColumnNames = Collections.emptyList();
+            }
+            return new StatsMetadata(rowSizes,
+                                     columnCounts,
+                                     replayPosition,
+                                     minTimestamp,
+                                     maxTimestamp,
+                                     maxLocalDeletionTime,
+                                     compressionRatio,
+                                     tombstoneHistogram,
+                                     sstableLevel,
+                                     minColumnNames,
+                                     maxColumnNames);
+        }
+    }
+}
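
The droppable-tombstone estimate above is the tombstone histogram's sum before gcBefore divided by an approximate total column count (mean times count of the column-count histogram). With made-up numbers: about 100 columns per row over 50 rows gives roughly 5000 columns, so 500 tombstones expiring before gcBefore yields a ratio of 500 / 5000 = 0.1. In code:

    // Example only: gcBefore as now minus a gc_grace_seconds of 10 days.
    int gcBefore = (int) (System.currentTimeMillis() / 1000) - 864000;
    double ratio = stats.getEstimatedDroppableTombstoneRatio(gcBefore);
    if (ratio > 0.2) // illustrative threshold
        System.out.println("tombstone compaction may be worthwhile: " + ratio);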

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
new file mode 100644
index 0000000..c2722f5
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/ValidationMetadata.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * SSTable metadata component used only for validating an SSTable.
+ *
+ * This component is read for validation before the main Data.db file
+ * is opened, and is discarded immediately afterwards.
+ */
+public class ValidationMetadata extends MetadataComponent
+{
+    public static final IMetadataComponentSerializer serializer = new ValidationMetadataSerializer();
+
+    public final String partitioner;
+    public final double bloomFilterFPChance;
+
+    public ValidationMetadata(String partitioner, double bloomFilterFPChance)
+    {
+        this.partitioner = partitioner;
+        this.bloomFilterFPChance = bloomFilterFPChance;
+    }
+
+    public MetadataType getType()
+    {
+        return MetadataType.VALIDATION;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ValidationMetadata that = (ValidationMetadata) o;
+        return Double.compare(that.bloomFilterFPChance, bloomFilterFPChance) == 0 && partitioner.equals(that.partitioner);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result;
+        long temp;
+        result = partitioner.hashCode();
+        temp = Double.doubleToLongBits(bloomFilterFPChance);
+        result = 31 * result + (int) (temp ^ (temp >>> 32));
+        return result;
+    }
+
+    public static class ValidationMetadataSerializer implements IMetadataComponentSerializer<ValidationMetadata>
+    {
+        public int serializedSize(ValidationMetadata component) throws IOException
+        {
+            return TypeSizes.NATIVE.sizeof(component.partitioner) + 8;
+        }
+
+        public void serialize(ValidationMetadata component, DataOutput out) throws IOException
+        {
+            out.writeUTF(component.partitioner);
+            out.writeDouble(component.bloomFilterFPChance);
+        }
+
+        public ValidationMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+        {
+            return new ValidationMetadata(in.readUTF(), in.readDouble());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index b61cb57..bb5d465 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -25,8 +25,8 @@ import com.yammer.metrics.util.RatioGauge;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
 /**
@@ -166,13 +166,13 @@ public class ColumnFamilyMetrics
                 int total = 0;
                 for (SSTableReader sstable : cfs.getSSTables())
                 {
-                    if (sstable.getCompressionRatio() != SSTableMetadata.NO_COMPRESSION_RATIO)
+                    if (sstable.getCompressionRatio() != MetadataCollector.NO_COMPRESSION_RATIO)
                     {
                         sum += sstable.getCompressionRatio();
                         total++;
                     }
                 }
-                return total != 0 ? (double) sum / total : 0;
+                return total != 0 ? sum / total : 0;
             }
         });
         readLatency = new LatencyMetrics(factory, "Read");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
index 999f440..aa555fc 100644
--- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
@@ -23,11 +23,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 
 /**
  * Reset level to 0 on a given set of sstables
@@ -65,9 +64,9 @@ public class SSTableLevelResetter
             {
                 foundSSTable = true;
                 Descriptor descriptor = sstable.getKey();
-                Pair<SSTableMetadata, Set<Integer>> metadata = SSTableMetadata.serializer.deserialize(descriptor);
-                out.println("Changing level from " + metadata.left.sstableLevel + " to 0 on " + descriptor.filenameFor(Component.DATA));
-                LeveledManifest.mutateLevel(metadata, descriptor, descriptor.filenameFor(Component.STATS), 0);
+                StatsMetadata metadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
+                out.println("Changing level from " + metadata.sstableLevel + " to 0 on " + descriptor.filenameFor(Component.DATA));
+                descriptor.getMetadataSerializer().mutateLevel(descriptor, 0);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 5cfd778..aeec284 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -19,9 +19,11 @@ package org.apache.cassandra.tools;
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.EnumSet;
+import java.util.Map;
 
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.sstable.metadata.*;
 
 /**
  * Shows the contents of sstable metadata
@@ -43,21 +45,31 @@ public class SSTableMetadataViewer
         for (String fname : args)
         {
             Descriptor descriptor = Descriptor.fromFilename(fname);
-            SSTableMetadata metadata = SSTableMetadata.serializer.deserialize(descriptor).left;
+            Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer().deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+            ValidationMetadata validation = (ValidationMetadata) metadata.get(MetadataType.VALIDATION);
+            StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
+            CompactionMetadata compaction = (CompactionMetadata) metadata.get(MetadataType.COMPACTION);
 
             out.printf("SSTable: %s%n", descriptor);
-            out.printf("Partitioner: %s%n", metadata.partitioner);
-            out.printf("Maximum timestamp: %s%n", metadata.maxTimestamp);
-            out.printf("SSTable max local deletion time: %s%n", metadata.maxLocalDeletionTime);
-            out.printf("Compression ratio: %s%n", metadata.compressionRatio);
-            out.printf("Estimated droppable tombstones: %s%n", metadata.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
-            out.printf("SSTable Level: %d%n", metadata.sstableLevel);
-            out.println(metadata.replayPosition);
-            printHistograms(metadata, out);
+            if (validation != null)
+            {
+                out.printf("Partitioner: %s%n", validation.partitioner);
+                out.printf("Bloom Filter FP chance: %f%n", validation.bloomFilterFPChance);
+            }
+            if (stats != null)
+            {
+                out.printf("Maximum timestamp: %s%n", stats.maxTimestamp);
+                out.printf("SSTable max local deletion time: %s%n", stats.maxLocalDeletionTime);
+                out.printf("Compression ratio: %s%n", stats.compressionRatio);
+                out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
+                out.printf("SSTable Level: %d%n", stats.sstableLevel);
+                out.println(stats.replayPosition);
+            }
+            printHistograms(stats, out);
         }
     }
 
-    private static void printHistograms(SSTableMetadata metadata, PrintStream out)
+    private static void printHistograms(StatsMetadata metadata, PrintStream out)
     {
         long[] offsets = metadata.estimatedRowSize.getBucketOffsets();
         long[] ersh = metadata.estimatedRowSize.getBuckets(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index 1b8ffe1..b05abe7 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -337,9 +337,19 @@ public class EstimatedHistogram
             return new EstimatedHistogram(offsets, buckets);
         }
 
-        public long serializedSize(EstimatedHistogram object, TypeSizes typeSizes)
+        public long serializedSize(EstimatedHistogram eh, TypeSizes typeSizes)
         {
-            throw new UnsupportedOperationException();
+            int size = 0;
+
+            long[] offsets = eh.getBucketOffsets();
+            long[] buckets = eh.getBuckets(false);
+            size += typeSizes.sizeof(buckets.length);
+            for (int i = 0; i < buckets.length; i++)
+            {
+                size += typeSizes.sizeof(offsets[i == 0 ? 0 : i - 1]);
+                size += typeSizes.sizeof(buckets[i]);
+            }
+            return size;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index c4ba956..daa702f 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -17,15 +17,16 @@
  */
 package org.apache.cassandra.utils;
 
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.ISerializer;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
+
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+
 /**
  * Histogram that can be constructed from streaming of data.
  *
@@ -50,13 +51,13 @@ public class StreamingHistogram
     public StreamingHistogram(int maxBinSize)
     {
         this.maxBinSize = maxBinSize;
-        bin = new TreeMap<Double, Long>();
+        bin = new TreeMap<>();
     }
 
     private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
     {
         this.maxBinSize = maxBinSize;
-        this.bin = new TreeMap<Double, Long>(bin);
+        this.bin = new TreeMap<>(bin);
     }
 
     /**
@@ -185,7 +186,7 @@ public class StreamingHistogram
         {
             int maxBinSize = in.readInt();
             int size = in.readInt();
-            Map<Double, Long> tmp = new HashMap<Double, Long>(size);
+            Map<Double, Long> tmp = new HashMap<>(size);
             for (int i = 0; i < size; i++)
             {
                 tmp.put(in.readDouble(), in.readLong());
@@ -196,7 +197,12 @@ public class StreamingHistogram
 
         public long serializedSize(StreamingHistogram histogram, TypeSizes typeSizes)
         {
-            throw new UnsupportedOperationException();
+            long size = typeSizes.sizeof(histogram.maxBinSize);
+            Map<Double, Long> entries = histogram.getAsMap();
+            size += typeSizes.sizeof(entries.size());
+            // each entry is a double key (8 bytes) plus a long count (8 bytes)
+            size += entries.size() * (8 + 8);
+            return size;
         }
     }
 
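As a sanity check on the serializedSize arithmetic above: maxBinSize is one int, the entry count is another, and each bin is a double/long pair, so a histogram with n bins serializes to 4 + 4 + 16n bytes. A throwaway check, assuming fewer distinct points than maxBinSize so no bins are merged:

    StreamingHistogram h = new StreamingHistogram(100);
    for (int i = 0; i < 10; i++)
        h.update(i);                        // 10 distinct points -> 10 bins
    long expected = 4 + 4 + 10 * (8 + 8);   // 168 bytes
    assert StreamingHistogram.serializer.serializedSize(h, TypeSizes.NATIVE) == expected;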

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index e9d7336..ace32e7 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.marshal.LexicalUUIDType;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -1593,7 +1594,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         {
             protected SSTableWriter getWriter()
             {
-                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+                MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
                 collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
                 return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
                                          0,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index a008de1..f8c3c00 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -183,7 +183,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         {
             assertTrue(s.getSSTableLevel() != 6);
             strategy.manifest.remove(s);
-            LeveledManifest.mutateLevel(Pair.create(s.getSSTableMetadata(), s.getAncestors()), s.descriptor, s.descriptor.filenameFor(Component.STATS), 6);
+            s.descriptor.getMetadataSerializer().mutateLevel(s.descriptor, 6);
             s.reloadSSTableMetadata();
             strategy.manifest.add(s);
         }
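
Level mutation no longer goes through the static LeveledManifest.mutateLevel helper with an explicit Stats.db path; the descriptor's metadata serializer rewrites the component in place. A sketch of the new call sequence, assuming an already-open SSTableReader:

    import java.io.IOException;

    import org.apache.cassandra.io.sstable.SSTableReader;

    public class LevelResetExample
    {
        public static void resetLevel(SSTableReader sstable, int newLevel) throws IOException
        {
            // rewrites the Stats.db component on disk with the new level...
            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, newLevel);
            // ...then refreshes the in-memory copy from it
            sstable.reloadSSTableMetadata();
        }
    }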

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index ee32a0e..fa96270 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.*;
 
 import static org.junit.Assert.assertEquals;
@@ -55,7 +55,7 @@ public class CompressedRandomAccessReaderTest
 
         try
         {
-            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(BytesType.instance).replayPosition(null);
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(BytesType.instance).replayPosition(null);
             SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
                 : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
@@ -106,7 +106,7 @@ public class CompressedRandomAccessReaderTest
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
-        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(BytesType.instance).replayPosition(null);
+        MetadataCollector sstableMetadataCollector = new MetadataCollector(BytesType.instance).replayPosition(null);
         SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
 
         writer.write(CONTENT.getBytes());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 8d013f9..e9cca75 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -82,7 +82,7 @@ public class IndexSummaryTest
         dos.writeUTF("JUNK");
         FileUtils.closeQuietly(dos);
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false);
+        IndexSummary is = IndexSummary.serializer.deserialize(dis, DatabaseDescriptor.getPartitioner(), false, 1);
         for (int i = 0; i < 100; i++)
             assertEquals(i, is.binarySearch(random.left.get(i)));
         // read the junk
@@ -106,7 +106,7 @@ public class IndexSummaryTest
         DataOutputStream dos = new DataOutputStream(aos);
         IndexSummary.serializer.serialize(summary, dos, false);
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(aos.toByteArray()));
-        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false);
+        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1);
 
         assertEquals(1, loaded.size());
         assertEquals(summary.getPosition(0), loaded.getPosition(0));
@@ -253,4 +253,4 @@ public class IndexSummaryTest
         assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(0, 2, indexInterval));
         assertEquals(64 * indexInterval, Downsampling.getEffectiveIndexIntervalAfterIndex(1, 2, indexInterval));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
deleted file mode 100644
index 13b2f26..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.io.sstable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataOutputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.RandomPartitioner;
-import org.apache.cassandra.utils.EstimatedHistogram;
-import org.apache.cassandra.utils.Pair;
-
-public class SSTableMetadataSerializerTest
-{
-    @Test
-    public void testSerialization() throws IOException
-    {
-        EstimatedHistogram rowSizes = new EstimatedHistogram(
-            new long[] { 1L, 2L },
-            new long[] { 3L, 4L, 5L });
-        EstimatedHistogram columnCounts = new EstimatedHistogram(
-            new long[] { 6L, 7L },
-            new long[] { 8L, 9L, 10L });
-        ReplayPosition rp = new ReplayPosition(11L, 12);
-        long minTimestamp = 2162517136L;
-        long maxTimestamp = 4162517136L;
-
-        SSTableMetadata.Collector collector = SSTableMetadata.createCollector(BytesType.instance)
-                                                             .estimatedRowSize(rowSizes)
-                                                             .estimatedColumnCount(columnCounts)
-                                                             .replayPosition(rp);
-        collector.updateMinTimestamp(minTimestamp);
-        collector.updateMaxTimestamp(maxTimestamp);
-        SSTableMetadata originalMetadata = collector.finalizeMetadata(RandomPartitioner.class.getCanonicalName(), 0.1);
-
-        ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(byteOutput);
-        
-        Set<Integer> ancestors = new HashSet<Integer>();
-        ancestors.addAll(Arrays.asList(1,2,3,4));
-
-        SSTableMetadata.serializer.serialize(originalMetadata, ancestors, out);
-
-        ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
-        DataInputStream in = new DataInputStream(byteInput);
-        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, new File("."), "", "", 0, false);
-        Pair<SSTableMetadata, Set<Integer>> statsPair = SSTableMetadata.serializer.deserialize(in, desc);
-        SSTableMetadata stats = statsPair.left;
-
-        assert stats.estimatedRowSize.equals(originalMetadata.estimatedRowSize);
-        assert stats.estimatedRowSize.equals(rowSizes);
-        assert stats.estimatedColumnCount.equals(originalMetadata.estimatedColumnCount);
-        assert stats.estimatedColumnCount.equals(columnCounts);
-        assert stats.replayPosition.equals(originalMetadata.replayPosition);
-        assert stats.replayPosition.equals(rp);
-        assert stats.minTimestamp == minTimestamp;
-        assert stats.maxTimestamp == maxTimestamp;
-        assert stats.minTimestamp == originalMetadata.minTimestamp;
-        assert stats.maxTimestamp == originalMetadata.maxTimestamp;
-        assert stats.bloomFilterFPChance == originalMetadata.bloomFilterFPChance;
-        assert RandomPartitioner.class.getCanonicalName().equals(stats.partitioner);
-        assert ancestors.equals(statsPair.right);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
new file mode 100644
index 0000000..e0b87cd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+import static org.junit.Assert.assertEquals;
+
+public class MetadataSerializerTest
+{
+    @Test
+    public void testSerialization() throws IOException
+    {
+        EstimatedHistogram rowSizes = new EstimatedHistogram(new long[] { 1L, 2L },
+                                                             new long[] { 3L, 4L, 5L });
+        EstimatedHistogram columnCounts = new EstimatedHistogram(new long[] { 6L, 7L },
+                                                                 new long[] { 8L, 9L, 10L });
+        ReplayPosition rp = new ReplayPosition(11L, 12);
+        long minTimestamp = 2162517136L;
+        long maxTimestamp = 4162517136L;
+
+        MetadataCollector collector = new MetadataCollector(BytesType.instance)
+                                                      .estimatedRowSize(rowSizes)
+                                                      .estimatedColumnCount(columnCounts)
+                                                      .replayPosition(rp);
+        collector.updateMinTimestamp(minTimestamp);
+        collector.updateMaxTimestamp(maxTimestamp);
+
+        Set<Integer> ancestors = Sets.newHashSet(1, 2, 3, 4);
+        for (int i : ancestors)
+            collector.addAncestor(i);
+
+        String partitioner = RandomPartitioner.class.getCanonicalName();
+        double bfFpChance = 0.1;
+        Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance);
+
+        MetadataSerializer serializer = new MetadataSerializer();
+        // Serialize to tmp file
+        File statsFile = File.createTempFile(Component.STATS.name, null);
+        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(statsFile)))
+        {
+            serializer.serialize(originalMetadata, out);
+        }
+
+        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, false);
+        try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+        {
+            Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
+
+            for (MetadataType type : MetadataType.values())
+            {
+                assertEquals(originalMetadata.get(type), deserialized.get(type));
+            }
+        }
+    }
+}
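
The EnumSet argument reflects the point of the format change: callers can ask for just the components they need rather than the whole metadata block. A sketch of a stats-only read, assuming selective deserialization behaves like the allOf case the test exercises and the usual filenameFor(Component.STATS) layout shown in the hunks above:

    import java.io.File;
    import java.io.IOException;
    import java.util.EnumSet;
    import java.util.Map;

    import org.apache.cassandra.io.sstable.Component;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.metadata.*;
    import org.apache.cassandra.io.util.RandomAccessReader;

    public class StatsOnlyReadExample
    {
        public static StatsMetadata readStats(Descriptor desc) throws IOException
        {
            try (RandomAccessReader in = RandomAccessReader.open(new File(desc.filenameFor(Component.STATS))))
            {
                // request only the STATS component, skipping VALIDATION and COMPACTION
                Map<MetadataType, MetadataComponent> components =
                    new MetadataSerializer().deserialize(desc, in, EnumSet.of(MetadataType.STATS));
                return (StatsMetadata) components.get(MetadataType.STATS);
            }
        }
    }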

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 027c84c..efe51c6 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -60,7 +60,7 @@ public class CompressedInputStreamTest
         // write compressed data file of longs
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
-        SSTableMetadata.Collector collector = SSTableMetadata.createCollector(BytesType.instance);
+        MetadataCollector collector = new MetadataCollector(BytesType.instance);
         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
         Map<Long, Long> index = new HashMap<Long, Long>();

