cassandra-commits mailing list archives

From: jbel...@apache.org
Subject: [2/2] git commit: trying again to commit #4872 via git apply
Date: Mon, 18 Feb 2013 15:24:51 GMT
Updated Branches:
  refs/heads/trunk 0e2847872 -> 278a5e860


trying again to commit #4872 via git apply


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/278a5e86
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/278a5e86
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/278a5e86

Branch: refs/heads/trunk
Commit: 278a5e86001b1fd094da2e0d05ea48e8f7eb9e1f
Parents: 5bd57cb
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Feb 18 09:24:23 2013 -0600
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Feb 18 09:24:23 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 build.xml                                          |    2 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   21 +-
 src/java/org/apache/cassandra/db/Directories.java  |   10 +-
 .../db/compaction/AbstractCompactionStrategy.java  |    5 +
 .../cassandra/db/compaction/CompactionManager.java |    5 +-
 .../cassandra/db/compaction/CompactionTask.java    |    4 +-
 .../db/compaction/LegacyLeveledManifest.java       |  140 +++++++
 .../db/compaction/LeveledCompactionStrategy.java   |   21 +-
 .../cassandra/db/compaction/LeveledManifest.java   |  296 +++++----------
 .../apache/cassandra/db/compaction/Scrubber.java   |    4 +-
 .../cassandra/io/sstable/SSTableMetadata.java      |   99 +++++-
 .../apache/cassandra/io/sstable/SSTableReader.java |   30 ++-
 .../apache/cassandra/service/CassandraDaemon.java  |   16 +
 .../cassandra/tools/SSTableMetadataViewer.java     |    1 +
 .../apache/cassandra/utils/StreamingHistogram.java |   21 +
 ...Keyspace1-legacyleveled-hf-0-CompressionInfo.db |  Bin 0 -> 46 bytes
 .../Keyspace1/Keyspace1-legacyleveled-hf-0-Data.db |  Bin 0 -> 70 bytes
 .../Keyspace1-legacyleveled-hf-0-Filter.db         |  Bin 0 -> 16 bytes
 .../Keyspace1-legacyleveled-hf-0-Index.db          |  Bin 0 -> 14 bytes
 .../Keyspace1-legacyleveled-hf-0-Statistics.db     |  Bin 0 -> 4340 bytes
 .../Keyspace1/Keyspace1-legacyleveled-hf-0-TOC.txt |    6 +
 ...Keyspace1-legacyleveled-hf-1-CompressionInfo.db |  Bin 0 -> 46 bytes
 .../Keyspace1/Keyspace1-legacyleveled-hf-1-Data.db |  Bin 0 -> 70 bytes
 .../Keyspace1-legacyleveled-hf-1-Filter.db         |  Bin 0 -> 16 bytes
 .../Keyspace1-legacyleveled-hf-1-Index.db          |  Bin 0 -> 14 bytes
 .../Keyspace1-legacyleveled-hf-1-Statistics.db     |  Bin 0 -> 4340 bytes
 ...Keyspace1-legacyleveled-hf-2-CompressionInfo.db |  Bin 0 -> 46 bytes
 .../Keyspace1/Keyspace1-legacyleveled-hf-2-Data.db |  Bin 0 -> 70 bytes
 .../Keyspace1-legacyleveled-hf-2-Filter.db         |  Bin 0 -> 16 bytes
 .../Keyspace1-legacyleveled-hf-2-Index.db          |  Bin 0 -> 14 bytes
 .../Keyspace1-legacyleveled-hf-2-Statistics.db     |  Bin 0 -> 4340 bytes
 .../hf/Keyspace1/legacyleveled.json                |   27 ++
 .../LongLeveledCompactionStrategyTest.java         |    4 +
 .../db/compaction/LegacyLeveledManifestTest.java   |   92 +++++
 .../compaction/LeveledCompactionStrategyTest.java  |   67 ++++
 36 files changed, 644 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a42ae5..58f38df 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.3
+ * Move sstable level information into the Stats component, removing the
+   need for a separate Manifest file (CASSANDRA-4872)
  * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
  * make index_interval configurable per columnfamily (CASSANDRA-3961)
  * add default_time_to_live (CASSANDRA-3974)

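The entry above is the heart of this commit: the per-columnfamily level that LeveledCompactionStrategy used to keep in a standalone .json manifest is now persisted in the Statistics.db component and exposed on the reader. As a minimal sketch (not part of the patch itself), reading the level after this change only needs the accessors added in the SSTableReader.java hunk below; "sstable" stands in for any open reader:

    // Sketch only: getSSTableLevel()/getSSTableMetadata() are introduced by this patch;
    // "sstable" is a placeholder for an open SSTableReader.
    int level = sstable.getSSTableLevel();             // read from the Stats component
    SSTableMetadata meta = sstable.getSSTableMetadata();
    assert meta.sstableLevel == level;                 // same value, no JSON manifest involved
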
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 9cec6ee..06475d9 100644
--- a/build.xml
+++ b/build.xml
@@ -1120,6 +1120,7 @@
     <testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000">
       <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
       <jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
+      <jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/>
       <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
     </testmacro>
   </target>
@@ -1128,6 +1129,7 @@
     <testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000">
       <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
       <jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
+      <jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/>
       <jvmarg value="-Dcassandra.test.compression=true"/>
       <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
     </testmacro>

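The build changes point the unit tests at the new migration fixtures via a -Dmigration-sstable-root system property (the hf sstables and legacyleveled.json added at the end of this commit). A hedged sketch of how a test might resolve that root; the helper class below is illustrative, not part of the patch:

    // Sketch: the property name comes from the build.xml hunk above; "hf"/"Keyspace1"
    // mirror the fixture paths added by this commit.
    import java.io.File;

    public class MigrationTestData
    {
        public static File legacyTableDir(String version, String keyspace)
        {
            String root = System.getProperty("migration-sstable-root");
            assert root != null : "run via ant so -Dmigration-sstable-root is set";
            return new File(new File(root, version), keyspace);
        }
    }

For example, legacyTableDir("hf", "Keyspace1") would resolve to the directory holding the Keyspace1-legacyleveled-hf-* files listed below.
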
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6bee607..c7059d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,6 +33,8 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
+
+import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -590,6 +592,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                          Descriptor.Version.CURRENT,
                                                          descriptor));
 
+            // force foreign sstables to level 0
+            try
+            {
+                if (new File(descriptor.filenameFor(Component.STATS)).exists())
+                {
+                    SSTableMetadata oldMetadata = SSTableMetadata.serializer.deserialize(descriptor);
+                    LeveledManifest.mutateLevel(oldMetadata, descriptor, descriptor.filenameFor(Component.STATS), 0);
+                }
+            }
+            catch (IOException e)
+            {
+                SSTableReader.logOpenException(entry.getKey(), e);
+                continue;
+            }
+
             Descriptor newDescriptor = new Descriptor(descriptor.version,
                                                       descriptor.directory,
                                                       descriptor.ksname,
@@ -1969,10 +1986,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return intern(name);
     }
 
-    public SSTableWriter createCompactionWriter(long estimatedRows, File location, Collection<SSTableReader> sstables)
+    public SSTableWriter createCompactionWriter(OperationType operationType, long estimatedRows, File location, Collection<SSTableReader> sstables)
     {
         ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);
         SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(rp);
+        sstableMetadataCollector.sstableLevel(compactionStrategy.getNextLevel(sstables, operationType));
 
         // Get the max timestamp of the precompacted sstables
         // and adds generation of live ancestors
@@ -1980,6 +1998,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             sstableMetadataCollector.updateMinTimestamp(sstable.getMinTimestamp());
             sstableMetadataCollector.updateMaxTimestamp(sstable.getMaxTimestamp());
+
             sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
             for (Integer i : sstable.getAncestors())
             {

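createCompactionWriter now takes the OperationType so the compaction strategy can choose the output level up front and record it on the metadata collector. A minimal sketch of the collector chaining this relies on; "rp" and "level" are placeholders for the values computed in the method above, and the setters are the ones shown in the SSTableMetadata.java hunk further down:

    // Sketch: replayPosition(...) and sstableLevel(...) both return the Collector;
    // "rp" and "level" are placeholders.
    SSTableMetadata.Collector collector = SSTableMetadata.createCollector()
                                                         .replayPosition(rp)
                                                         .sstableLevel(level);
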
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index c0af0da..6177bea 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -398,6 +398,7 @@ public class Directories
         }
     }
 
+    @Deprecated
     public File tryGetLeveledManifest()
     {
         for (File dir : sstableDirectories)
@@ -413,14 +414,7 @@ public class Directories
         return null;
     }
 
-    public File getOrCreateLeveledManifest()
-    {
-        File manifestFile = tryGetLeveledManifest();
-        if (manifestFile == null)
-            manifestFile = new File(sstableDirectories[0], cfname + LeveledManifest.EXTENSION);
-        return manifestFile;
-    }
-
+    @Deprecated
     public void snapshotLeveledManifest(String snapshotName)
     {
         File manifest = tryGetLeveledManifest();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 356289c..00736d5 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -243,4 +243,9 @@ public abstract class AbstractCompactionStrategy
         uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
         return uncheckedOptions;
     }
+
+    public int getNextLevel(Collection<SSTableReader> sstables, OperationType operationType)
+    {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 01cee9d..51d4883 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -612,7 +612,7 @@ public class CompactionManager implements CompactionManagerMBean
                         AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
                         if (compactedRow.isEmpty())
                             continue;
-                        writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
+                        writer = maybeCreateWriter(cfs, OperationType.CLEANUP, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
                         writer.append(compactedRow);
                         totalkeysWritten++;
                     }
@@ -694,6 +694,7 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs,
+                                                  OperationType compactionType,
                                                   File compactionFileLocation,
                                                   int expectedBloomFilterSize,
                                                   SSTableWriter writer,
@@ -702,7 +703,7 @@ public class CompactionManager implements CompactionManagerMBean
         if (writer == null)
         {
             FileUtils.createDirectory(compactionFileLocation);
-            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables);
+            writer = cfs.createCompactionWriter(compactionType, expectedBloomFilterSize, compactionFileLocation, sstables);
         }
         return writer;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d0d872b..fb48fd6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -147,7 +147,7 @@ public class CompactionTask extends AbstractCompactionTask
                 return;
             }
 
-            SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, sstableDirectory, toCompact);
+            SSTableWriter writer = cfs.createCompactionWriter(compactionType, keysPerSSTable, sstableDirectory, toCompact);
             writers.add(writer);
             while (iter.hasNext())
             {
@@ -185,7 +185,7 @@ public class CompactionTask extends AbstractCompactionTask
                 {
                     // tmp = false because later we want to query it with descriptor from SSTableReader
                     cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                    writer = cfs.createCompactionWriter(keysPerSSTable, sstableDirectory, toCompact);
+                    writer = cfs.createCompactionWriter(compactionType, keysPerSSTable, sstableDirectory, toCompact);
                     writers.add(writer);
                     cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java
new file mode 100644
index 0000000..1bb4619
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/LegacyLeveledManifest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.util.FileUtils;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * This class was added to be able to migrate pre-CASSANDRA-4872 leveled manifests into the sstable metadata
+ *
+ * @deprecated since it can be removed in a future revision.
+ */
+@Deprecated
+public class LegacyLeveledManifest
+{
+    private static final Logger logger = LoggerFactory.getLogger(LegacyLeveledManifest.class);
+
+    private Map<Integer, Integer> sstableLevels;
+
+    private LegacyLeveledManifest(File path) throws IOException
+    {
+        sstableLevels = new HashMap<Integer, Integer>();
+        ObjectMapper m = new ObjectMapper();
+        JsonNode rootNode = m.readValue(path, JsonNode.class);
+        JsonNode generations = rootNode.get("generations");
+        assert generations.isArray();
+        for (JsonNode generation : generations)
+        {
+            int level = generation.get("generation").getIntValue();
+            JsonNode generationValues = generation.get("members");
+            for (JsonNode generationValue : generationValues)
+            {
+                sstableLevels.put(generationValue.getIntValue(), level);
+            }
+        }
+    }
+
+    private int levelOf(int sstableGeneration)
+    {
+        return sstableLevels.containsKey(sstableGeneration) ? sstableLevels.get(sstableGeneration) : 0;
+    }
+
+    /**
+     * We need to migrate if there is a legacy leveled-manifest JSON file.
+     * <p/>
+     * If there is no JSON file, we can start up as normal; the sstable level will be 0 for all sstables.
+     *
+     * @param keyspace
+     * @param columnFamily
+     * @return
+     */
+    public static boolean manifestNeedsMigration(String keyspace, String columnFamily)
+    {
+        return Directories.create(keyspace, columnFamily).tryGetLeveledManifest() != null;
+    }
+
+    public static void migrateManifests(String keyspace, String columnFamily) throws IOException
+    {
+        logger.info("Migrating manifest for {}/{}", keyspace, columnFamily);
+
+        snapshotWithoutCFS(keyspace, columnFamily);
+        Directories directories = Directories.create(keyspace, columnFamily);
+        File manifestFile = directories.tryGetLeveledManifest();
+        if (manifestFile == null)
+            return;
+
+        LegacyLeveledManifest legacyManifest = new LegacyLeveledManifest(manifestFile);
+        for (Map.Entry<Descriptor, Set<Component>> entry : directories.sstableLister().includeBackups(false).skipTemporary(true).list().entrySet())
+        {
+            Descriptor d = entry.getKey();
+            SSTableMetadata oldMetadata = SSTableMetadata.serializer.deserialize(d, false);
+            String metadataFilename = d.filenameFor(Component.STATS);
+            LeveledManifest.mutateLevel(oldMetadata, d, metadataFilename, legacyManifest.levelOf(d.generation));
+        }
+        FileUtils.deleteWithConfirm(manifestFile);
+    }
+
+    /**
+     * Snapshot a CF without having to load the sstables in that directory
+     *
+     * @param keyspace
+     * @param columnFamily
+     * @throws IOException
+     */
+    public static void snapshotWithoutCFS(String keyspace, String columnFamily) throws IOException
+    {
+        Directories directories = Directories.create(keyspace, columnFamily);
+        String snapshotName = "pre-sstablemetamigration";
+        logger.info("Snapshotting {}, {} to {}", keyspace, columnFamily, snapshotName);
+
+        for (Map.Entry<Descriptor, Set<Component>> entry : directories.sstableLister().includeBackups(false).skipTemporary(true).list().entrySet())
+        {
+            Descriptor descriptor = entry.getKey();
+            File snapshotDirectoryPath = Directories.getSnapshotDirectory(descriptor, snapshotName);
+            for (Component component : entry.getValue())
+            {
+                File sourceFile = new File(descriptor.filenameFor(component));
+                File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+                FileUtils.createHardLink(sourceFile, targetLink);
+            }
+        }
+
+        File manifestFile = directories.tryGetLeveledManifest();
+        if (manifestFile != null)
+        {
+            File snapshotDirectory = new File(new File(manifestFile.getParentFile(), Directories.SNAPSHOT_SUBDIR), snapshotName);
+            File target = new File(snapshotDirectory, manifestFile.getName());
+            FileUtils.createHardLink(manifestFile, target);
+        }
+    }
+}

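The private constructor above implicitly documents the legacy manifest layout: a top-level "generations" array whose entries carry the level under "generation" and the member sstable generation numbers under "members". A hedged reconstruction of that shape (the numbers are placeholders, not the actual contents of the legacyleveled.json fixture added by this commit):

    // Sketch of the legacy .json manifest shape, inferred from the JsonNode accesses
    // in the constructor above; generation/member numbers are placeholders.
    String legacyJson =
        "{ \"generations\": [\n" +
        "    { \"generation\": 0, \"members\": [ ] },\n" +
        "    { \"generation\": 1, \"members\": [ 4, 5, 6 ] }\n" +
        "] }";
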
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index eac4b4a..358598b 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -143,19 +143,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         else if (notification instanceof SSTableListChangedNotification)
         {
             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
-            switch (listChangedNotification.compactionType)
-            {
-                // Cleanup, scrub and updateSSTable shouldn't promote (see #3989)
-                case CLEANUP:
-                case SCRUB:
-                case UPGRADE_SSTABLES:
-                case TOMBSTONE_COMPACTION: // Also when performing tombstone removal.
-                    manifest.replace(listChangedNotification.removed, listChangedNotification.added);
-                    break;
-                default:
-                    manifest.promote(listChangedNotification.removed, listChangedNotification.added);
-                    break;
-            }
+            manifest.replace(listChangedNotification.removed, listChangedNotification.added);
         }
     }
 
@@ -168,7 +156,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
-            byLevel.get(manifest.levelOf(sstable)).add(sstable);
+            byLevel.get(sstable.getSSTableLevel()).add(sstable);
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
         for (Integer level : byLevel.keySet())
@@ -322,4 +310,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
 
         return uncheckedOptions;
     }
+
+    public int getNextLevel(Collection<SSTableReader> sstables, OperationType operationType)
+    {
+        return manifest.getNextLevel(sstables, operationType);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 8bf12eb..d241a85 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -17,7 +17,8 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.File;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
 
@@ -28,21 +29,15 @@ import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
-import org.codehaus.jackson.JsonEncoding;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 
 public class LeveledManifest
@@ -60,7 +55,6 @@ public class LeveledManifest
 
     private final ColumnFamilyStore cfs;
     private final List<SSTableReader>[] generations;
-    private final Map<SSTableReader, Integer> sstableGenerations;
     private final RowPosition[] lastCompactedKeys;
     private final int maxSSTableSizeInBytes;
 
@@ -78,7 +72,6 @@ public class LeveledManifest
             generations[i] = new ArrayList<SSTableReader>();
             lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
         }
-        sstableGenerations = new HashMap<SSTableReader, Integer>();
     }
 
     static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize)
@@ -89,78 +82,27 @@ public class LeveledManifest
     public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, Iterable<SSTableReader> sstables)
     {
         LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize);
-        load(cfs, manifest, sstables);
 
         // ensure all SSTables are in the manifest
         for (SSTableReader ssTableReader : sstables)
         {
-            if (manifest.levelOf(ssTableReader) < 0)
-                manifest.add(ssTableReader);
+            manifest.add(ssTableReader);
         }
-
-        return manifest;
-    }
-
-    private static void load(ColumnFamilyStore cfs, LeveledManifest manifest, Iterable<SSTableReader> sstables)
-    {
-        File manifestFile = tryGetManifest(cfs);
-        if (manifestFile == null)
-            return;
-
-        try
-        {
-            parseManifest(manifest, sstables, manifestFile);
-        }
-        catch (Exception e)
-        {
-            logger.debug("Error parsing manifest", e);
-            File oldFile = new File(manifestFile.getPath().replace(EXTENSION, "-old.json"));
-            if (oldFile.exists())
-            {
-                try
-                {
-                    parseManifest(manifest, sstables, oldFile);
-                    return;
-                }
-                catch (Exception old)
-                {
-                    logger.debug("Old manifest present but corrupt", old);
-                }
-            }
-            logger.warn("Manifest present but corrupt. Cassandra will re-level {} from scratch", cfs.getColumnFamilyName());
-        }
-    }
-
-    private static void parseManifest(LeveledManifest manifest, Iterable<SSTableReader> sstables, File manifestFile) throws IOException
-    {
-        ObjectMapper m = new ObjectMapper();
-        JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
-        JsonNode generations = rootNode.get("generations");
-        assert generations.isArray();
-        for (JsonNode generation : generations)
+        for (int i = 1; i < manifest.getAllLevelSize().length; i++)
         {
-            int level = generation.get("generation").getIntValue();
-            JsonNode generationValues = generation.get("members");
-            for (JsonNode generationValue : generationValues)
-            {
-                for (SSTableReader ssTableReader : sstables)
-                {
-                    if (ssTableReader.descriptor.generation == generationValue.getIntValue())
-                    {
-                        logger.debug("Loading {} at L{}", ssTableReader, level);
-                        manifest.add(ssTableReader, level);
-                    }
-                }
-            }
+            manifest.repairOverlappingSSTables(i);
         }
+        return manifest;
     }
 
     public synchronized void add(SSTableReader reader)
     {
+        int level = reader.getSSTableLevel();
+        assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
         logDistribution();
-        logger.debug("Adding {} to L0", reader);
-        add(reader, 0);
-        serialize();
+
+        logger.debug("Adding {} to L{}", reader, level);
+        generations[level].add(reader);
     }
 
     /**
@@ -169,15 +111,18 @@ public class LeveledManifest
      */
     private int skipLevels(int newLevel, Iterable<SSTableReader> added)
     {
+        // Note that we now check whether the sstables included in the compaction fit in the next level *before* the compaction runs.
+        // This is needed because we have to decide, ahead of the actual compaction, which level the output will be in.
+        // This should be safe: we might skip levels where the compacted data could have fit, but that is acceptable.
         while (maxBytesForLevel(newLevel) < SSTableReader.getTotalBytes(added)
-            && generations[(newLevel + 1)].isEmpty())
+               && generations[(newLevel + 1)].isEmpty())
         {
             newLevel++;
         }
         return newLevel;
     }
 
-    public synchronized void promote(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
+    public synchronized void replace(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
     {
         assert !Iterables.isEmpty(removed); // use add() instead of promote when adding new sstables
         logDistribution();
@@ -186,44 +131,25 @@ public class LeveledManifest
 
         // the level for the added sstables is the max of the removed ones,
         // plus one if the removed were all on the same level
-        int minimumLevel = Integer.MAX_VALUE;
-        int maximumLevel = 0;
         for (SSTableReader sstable : removed)
         {
-            int thisLevel = remove(sstable);
-            assert thisLevel >= 0;
-            maximumLevel = Math.max(maximumLevel, thisLevel);
-            minimumLevel = Math.min(minimumLevel, thisLevel);
+            remove(sstable);
         }
 
         // it's valid to do a remove w/o an add (e.g. on truncate)
         if (!added.iterator().hasNext())
             return;
 
-        int newLevel;
-        if (minimumLevel == 0 && maximumLevel == 0 && SSTable.getTotalBytes(removed) <= maxSSTableSizeInBytes)
-        {
-            // special case for tiny L0 sstables; see CASSANDRA-4341
-            newLevel = 0;
-        }
-        else
-        {
-            newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel;
-            newLevel = skipLevels(newLevel, added);
-            assert newLevel > 0;
-        }
         if (logger.isDebugEnabled())
-            logger.debug("Adding [{}] at L{}", toString(added), newLevel);
+            logger.debug("Adding [{}]", toString(added));
 
-        lastCompactedKeys[minimumLevel] = SSTable.sstableOrdering.max(added).last;
+        int minLevel = Integer.MAX_VALUE;
         for (SSTableReader ssTableReader : added)
-            add(ssTableReader, newLevel);
-
-        // Fix overlapping sstables from CASSANDRA-4321/4411
-        if (newLevel != 0)
-            repairOverlappingSSTables(newLevel);
-
-        serialize();
+        {
+            minLevel = Math.min(minLevel, ssTableReader.getSSTableLevel());
+            add(ssTableReader);
+        }
+        lastCompactedKeys[minLevel] = SSTable.sstableOrdering.max(added).last;
     }
 
     public synchronized void repairOverlappingSSTables(int level)
@@ -235,8 +161,9 @@ public class LeveledManifest
         {
             if (previous != null && current.first.compareTo(previous.last) <= 0)
             {
-                logger.error(String.format("At level %d, %s [%s, %s] overlaps %s [%s, %s].  This is caused by a bug in Cassandra 1.1.0 .. 1.1.3.  Sending back to L0.  If you have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable",
-                                           level, previous, previous.first, previous.last, current, current.first, current.last));
+                logger.warn(String.format("At level %d, %s [%s, %s] overlaps %s [%s, %s].  This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 or due to the fact that you have dropped sstables from another node into the data directory. " +
+                                          "Sending back to L0.  If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable",
+                                          level, previous, previous.first, previous.last, current, current.first, current.last));
                 outOfOrderSSTables.add(current);
             }
             else
@@ -249,30 +176,23 @@ public class LeveledManifest
         {
             for (SSTableReader sstable : outOfOrderSSTables)
                 sendBackToL0(sstable);
-            serialize();
         }
     }
 
-    public synchronized void replace(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
-    {
-        // replace is for compaction operation that operate on exactly one sstable, with no merging.
-        // Thus, removed will be exactly one sstable, and added will be 0 or 1.
-        assert Iterables.size(removed) == 1 : Iterables.size(removed);
-        assert Iterables.size(added) <= 1 : Iterables.size(added);
-        logDistribution();
-        logger.debug("Replacing {} with {}", removed, added);
-
-        int level = remove(removed.iterator().next());
-        if (!Iterables.isEmpty(added))
-            add(added.iterator().next(), level);
-
-        serialize();
-    }
-
     private synchronized void sendBackToL0(SSTableReader sstable)
     {
         remove(sstable);
-        add(sstable, 0);
+        String metaDataFile = sstable.descriptor.filenameFor(Component.STATS);
+        try
+        {
+            mutateLevel(sstable.getSSTableMetadata(), sstable.descriptor, metaDataFile, 0);
+            sstable.reloadSSTableMetadata();
+            add(sstable);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Could not reload sstable meta data", e);
+        }
     }
 
     private String toString(Iterable<SSTableReader> sstables)
@@ -284,7 +204,7 @@ public class LeveledManifest
                    .append('-')
                    .append(sstable.descriptor.generation)
                    .append("(L")
-                   .append(levelOf(sstable))
+                   .append(sstable.getSSTableLevel())
                    .append("), ");
         }
         return builder.toString();
@@ -382,31 +302,15 @@ public class LeveledManifest
         }
     }
 
-    int levelOf(SSTableReader sstable)
-    {
-        Integer level = sstableGenerations.get(sstable);
-        if (level == null)
-            return -1;
-
-        return level.intValue();
-    }
-
-    private int remove(SSTableReader reader)
+    @VisibleForTesting
+    public int remove(SSTableReader reader)
     {
-        int level = levelOf(reader);
-        assert level >= 0 : reader + " not present in manifest";
+        int level = reader.getSSTableLevel();
+        assert level >= 0 : reader + " not present in manifest: "+level;
         generations[level].remove(reader);
-        sstableGenerations.remove(reader);
         return level;
     }
 
-    private void add(SSTableReader sstable, int level)
-    {
-        assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
-        generations[level].add(sstable);
-        sstableGenerations.put(sstable, Integer.valueOf(level));
-    }
-
     private static Set<SSTableReader> overlapping(Collection<SSTableReader> candidates, Iterable<SSTableReader> others)
     {
         assert !candidates.isEmpty();
@@ -569,57 +473,6 @@ public class LeveledManifest
         return ageSortedCandidates;
     }
 
-    public static File tryGetManifest(ColumnFamilyStore cfs)
-    {
-        return cfs.directories.tryGetLeveledManifest();
-    }
-
-    public synchronized void serialize()
-    {
-        File manifestFile = cfs.directories.getOrCreateLeveledManifest();
-        File oldFile = new File(manifestFile.getPath().replace(EXTENSION, "-old.json"));
-        File tmpFile = new File(manifestFile.getPath().replace(EXTENSION, "-tmp.json"));
-
-        JsonFactory f = new JsonFactory();
-        try
-        {
-            JsonGenerator g = f.createJsonGenerator(tmpFile, JsonEncoding.UTF8);
-            g.useDefaultPrettyPrinter();
-            g.writeStartObject();
-            g.writeArrayFieldStart("generations");
-            for (int level = 0; level < generations.length; level++)
-            {
-                g.writeStartObject();
-                g.writeNumberField("generation", level);
-                g.writeArrayFieldStart("members");
-                for (SSTableReader ssTableReader : generations[level])
-                    g.writeNumber(ssTableReader.descriptor.generation);
-                g.writeEndArray(); // members
-
-                g.writeEndObject(); // generation
-            }
-            g.writeEndArray(); // for field generations
-            g.writeEndObject(); // write global object
-            g.close();
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, tmpFile);
-        }
-
-        if (oldFile.exists() && manifestFile.exists())
-            FileUtils.deleteWithConfirm(oldFile);
-
-        if (manifestFile.exists())
-            FileUtils.renameWithConfirm(manifestFile, oldFile);
-
-        assert tmpFile.exists();
-
-        FileUtils.renameWithConfirm(tmpFile, manifestFile);
-
-        logger.debug("Saved manifest {}", manifestFile);
-    }
-
     @Override
     public String toString()
     {
@@ -662,4 +515,63 @@ public class LeveledManifest
                      new Object[] {Arrays.toString(estimated), cfs.table.getName(), cfs.name });
         return Ints.checkedCast(tasks);
     }
+
+    public int getNextLevel(Collection<SSTableReader> sstables, OperationType operationType)
+    {
+        int maximumLevel = Integer.MIN_VALUE;
+        int minimumLevel = Integer.MAX_VALUE;
+        for (SSTableReader sstable : sstables)
+        {
+            maximumLevel = Math.max(sstable.getSSTableLevel(), maximumLevel);
+            minimumLevel = Math.min(sstable.getSSTableLevel(), minimumLevel);
+        }
+        switch(operationType)
+        {
+            case SCRUB:
+            case TOMBSTONE_COMPACTION:
+            case CLEANUP:
+            case UPGRADE_SSTABLES:
+                return minimumLevel;
+        }
+
+        int newLevel;
+        if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTable.getTotalBytes(sstables) < maxSSTableSizeInBytes)
+        {
+            newLevel = 0;
+        }
+        else
+        {
+            newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel;
+            newLevel = skipLevels(newLevel, sstables);
+            assert newLevel > 0;
+        }
+        return newLevel;
+
+    }
+
+    /**
+     * Scary method mutating existing sstable component
+     *
+     * Tries to do it safely by moving the new file on top of the old one
+     *
+     * Caller needs to reload the sstable metadata (sstableReader.reloadSSTableMetadata())
+     *
+     * @see org.apache.cassandra.io.sstable.SSTableReader#reloadSSTableMetadata()
+     *
+     * @param oldMetadata
+     * @param descriptor
+     * @param filename
+     * @param level
+     * @throws IOException
+     */
+    public static synchronized void mutateLevel(SSTableMetadata oldMetadata, Descriptor descriptor, String filename, int level) throws IOException
+    {
+        logger.debug("Mutating {} to level {}", descriptor.filenameFor(Component.STATS), level);
+        SSTableMetadata metadata = SSTableMetadata.copyWithNewSSTableLevel(oldMetadata, level);
+        DataOutputStream dos = new DataOutputStream(new FileOutputStream(filename + "-tmp"));
+        SSTableMetadata.serializer.legacySerialize(metadata, descriptor, dos);
+        dos.flush();
+        dos.close();
+        FileUtils.renameWithConfirm(filename + "-tmp", filename);
+    }
 }

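The javadoc on mutateLevel is explicit that the caller must reload the sstable metadata afterwards; sendBackToL0 above and scrubDataDirectories in ColumnFamilyStore.java follow the same pattern. A minimal sketch of that calling sequence ("sstable" and "newLevel" are placeholders):

    // Sketch of the mutate-then-reload pattern the javadoc above requires;
    // "sstable" is an open SSTableReader, "newLevel" a placeholder level.
    String statsFile = sstable.descriptor.filenameFor(Component.STATS);
    LeveledManifest.mutateLevel(sstable.getSSTableMetadata(), sstable.descriptor, statsFile, newLevel);
    sstable.reloadSSTableMetadata();   // re-read Statistics.db so getSSTableLevel() sees newLevel
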
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 194da7b..41aad18 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -112,7 +112,7 @@ public class Scrubber implements Closeable
             }
 
             // TODO errors when creating the writer may leave empty temp files.
-            writer = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+            writer = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
 
             AbstractCompactedRow prevRow = null;
 
@@ -269,7 +269,7 @@ public class Scrubber implements Closeable
 
         if (!outOfOrderRows.isEmpty())
         {
-            SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+            SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
             for (AbstractCompactedRow row : outOfOrderRows)
                 inOrderWriter.append(row);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index bb4ded4..bd28ed1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -57,6 +57,7 @@ public class SSTableMetadata
     public final String partitioner;
     public final Set<Integer> ancestors;
     public final StreamingHistogram estimatedTombstoneDropTime;
+    public final int sstableLevel;
 
     private SSTableMetadata()
     {
@@ -68,11 +69,12 @@ public class SSTableMetadata
              NO_COMPRESSION_RATIO,
              null,
              Collections.<Integer>emptySet(),
-             defaultTombstoneDropTimeHistogram());
+             defaultTombstoneDropTimeHistogram(),
+             0);
     }
 
     private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long minTimestamp,
-            long maxTimestamp, double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime)
+            long maxTimestamp, double cr, String partitioner, Set<Integer> ancestors, StreamingHistogram estimatedTombstoneDropTime, int sstableLevel)
     {
         this.estimatedRowSize = rowSizes;
         this.estimatedColumnCount = columnCounts;
@@ -83,6 +85,7 @@ public class SSTableMetadata
         this.partitioner = partitioner;
         this.ancestors = ancestors;
         this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
+        this.sstableLevel = sstableLevel;
     }
 
     public static SSTableMetadata createDefaultInstance()
@@ -95,6 +98,28 @@ public class SSTableMetadata
         return new Collector();
     }
 
+    /**
+     * Used when updating sstablemetadata files with an sstable level
+     * @param metadata
+     * @param sstableLevel
+     * @return
+     */
+    @Deprecated
+    public static SSTableMetadata copyWithNewSSTableLevel(SSTableMetadata metadata, int sstableLevel)
+    {
+        return new SSTableMetadata(metadata.estimatedRowSize,
+                                   metadata.estimatedColumnCount,
+                                   metadata.replayPosition,
+                                   metadata.minTimestamp,
+                                   metadata.maxTimestamp,
+                                   metadata.compressionRatio,
+                                   metadata.partitioner,
+                                   metadata.ancestors,
+                                   metadata.estimatedTombstoneDropTime,
+                                   sstableLevel);
+
+    }
+
     static EstimatedHistogram defaultColumnCountHistogram()
     {
         // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
@@ -147,6 +172,7 @@ public class SSTableMetadata
         protected double compressionRatio = NO_COMPRESSION_RATIO;
         protected Set<Integer> ancestors = new HashSet<Integer>();
         protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
+        protected int sstableLevel;
 
         public void addRowSize(long rowSize)
         {
@@ -192,7 +218,8 @@ public class SSTableMetadata
                                        compressionRatio,
                                        partitioner,
                                        ancestors,
-                                       estimatedTombstoneDropTime);
+                                       estimatedTombstoneDropTime,
+                                       sstableLevel);
         }
 
         public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -234,6 +261,13 @@ public class SSTableMetadata
             addColumnCount(stats.columnCount);
             mergeTombstoneHistogram(stats.tombstoneHistogram);
         }
+
+        public Collector sstableLevel(int sstableLevel)
+        {
+            this.sstableLevel = sstableLevel;
+            return this;
+        }
+
     }
 
     public static class SSTableMetadataSerializer
@@ -255,10 +289,54 @@ public class SSTableMetadata
             for (Integer g : sstableStats.ancestors)
                 dos.writeInt(g);
             StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, dos);
+            dos.writeInt(sstableStats.sstableLevel);
+        }
+
+        /**
+         * Used to serialize to an old version - needed to be able to update sstable level without a full compaction.
+         *
+         * @deprecated will be removed when it is assumed that the minimum upgrade-from-version is the version that this
+         * patch made it into
+         *
+         * @param sstableStats
+         * @param legacyDesc
+         * @param dos
+         * @throws IOException
+         */
+        @Deprecated
+        public void legacySerialize(SSTableMetadata sstableStats, Descriptor legacyDesc, DataOutput dos) throws IOException
+        {
+            EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, dos);
+            EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, dos);
+            if (legacyDesc.version.metadataIncludesReplayPosition)
+                ReplayPosition.serializer.serialize(sstableStats.replayPosition, dos);
+            if (legacyDesc.version.tracksMinTimestamp)
+                dos.writeLong(sstableStats.minTimestamp);
+            if (legacyDesc.version.tracksMaxTimestamp)
+                dos.writeLong(sstableStats.maxTimestamp);
+            if (legacyDesc.version.hasCompressionRatio)
+                dos.writeDouble(sstableStats.compressionRatio);
+            if (legacyDesc.version.hasPartitioner)
+                dos.writeUTF(sstableStats.partitioner);
+            if (legacyDesc.version.hasAncestors)
+            {
+                dos.writeInt(sstableStats.ancestors.size());
+                for (Integer g : sstableStats.ancestors)
+                    dos.writeInt(g);
+            }
+            if (legacyDesc.version.tracksTombstones)
+                StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, dos);
+
+            dos.writeInt(sstableStats.sstableLevel);
         }
 
         public SSTableMetadata deserialize(Descriptor descriptor) throws IOException
         {
+            return deserialize(descriptor, true);
+        }
+
+        public SSTableMetadata deserialize(Descriptor descriptor, boolean loadSSTableLevel) throws IOException
+        {
             logger.debug("Load metadata for {}", descriptor);
             File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
             if (!statsFile.exists())
@@ -270,16 +348,20 @@ public class SSTableMetadata
             DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
             try
             {
-                return deserialize(dis, descriptor);
+                return deserialize(dis, descriptor, loadSSTableLevel);
             }
             finally
             {
                 FileUtils.closeQuietly(dis);
             }
         }
-
         public SSTableMetadata deserialize(DataInputStream dis, Descriptor desc) throws IOException
         {
+            return deserialize(dis, desc, true);
+        }
+
+        public SSTableMetadata deserialize(DataInputStream dis, Descriptor desc, boolean loadSSTableLevel) throws IOException
+        {
             EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
             ReplayPosition replayPosition = desc.version.metadataIncludesReplayPosition
@@ -308,7 +390,12 @@ public class SSTableMetadata
             StreamingHistogram tombstoneHistogram = desc.version.tracksTombstones
                                                    ? StreamingHistogram.serializer.deserialize(dis)
                                                    : defaultTombstoneDropTimeHistogram();
-            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, minTimestamp, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram);
+            int sstableLevel = 0;
+
+            if (loadSSTableLevel && dis.available() > 0)
+                sstableLevel = dis.readInt();
+
+            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, minTimestamp, maxTimestamp, compressionRatio, partitioner, ancestors, tombstoneHistogram, sstableLevel);
         }
     }
 }

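Because both serialize and legacySerialize write the level as the final int of the Stats component, deserialization stays backward compatible: a Statistics.db written before this patch simply ends earlier and the level falls back to 0. A hedged sketch ("descriptor" is a placeholder for any sstable's Descriptor):

    // Sketch: pre-patch Statistics.db files yield level 0, files written by the new
    // serializer return the trailing int; "descriptor" is a placeholder.
    SSTableMetadata meta = SSTableMetadata.serializer.deserialize(descriptor);
    int level = meta.sstableLevel;   // 0 when the old file carried no level field
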
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 712c854..afabba5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -95,8 +95,8 @@ public class SSTableReader extends SSTable
     private final AtomicBoolean isCompacted = new AtomicBoolean(false);
     private final AtomicBoolean isSuspect = new AtomicBoolean(false);
     private final SSTableDeletingTask deletingTask;
-
-    private final SSTableMetadata sstableMetadata;
+    // not final since we need to be able to change level on a file.
+    private volatile SSTableMetadata sstableMetadata;
 
     public static long getApproximateKeyCount(Iterable<SSTableReader> sstables, CFMetaData metadata)
     {
@@ -1122,6 +1122,32 @@ public class SSTableReader extends SSTable
         return sstableMetadata.ancestors;
     }
 
+    public int getSSTableLevel()
+    {
+        return sstableMetadata.sstableLevel;
+    }
+
+    /**
+     * Reloads the sstable metadata from disk.
+     *
+     * Called after level is changed on sstable, for example if the sstable is dropped to L0
+     *
+     * Might be possible to remove in future versions
+     *
+     * @throws IOException
+     */
+    public void reloadSSTableMetadata() throws IOException
+    {
+        this.sstableMetadata = components.contains(Component.STATS)
+                             ? SSTableMetadata.serializer.deserialize(descriptor)
+                             : SSTableMetadata.createDefaultInstance();
+    }
+
+    public SSTableMetadata getSSTableMetadata()
+    {
+        return sstableMetadata;
+    }
+
     public RandomAccessReader openDataReader(boolean skipIOCache)
     {
         return compression

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0a7b957..1979a52 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -28,6 +28,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.SetMultimap;
+
+import org.apache.cassandra.db.compaction.LegacyLeveledManifest;
+import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.log4j.PropertyConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -199,6 +202,19 @@ public class CassandraDaemon
         {
             for (CFMetaData cfm : Schema.instance.getTableMetaData(table).values())
             {
+                if (LegacyLeveledManifest.manifestNeedsMigration(table,cfm.cfName))
+                {
+                    try
+                    {
+                        LegacyLeveledManifest.migrateManifests(table, cfm.cfName);
+                    }
+                    catch (IOException e)
+                    {
+                        logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
+                        System.exit(100);
+                    }
+                }
+
                 ColumnFamilyStore.scrubDataDirectories(table, cfm.cfName);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index c1f0332..03f4887 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -51,6 +51,7 @@ public class SSTableMetadataViewer
             out.printf("Maximum timestamp: %s%n", metadata.maxTimestamp);
             out.printf("Compression ratio: %s%n", metadata.compressionRatio);
             out.printf("Estimated droppable tombstones: %s%n", metadata.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
+            out.printf("SSTable Level: %d%n", metadata.sstableLevel);
             out.println(metadata.replayPosition);
             printHistograms(metadata, out);
         }

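With the added printf, SSTableMetadataViewer now reports the level alongside the existing fields; for a freshly flushed or newly migrated/foreign sstable the extra output line would read (the value depends on the file):

    SSTable Level: 0
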
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index 3e09539..749e295 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -24,6 +24,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.*;
+import com.google.common.base.Objects;
 
 /**
  * Histogram that can be constructed from streaming of data.
@@ -198,4 +199,24 @@ public class StreamingHistogram
             throw new UnsupportedOperationException();
         }
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof StreamingHistogram))
+            return false;
+
+        StreamingHistogram that = (StreamingHistogram) o;
+        return maxBinSize == that.maxBinSize && bin.equals(that.bin);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(bin.hashCode(), maxBinSize);
+    }
+
 }
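
The new equals()/hashCode() pair lets SSTableMetadata objects that carry a tombstone histogram be compared field-for-field in the migration tests further down (validateSSTableMetadataTest compares estimatedTombstoneDropTime before and after the rewrite). The usual contract applies: hashCode() must be derived from the same fields equals() compares, here the bin map and maxBinSize. A rough usage sketch -- it assumes StreamingHistogram's existing constructor and update() API, so treat the exact signatures as an assumption:

    // Two histograms fed the same points must compare equal and hash equally.
    StreamingHistogram a = new StreamingHistogram(100);
    StreamingHistogram b = new StreamingHistogram(100);
    for (long point : new long[]{ 1, 5, 5, 23 })
    {
        a.update(point);   // assumed API: update() adds a point to the histogram
        b.update(point);
    }
    assert a.equals(b);
    assert a.hashCode() == b.hashCode(); // required whenever equals() returns true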

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-CompressionInfo.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-CompressionInfo.db
new file mode 100644
index 0000000..af783d6
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-CompressionInfo.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Data.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Data.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Data.db
new file mode 100644
index 0000000..854a1c9
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Filter.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Filter.db
new file mode 100644
index 0000000..210481f
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Filter.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Index.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Index.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Index.db
new file mode 100644
index 0000000..52c9a6c
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Statistics.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Statistics.db
new file mode 100644
index 0000000..295a303
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-TOC.txt b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-TOC.txt
new file mode 100644
index 0000000..f8e8810
--- /dev/null
+++ b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-0-TOC.txt
@@ -0,0 +1,6 @@
+CompressionInfo.db
+Index.db
+TOC.txt
+Filter.db
+Statistics.db
+Data.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-CompressionInfo.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-CompressionInfo.db
new file mode 100644
index 0000000..af783d6
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-CompressionInfo.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Data.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Data.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Data.db
new file mode 100644
index 0000000..854a1c9
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Filter.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Filter.db
new file mode 100644
index 0000000..210481f
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Filter.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Index.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Index.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Index.db
new file mode 100644
index 0000000..52c9a6c
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Statistics.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Statistics.db
new file mode 100644
index 0000000..295a303
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-1-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-CompressionInfo.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-CompressionInfo.db
new file mode 100644
index 0000000..af783d6
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-CompressionInfo.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Data.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Data.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Data.db
new file mode 100644
index 0000000..854a1c9
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Filter.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Filter.db
new file mode 100644
index 0000000..210481f
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Filter.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Index.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Index.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Index.db
new file mode 100644
index 0000000..52c9a6c
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Statistics.db b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Statistics.db
new file mode 100644
index 0000000..295a303
Binary files /dev/null and b/test/data/migration-sstables/hf/Keyspace1/Keyspace1-legacyleveled-hf-2-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/data/migration-sstables/hf/Keyspace1/legacyleveled.json
----------------------------------------------------------------------
diff --git a/test/data/migration-sstables/hf/Keyspace1/legacyleveled.json b/test/data/migration-sstables/hf/Keyspace1/legacyleveled.json
new file mode 100644
index 0000000..1fc9c01
--- /dev/null
+++ b/test/data/migration-sstables/hf/Keyspace1/legacyleveled.json
@@ -0,0 +1,27 @@
+{
+  "generations" : [ {
+    "generation" : 0,
+    "members" : [ 0 ]
+  }, {
+    "generation" : 1,
+    "members" : [ 1 ]
+  }, {
+    "generation" : 2,
+    "members" : [ 2 ]
+  }, {
+    "generation" : 3,
+    "members" : [ ]
+  }, {
+    "generation" : 4,
+    "members" : [ ]
+  }, {
+    "generation" : 5,
+    "members" : [ ]
+  }, {
+    "generation" : 6,
+    "members" : [ ]
+  }, {
+    "generation" : 7,
+    "members" : [ ]
+  } ]
+}
\ No newline at end of file
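
The fixture above documents the legacy on-disk format this patch retires: one .json manifest per column family, where each entry in "generations" describes one LCS level ("generation" is the level number) and "members" lists the generation numbers of the sstables sitting at that level. In this fixture sstables 0, 1 and 2 therefore end up at levels 0, 1 and 2, which is exactly what LegacyLeveledManifestTest.doMigrationTest asserts below. A rough sketch of how such a file maps to per-sstable levels, using Jackson for the parsing (an assumption made for illustration; it is not necessarily how the migration code itself reads the file):

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class LegacyManifestSketch
    {
        /** Returns a map of sstable generation -> LCS level, as encoded by a legacy .json manifest. */
        public static Map<Integer, Integer> levelsByGeneration(File manifest) throws IOException
        {
            Map<Integer, Integer> levels = new HashMap<Integer, Integer>();
            JsonNode root = new ObjectMapper().readTree(manifest);
            for (JsonNode level : root.get("generations"))
            {
                int levelNumber = level.get("generation").intValue(); // "generation" here is the level
                for (JsonNode member : level.get("members"))
                    levels.put(member.intValue(), levelNumber);       // member = sstable generation number
            }
            return levels;
        }
    }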

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index beae23d..29404ad 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -124,5 +124,9 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
                }
             }
         }
+        for (SSTableReader sstable : store.getSSTables())
+        {
+            assert sstable.getSSTableLevel() >= 0 : sstable; // level must have been loaded from sstable metadata
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java b/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
new file mode 100644
index 0000000..deb326b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTest.java
@@ -0,0 +1,92 @@
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMetadata;
+import org.apache.cassandra.io.util.FileUtils;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LegacyLeveledManifestTest
+{
+    private File destDir;
+    @Before
+    public void setup()
+    {
+        String root = System.getProperty("migration-sstable-root");
+        File rootDir = new File(root + File.separator + "hf" + File.separator + "Keyspace1");
+        destDir = Directories.create("Keyspace1", "legacyleveled").getDirectoryForNewSSTables(0);
+        FileUtils.createDirectory(destDir);
+        for (File srcFile : rootDir.listFiles())
+        {
+            File destFile = new File(destDir, srcFile.getName());
+            FileUtils.createHardLink(srcFile, destFile);
+            assert destFile.exists() : destFile.getAbsoluteFile();
+        }
+    }
+    @After
+    public void tearDown()
+    {
+        FileUtils.deleteRecursive(destDir);
+    }
+
+    @Test
+    public void migrateTest() throws IOException
+    {
+        assertTrue(LegacyLeveledManifest.manifestNeedsMigration("Keyspace1", "legacyleveled"));
+    }
+
+    @Test
+    public void doMigrationTest() throws IOException, InterruptedException
+    {
+        LegacyLeveledManifest.migrateManifests("Keyspace1", "legacyleveled");
+
+        for (int i = 0; i <= 2; i++)
+        {
+            Descriptor descriptor = Descriptor.fromFilename(destDir + File.separator + "Keyspace1-legacyleveled-hf-" + i + "-Statistics.db");
+            SSTableMetadata metadata = SSTableMetadata.serializer.deserialize(descriptor);
+            assertEquals(i, metadata.sstableLevel);
+        }
+    }
+
+    /**
+     * Validate that the rewritten stats file is the same as the original one.
+     * @throws IOException
+     */
+    @Test
+    public void validateSSTableMetadataTest() throws IOException
+    {
+        Map<Descriptor, SSTableMetadata> beforeMigration = new HashMap<Descriptor, SSTableMetadata>();
+        for (int i = 0; i <= 2; i++)
+        {
+            Descriptor descriptor = Descriptor.fromFilename(destDir + File.separator + "Keyspace1-legacyleveled-hf-" + i + "-Statistics.db");
+            beforeMigration.put(descriptor, SSTableMetadata.serializer.deserialize(descriptor, false));
+        }
+
+        LegacyLeveledManifest.migrateManifests("Keyspace1", "legacyleveled");
+
+        for (Map.Entry<Descriptor, SSTableMetadata> entry : beforeMigration.entrySet())
+        {
+            SSTableMetadata newMetadata = SSTableMetadata.serializer.deserialize(entry.getKey());
+            SSTableMetadata oldMetadata = entry.getValue();
+            assertEquals(newMetadata.estimatedRowSize, oldMetadata.estimatedRowSize);
+            assertEquals(newMetadata.estimatedColumnCount, oldMetadata.estimatedColumnCount);
+            assertEquals(newMetadata.replayPosition, oldMetadata.replayPosition);
+            assertEquals(newMetadata.minTimestamp, oldMetadata.minTimestamp);
+            assertEquals(newMetadata.maxTimestamp, oldMetadata.maxTimestamp);
+            assertEquals(newMetadata.compressionRatio, oldMetadata.compressionRatio, 0.01);
+            assertEquals(newMetadata.partitioner, oldMetadata.partitioner);
+            assertEquals(newMetadata.ancestors, oldMetadata.ancestors);
+            assertEquals(newMetadata.estimatedTombstoneDropTime, oldMetadata.estimatedTombstoneDropTime);
+        }
+    }
+
+}
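
A note for anyone running this test outside the build: setup() resolves its input sstables through the migration-sstable-root system property, which has to point at the test/data/migration-sstables directory added in this commit. If the property is not set, root is null, rootDir becomes "null/hf/Keyspace1", and setup() fails with a NullPointerException on rootDir.listFiles().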

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278a5e86/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index e9cf236..ebc320f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -18,8 +18,10 @@
 package org.apache.cassandra.db.compaction;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 
+import com.google.common.collect.Iterables;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -30,11 +32,14 @@ import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class LeveledCompactionStrategyTest extends SchemaLoader
 {
@@ -124,4 +129,66 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         // scanner.getCurrentPosition should be equal to total bytes of L1 sstables
         assert scanner.getCurrentPosition() == SSTable.getTotalBytes(sstables);
     }
+    @Test
+    public void testMutateLevel() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLeveled";
+        Table table = Table.open(ksname);
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
+
+        // Enough data to have a level 1 and 2
+        int rows = 20;
+        int columns = 10;
+
+        // Add enough data to trigger multiple sstables per level
+        for (int r = 0; r < rows; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            for (int c = 0; c < columns; c++)
+            {
+                rm.add(cfname, ByteBufferUtil.bytes("column" + c), value, 0);
+            }
+            rm.apply();
+            store.forceBlockingFlush();
+        }
+
+        LeveledCompactionStrategy strat = (LeveledCompactionStrategy)store.getCompactionStrategy();
+
+        while (strat.getLevelSize(0) > 1)
+        {
+            store.forceMajorCompaction();
+            Thread.sleep(200);
+        }
+
+        for (SSTableReader s : table.getColumnFamilyStore(cfname).getSSTables())
+        {
+            assertTrue(s.getSSTableLevel() != 6);
+            strat.manifest.remove(s);
+            LeveledManifest.mutateLevel(s.getSSTableMetadata(), s.descriptor, s.descriptor.filenameFor(Component.STATS), 6);
+            s.reloadSSTableMetadata();
+            strat.manifest.add(s);
+        }
+
+        for (SSTableReader s : table.getColumnFamilyStore(cfname).getSSTables())
+        {
+            assertTrue(s.getSSTableLevel() == 6);
+        }
+
+        int[] levels = strat.manifest.getAllLevelSize();
+
+        for (int i = 0; i < levels.length; i++)
+        {
+            if (i != 6)
+                assertTrue(levels[i] == 0);
+            else
+                assertTrue(levels[i] == table.getColumnFamilyStore(cfname).getSSTables().size());
+        }
+
+    }
+
+
 }
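
The core of testMutateLevel is the re-levelling sequence inside the first loop; pulled out here for readability, with names exactly as they appear in the test (s is the SSTableReader, strat the LeveledCompactionStrategy) and comments describing the apparent intent:

    strat.manifest.remove(s);                           // stop tracking the sstable at its current level
    LeveledManifest.mutateLevel(s.getSSTableMetadata(),
                                s.descriptor,
                                s.descriptor.filenameFor(Component.STATS),
                                6);                     // rewrite Statistics.db with the new level
    s.reloadSSTableMetadata();                          // re-read the mutated metadata from disk
    strat.manifest.add(s);                              // re-register the sstable at level 6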

