cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1162223 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/notifications/ s...
Date Fri, 26 Aug 2011 19:55:26 GMT
Author: jbellis
Date: Fri Aug 26 19:55:25 2011
New Revision: 1162223

URL: http://svn.apache.org/viewvc?rev=1162223&view=rev
Log:
add LeveledCompactionStrategy (take 2)
patch by Ben Coverston; reviewed by jbellis for CASSANDRA-1608

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
    cassandra/trunk/src/java/org/apache/cassandra/notifications/INotification.java
    cassandra/trunk/src/java/org/apache/cassandra/notifications/INotificationConsumer.java
    cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
    cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Aug 26 19:55:25 2011
@@ -44,6 +44,8 @@
    Thrift<->Avro conversion methods (CASSANDRA-3032)
  * Add timeouts to client request schedulers (CASSANDRA-3079)
  * Cli to use hashes rather than array of hashes for strategy options (CASSANDRA-3081)
+ * LeveledCompactionStrategy (CASSANDRA-1608)
+
 
 0.8.5
  * fix NPE when encryption_options is unspecified (CASSANDRA-3007)

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri Aug 26 19:55:25 2011
@@ -67,7 +67,7 @@ public final class CFMetaData
     public final static double DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS = sizeMemtableOperations(DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);
     public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1;
     public final static String DEFAULT_ROW_CACHE_PROVIDER = "org.apache.cassandra.cache.ConcurrentLinkedHashCacheProvider";
-    public final static String DEFAULT_COMPACTION_STRATEGY_CLASS = "org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy";
+    public final static String DEFAULT_COMPACTION_STRATEGY_CLASS = "SizeTieredCompactionStrategy";
     public final static ByteBuffer DEFAULT_KEY_NAME = ByteBufferUtil.bytes("KEY");
     public final static boolean DEFAULT_COMPRESSION = false;
 
@@ -214,11 +214,11 @@ public final class CFMetaData
 
         try
         {
-            compactionStrategyClass = (Class<? extends AbstractCompactionStrategy>)Class.forName(DEFAULT_COMPACTION_STRATEGY_CLASS);
+            compactionStrategyClass = createCompactionSrategy(DEFAULT_COMPACTION_STRATEGY_CLASS);
         }
-        catch (Exception e)
+        catch (ConfigurationException e)
         {
-            throw new RuntimeException("Could not create Compaction Strategy of type " + DEFAULT_COMPACTION_STRATEGY_CLASS, e);
+            throw new AssertionError(e);
         }
         compactionStrategyOptions = new HashMap<String, String>();
     }
@@ -409,11 +409,11 @@ public final class CFMetaData
         {
             try
             {
-                newCFMD.compactionStrategyClass((Class<? extends AbstractCompactionStrategy>)Class.forName(cf.compaction_strategy.toString()));
+                newCFMD.compactionStrategyClass = createCompactionSrategy(cf.compaction_strategy.toString());
             }
-            catch (Exception e)
+            catch (ConfigurationException e)
             {
-                throw new RuntimeException("Could not create Compaction Strategy of type " + cf.compaction_strategy.toString(), e);
+                throw new RuntimeException(e);
             }
         }
         if (cf.compaction_strategy_options != null)
@@ -695,16 +695,7 @@ public final class CFMetaData
         if (cf_def.isSetKey_alias()) { newCFMD.keyAlias(cf_def.key_alias); }
         if (cf_def.isSetKey_validation_class()) { newCFMD.keyValidator(TypeParser.parse(cf_def.key_validation_class)); }
         if (cf_def.isSetCompaction_strategy())
-        {
-            try
-            {
-               newCFMD.compactionStrategyClass((Class<? extends AbstractCompactionStrategy>)Class.forName(cf_def.compaction_strategy));
-            }
-            catch (Exception e)
-            {
-                throw new ConfigurationException("Unable to set Compaction Strategy Class of " + cf_def.compaction_strategy, e);
-            }
-        }
+            newCFMD.compactionStrategyClass = createCompactionSrategy(cf_def.compaction_strategy);
         if (cf_def.isSetCompaction_strategy_options())
             newCFMD.compactionStrategyOptions(new HashMap<String, String>(cf_def.compaction_strategy_options));
 
@@ -812,16 +803,7 @@ public final class CFMetaData
         }
 
         if (cf_def.compaction_strategy != null)
-        {
-            try
-            {
-                compactionStrategyClass = (Class<? extends AbstractCompactionStrategy>)Class.forName(cf_def.compaction_strategy.toString());
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException("Could not create Compaction Strategy of type " + cf_def.compaction_strategy.toString(), e);
-            }
-        }
+            compactionStrategyClass = createCompactionSrategy(cf_def.compaction_strategy.toString());
 
         if (null != cf_def.compaction_strategy_options)
         {
@@ -832,7 +814,20 @@ public final class CFMetaData
 
         logger.debug("application result is {}", this);
     }
-    
+
+    private static Class<? extends AbstractCompactionStrategy> createCompactionSrategy(String className) throws ConfigurationException
+    {
+        className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
+        try
+        {
+            return (Class<? extends AbstractCompactionStrategy>) Class.forName(className);
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Could not create Compaction Strategy of type " + className, e);
+        }
+    }
+
     public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
     {
         try

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CollationController.java Fri Aug 26 19:55:25 2011
@@ -19,6 +19,14 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
@@ -28,14 +36,7 @@ import org.apache.cassandra.db.marshal.C
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CloseableIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Iterables;
+import org.apache.cassandra.utils.IntervalTree.Interval;
 
 public class CollationController
 {
@@ -75,7 +76,7 @@ public class CollationController
         logger.debug("collectTimeOrderedData");
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
         final ColumnFamily container = ColumnFamily.create(metadata, factory, filter.filter.isReversed());
-
+        List<SSTableReader> sstables = null;
         try
         {
             for (Memtable memtable : Iterables.concat(dataview.memtablesPendingFlush, Collections.singleton(dataview.memtable)))
@@ -96,8 +97,12 @@ public class CollationController
             filterColumns.addAll(((NamesQueryFilter) filter.filter).columns);
             QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns));
 
+            /* add the SSTables on disk */
+            sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key));
+            Collections.sort(sstables, SSTable.maxTimestampComparator);
+            SSTableReader.acquireReferences(sstables);
             // read sorted sstables
-            for (SSTableReader sstable : dataview.sstables)
+            for (SSTableReader sstable : sstables)
             {
                 long currentMaxTs = sstable.getMaxTimestamp();
                 reduceNameFilter(reducedFilter, container, currentMaxTs);
@@ -117,6 +122,7 @@ public class CollationController
         }
         finally
         {
+            SSTableReader.releaseReferences(sstables);
             for (IColumnIterator iter : iterators)
                 FileUtils.closeQuietly(iter);
         }
@@ -182,6 +188,7 @@ public class CollationController
         logger.debug("collectAllData");
         List<IColumnIterator> iterators = new ArrayList<IColumnIterator>();
         ColumnFamily returnCF = ColumnFamily.create(metadata, factory, filter.filter.isReversed());
+        List<SSTableReader> sstables = null;
 
         try
         {
@@ -196,7 +203,9 @@ public class CollationController
             }
 
             /* add the SSTables on disk */
-            for (SSTableReader sstable : dataview.sstables)
+            sstables = dataview.intervalTree.search(new Interval(filter.key, filter.key));
+            SSTableReader.acquireReferences(sstables);
+            for (SSTableReader sstable : sstables)
             {
                 IColumnIterator iter = filter.getSSTableColumnIterator(sstable);
                 iterators.add(iter);
@@ -209,6 +218,7 @@ public class CollationController
         }
         finally
         {
+            SSTableReader.releaseReferences(sstables);
             for (IColumnIterator iter : iterators)
                 FileUtils.closeQuietly(iter);
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 26 19:55:25 2011
@@ -35,11 +35,7 @@ import com.google.common.collect.Iterabl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.AutoSavingCache;
-import org.apache.cassandra.cache.AutoSavingKeyCache;
-import org.apache.cassandra.cache.AutoSavingRowCache;
-import org.apache.cassandra.cache.ConcurrentLinkedHashCache;
-import org.apache.cassandra.cache.ICache;
+import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -49,17 +45,20 @@ import org.apache.cassandra.db.commitlog
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.LocalByPartionerType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.IntervalTree.Interval;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@@ -137,7 +136,7 @@ public class ColumnFamilyStore implement
     private volatile DefaultDouble memops;
     private volatile DefaultInteger rowCacheSaveInSeconds;
     private volatile DefaultInteger keyCacheSaveInSeconds;
-    private volatile DefaultInteger rowCacheKeysToSave; 
+    private volatile DefaultInteger rowCacheKeysToSave;
 
     /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
     public final Lock flushLock = new ReentrantLock();
@@ -173,7 +172,7 @@ public class ColumnFamilyStore implement
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
-        
+
         // only update these runtime-modifiable settings if they have not been modified.
         if (!minCompactionThreshold.isModified())
             for (ColumnFamilyStore cfs : concatWithIndexes())
@@ -194,11 +193,12 @@ public class ColumnFamilyStore implement
         if (!rowCacheKeysToSave.isModified())
             rowCacheKeysToSave = new DefaultInteger(metadata.getRowCacheKeysToSave());
 
+        compactionStrategy.shutdown();
         compactionStrategy = metadata.createCompactionStrategyInstance(this);
 
         updateCacheSizes();
         scheduleCacheSaving(rowCacheSaveInSeconds.value(), keyCacheSaveInSeconds.value(), rowCacheKeysToSave.value());
-        
+
         indexManager.reload();
     }
 
@@ -206,11 +206,10 @@ public class ColumnFamilyStore implement
     {
         assert metadata != null : "null metadata for " + table + ":" + columnFamilyName;
         this.table = table;
-        columnFamily = columnFamilyName; 
+        columnFamily = columnFamilyName;
         this.metadata = metadata;
         this.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold());
         this.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold());
-        this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
         this.memsize = new DefaultInteger(metadata.getMemtableThroughputInMb());
         this.memops = new DefaultDouble(metadata.getMemtableOperationsInMillions());
         this.rowCacheSaveInSeconds = new DefaultInteger(metadata.getRowCacheSavePeriodInSeconds());
@@ -241,6 +240,9 @@ public class ColumnFamilyStore implement
         }
         data.addSSTables(sstables);
 
+        // compaction strategy should be created after the CFS has been prepared
+        this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
+
         // create the private ColumnFamilyStores for the secondary column indexes
         for (ColumnDefinition info : metadata.getColumn_metadata().values())
         {
@@ -337,7 +339,7 @@ public class ColumnFamilyStore implement
 
         return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata);
     }
-    
+
     /**
      * Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files
      * and compacted sstables. Files that cannot be recognized will be ignored.
@@ -395,7 +397,7 @@ public class ColumnFamilyStore implement
                     if (!file.delete())
                         logger.warn("could not delete " + file.getAbsolutePath());
         }
-        
+
         // also clean out any index leftovers.
         CFMetaData cfm = Schema.instance.getCFMetaData(table, columnFamily);
         if (cfm != null) // secondary indexes aren't stored in DD.
@@ -886,12 +888,21 @@ public class ColumnFamilyStore implement
      */
     public boolean isKeyInRemainingSSTables(DecoratedKey key, Set<? extends SSTable> sstablesToIgnore)
     {
-        for (SSTableReader sstable : data.getSSTables())
+        DataTracker.View currentView = markCurrentViewReferenced();
+        try
         {
-            if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key))
-                return true;
+            List<SSTableReader> filteredSSTables = currentView.intervalTree.search(new Interval(key, key));
+            for (SSTableReader sstable : filteredSSTables)
+            {
+                if (!sstablesToIgnore.contains(sstable) && sstable.getBloomFilter().isPresent(key.key))
+                    return true;
+            }
+            return false;
+        }
+        finally
+        {
+            SSTableReader.releaseReferences(currentView.sstables);
         }
-        return false;
     }
 
     /*
@@ -983,7 +994,7 @@ public class ColumnFamilyStore implement
     public void removeAllSSTables()
     {
         data.removeAllSSTables();
-        indexManager.removeAllIndexes();    
+        indexManager.removeAllIndexes();
     }
 
     public long getMemtableColumnsCount()
@@ -1177,7 +1188,7 @@ public class ColumnFamilyStore implement
             ColumnFamily cached = cacheRow(filter.key);
             if (cached == null)
                 return null;
- 
+
             return filterColumnFamily(cached, filter, gcBefore);
         }
         finally
@@ -1219,7 +1230,7 @@ public class ColumnFamilyStore implement
                     // top-level columns
                     if (sliceFilter.count >= cached.getColumnCount())
                     {
-                        removeDeletedColumnsOnly(cached, gcBefore);                    
+                        removeDeletedColumnsOnly(cached, gcBefore);
                         return removeDeletedCF(cached, gcBefore);
                     }
                 }
@@ -1279,12 +1290,12 @@ public class ColumnFamilyStore implement
         DataTracker.View currentView = markCurrentViewReferenced();
         try
         {
-            CollationController controller = new CollationController(currentView, factory, filter, metadata, gcBefore);
-            ColumnFamily columns = controller.getTopLevelColumns();
-            recentSSTablesPerRead.add(controller.getSstablesIterated());
-            sstablesPerRead.add(controller.getSstablesIterated());
-            return columns;
-        }
+        CollationController controller = new CollationController(currentView, factory, filter, metadata, gcBefore);
+        ColumnFamily columns = controller.getTopLevelColumns();
+        recentSSTablesPerRead.add(controller.getSstablesIterated());
+        sstablesPerRead.add(controller.getSstablesIterated());
+        return columns;
+    }
         finally
         {
             SSTableReader.releaseReferences(currentView.sstables);
@@ -1293,7 +1304,7 @@ public class ColumnFamilyStore implement
 
     /**
       * Fetch a range of rows and columns from memtables/sstables.
-      * 
+      *
       * @param superColumn optional SuperColumn to slice subcolumns of; null to slice top-level columns
       * @param range Either a Bounds, which includes start key, or a Range, which does not.
       * @param maxResults Maximum rows to return
@@ -1322,6 +1333,14 @@ public class ColumnFamilyStore implement
             // It is fine to aliases the View.sstables since it's an unmodifiable collection
             Collection<SSTableReader> sstables = currentView.sstables;
 
+            Comparable startWithComp = startWith;
+            Comparable stopAtComp = stopAt;
+            if (startWith.token.equals(partitioner.getMinimumToken()))
+                startWithComp = currentView.intervalTree.min;
+            if (stopAt.token.equals(partitioner.getMinimumToken()))
+                stopAtComp = currentView.intervalTree.max;
+            sstables = currentView.intervalTree.search(new Interval(startWithComp, stopAtComp));
+
             CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
             List<Row> rows = new ArrayList<Row>();
 
@@ -1373,12 +1392,12 @@ public class ColumnFamilyStore implement
             SSTableReader.releaseReferences(currentView.sstables);
         }
     }
-    
+
     public List<Row> search(IndexClause clause, AbstractBounds range, IFilter dataFilter)
     {
         return indexManager.search(clause, range, dataFilter);
     }
-  
+
     public AbstractType getComparator()
     {
         return metadata.comparator;
@@ -1421,8 +1440,8 @@ public class ColumnFamilyStore implement
 
     /**
      * Take a snap shot of this columnfamily store.
-     * 
-     * @param snapshotName the name of the associated with the snapshot 
+     *
+     * @param snapshotName the name of the associated with the snapshot
      */
     public void snapshot(String snapshotName)
     {
@@ -1677,7 +1696,7 @@ public class ColumnFamilyStore implement
         return data.getRecentBloomFilterFalseRatio();
     }
 
-   
+
 
     @Override
     public String toString()
@@ -1715,7 +1734,7 @@ public class ColumnFamilyStore implement
     {
         return minCompactionThreshold.value();
     }
-    
+
     public void setMinimumCompactionThreshold(int minCompactionThreshold)
     {
         if ((minCompactionThreshold > this.maxCompactionThreshold.value()) && this.maxCompactionThreshold.value() != 0)
@@ -1938,4 +1957,11 @@ public class ColumnFamilyStore implement
 
         return reader;
     }
+
+    public int getUnleveledSSTables()
+    {
+        return this.compactionStrategy instanceof LeveledCompactionStrategy
+               ? ((LeveledCompactionStrategy) this.compactionStrategy).getLevelSize(0)
+               : 0;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Fri Aug 26 19:55:25 2011
@@ -240,4 +240,9 @@ public interface ColumnFamilyStoreMBean
      * determine which SSTables should be loaded and load them
      */
     public void loadNewSSTables();
+
+    /**
+     * @return the number of SSTables in L0.  Always return 0 if Leveled compaction is not enabled.
+     */
+    public int getUnleveledSSTables();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Aug 26 19:55:25 2011
@@ -20,16 +20,13 @@
 package org.apache.cassandra.db;
 
 import java.io.File;
-import java.io.IOError;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,12 +35,22 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.IntervalTree.Interval;
+import org.apache.cassandra.utils.IntervalTree.IntervalTree;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class DataTracker
 {
     private static final Logger logger = LoggerFactory.getLogger(DataTracker.class);
 
+    public Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<INotificationConsumer>();
+
     public final ColumnFamilyStore cfstore;
 
     private final AtomicReference<View> view;
@@ -134,26 +141,27 @@ public class DataTracker
         addNewSSTablesSize(Arrays.asList(sstable));
         cfstore.updateCacheSizes();
 
+        notifyAdded(sstable);
         incrementallyBackup(sstable);
     }
 
-    public void incrementallyBackup(SSTableReader sstable)
+    public void incrementallyBackup(final SSTableReader sstable)
     {
-        if (DatabaseDescriptor.incrementalBackupsEnabled())
+        if (!DatabaseDescriptor.incrementalBackupsEnabled())
+            return;
+
+        Runnable runnable = new WrappedRunnable()
         {
-            File keyspaceDir = new File(sstable.getFilename()).getParentFile();
-            File backupsDir = new File(keyspaceDir, "backups");
-            try
+            protected void runMayThrow() throws Exception
             {
+                File keyspaceDir = new File(sstable.getFilename()).getParentFile();
+                File backupsDir = new File(keyspaceDir, "backups");
                 if (!backupsDir.exists() && !backupsDir.mkdirs())
                     throw new IOException("Unable to create " + backupsDir);
                 sstable.createLinks(backupsDir.getCanonicalPath());
             }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
+        };
+        StorageService.tasks.execute(runnable);
     }
 
     /**
@@ -233,6 +241,7 @@ public class DataTracker
     {
         addSSTables(Arrays.asList(sstable));
         incrementallyBackup(sstable);
+        notifyAdded(sstable);
     }
 
     public void removeAllSSTables()
@@ -246,7 +255,8 @@ public class DataTracker
         view.set(new View(new Memtable(cfstore),
                           Collections.<Memtable>emptySet(),
                           Collections.<SSTableReader>emptyList(),
-                          Collections.<SSTableReader>emptySet()));
+                          Collections.<SSTableReader>emptySet(),
+                          new IntervalTree()));
     }
 
     private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
@@ -262,6 +272,7 @@ public class DataTracker
         addNewSSTablesSize(replacements);
         removeOldSSTablesSize(oldSSTables);
 
+        notifySSTablesChanged(replacements, oldSSTables);
         cfstore.updateCacheSizes();
     }
 
@@ -455,6 +466,35 @@ public class DataTracker
         return (double) falseCount / (trueCount + falseCount);
     }
 
+    public void notifySSTablesChanged(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
+    {
+        for (INotificationConsumer subscriber : subscribers)
+        {
+            INotification notification = new SSTableListChangedNotification(added, removed);
+            subscriber.handleNotification(notification, this);
+        }
+    }
+
+    public void notifyAdded(SSTableReader added)
+    {
+        for (INotificationConsumer subscriber : subscribers)
+        {
+            INotification notification = new SSTableAddedNotification(added);
+            subscriber.handleNotification(notification, this);
+        }
+    }
+
+    public void subscribe(INotificationConsumer consumer)
+    {
+        subscribers.add(consumer);
+    }
+
+    public void unsubscribe(INotificationConsumer consumer)
+    {
+        boolean found = subscribers.remove(consumer);
+        assert found : consumer + " not subscribed";
+    }
+
     /**
      * An immutable structure holding the current memtable, the memtables pending
      * flush, the sstables for a column family, and the sstables that are active
@@ -471,49 +511,63 @@ public class DataTracker
         // Obviously, dropping sstables whose max column timestamp happens to be equal to another's
         // is not acceptable for us.  So, we use a List instead.
         public final List<SSTableReader> sstables;
+        public final IntervalTree intervalTree;
 
-        View(Memtable memtable, Set<Memtable> pendingFlush, List<SSTableReader> sstables, Set<SSTableReader> compacting)
+        View(Memtable memtable, Set<Memtable> pendingFlush, List<SSTableReader> sstables, Set<SSTableReader> compacting, IntervalTree intervalTree)
         {
             this.memtable = memtable;
             this.memtablesPendingFlush = pendingFlush;
             this.sstables = sstables;
             this.compacting = compacting;
+            this.intervalTree = intervalTree;
+        }
+
+        private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
+        {
+            List<SSTableReader> itsstList = ImmutableList.copyOf(Ordering.from(SSTable.sstableComparator).sortedCopy(sstables));
+            List<Interval> intervals = new ArrayList<Interval>(itsstList.size());
+            for (SSTableReader sstable : itsstList)
+                intervals.add(new Interval<SSTableReader>(sstable.first, sstable.last, sstable));
+            assert intervals.size() == sstables.size();
+            return new IntervalTree<SSTableReader>(intervals);
         }
 
         public View switchMemtable(Memtable newMemtable)
         {
             Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
-            return new View(newMemtable, newPending, sstables, compacting);
+            return new View(newMemtable, newPending, sstables, compacting, intervalTree);
         }
 
         public View renewMemtable(Memtable newMemtable)
         {
-            return new View(newMemtable, memtablesPendingFlush, sstables, compacting);
+            return new View(newMemtable, memtablesPendingFlush, sstables, compacting, intervalTree);
         }
 
         public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
         {
             Set<Memtable> newPending = ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, Collections.singleton(flushedMemtable)));
             List<SSTableReader> newSSTables = newSSTables(newSSTable);
-            return new View(memtable, newPending, Collections.unmodifiableList(newSSTables), compacting);
+            IntervalTree intervalTree = buildIntervalTree(newSSTables);
+            return new View(memtable, newPending, Collections.unmodifiableList(newSSTables), compacting, intervalTree);
         }
 
         public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
         {
             List<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
-            return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting);
+            IntervalTree intervalTree = buildIntervalTree(newSSTables);
+            return new View(memtable, memtablesPendingFlush, Collections.unmodifiableList(newSSTables), compacting, intervalTree);
         }
 
         public View markCompacting(Collection<SSTableReader> tomark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
-            return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
+            return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree);
         }
 
         public View unmarkCompacting(Collection<SSTableReader> tounmark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
-            return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
+            return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree);
         }
 
         private List<SSTableReader> newSSTables(SSTableReader newSSTable)
@@ -534,7 +588,6 @@ public class DataTracker
             }
             Iterables.addAll(newSSTables, replacements);
             assert newSSTables.size() == newSSTablesSize;
-            Collections.sort(newSSTables, SSTable.maxTimestampComparator);
             return newSSTables;
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Fri Aug 26 19:55:25 2011
@@ -89,7 +89,6 @@ public class RowIteratorFactory
             iterators.add(new ConvertToColumnIterator(filter, comparator, p, memtable.getEntryIterator(startWith)));
         }
 
-        // sstables
         for (SSTableReader sstable : sstables)
         {
             final SSTableScanner scanner = sstable.getScanner(RANGE_FILE_BUFFER_SIZE, filter);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java Fri Aug 26 19:55:25 2011
@@ -50,11 +50,18 @@ public abstract class AbstractCompaction
 
     protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
+        assert cfs != null;
         this.cfs = cfs;
         this.options = options;
     }
 
     /**
+     * Releases any resources if this strategy is shutdown (when the CFS is reloaded after a schema change).
+     * Default is to do nothing.
+     */
+    public void shutdown() { }
+
+    /**
      * @return a list of compaction tasks that should run in the background to get the sstable
      * count down to desired parameters. Will not be null, but may be empty.
      * @param gcBefore throw away tombstones older than this

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Aug 26 19:55:25 2011
@@ -19,14 +19,8 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
@@ -49,6 +43,7 @@ public class CompactionTask extends Abst
     protected String compactionFileLocation;
     protected final int gcBefore;
     protected boolean isUserDefined;
+    protected static long totalBytesCompacted = 0;
 
     public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore)
     {
@@ -58,6 +53,11 @@ public class CompactionTask extends Abst
         this.isUserDefined = false;
     }
 
+    public static synchronized long addToTotalBytesCompacted(long bytesCompacted)
+    {
+        return totalBytesCompacted += bytesCompacted;
+    }
+
     /**
      * For internal use and testing only.  The rest of the system should go through the submit* methods,
      * which are properly serialized.
@@ -72,7 +72,7 @@ public class CompactionTask extends Abst
         Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
         if (!isUserDefined)
         {
-            if (toCompact.size() < 2)
+            if (!allowSingletonCompaction() && toCompact.size() < 2)
             {
                 logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "." +
                             "Use forceUserDefinedCompaction if you wish to force compaction of single sstables " +
@@ -128,13 +128,18 @@ public class CompactionTask extends Abst
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
-        SSTableWriter writer = null;
-        final SSTableReader ssTable;
         CompactionIterable ci = new CompactionIterable(type, toCompact, controller); // retain a handle so we can call close()
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
 
+        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+        // replace the old entries.  Track entries to preheat here until then.
+        Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap =  new HashMap<SSTableReader, Map<DecoratedKey, Long>>();
+
+        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
+        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
+
         if (collector != null)
             collector.beginCompaction(ci);
         try
@@ -148,13 +153,14 @@ public class CompactionTask extends Abst
                 return 0;
             }
 
-            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
+            SSTableWriter writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
+            writers.add(writer);
             while (nni.hasNext())
             {
                 AbstractCompactedRow row = nni.next();
                 if (row.isEmpty())
                     continue;
-                
+
                 long position = writer.append(row);
                 totalkeysWritten++;
 
@@ -169,32 +175,70 @@ public class CompactionTask extends Abst
                         }
                     }
                 }
+                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
+                {
+                    SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+                    cachedKeyMap.put(toIndex, cachedKeys);
+                    sstables.add(toIndex);
+                    writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
+                    writers.add(writer);
+                    cachedKeys = new HashMap<DecoratedKey, Long>();
+                }
             }
-            ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
         }
         finally
         {
             iter.close();
             if (collector != null)
                 collector.finishCompaction(ci);
-            if (writer != null)
+            for (SSTableWriter writer : writers)
                 writer.cleanupIfNecessary();
         }
 
-        cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
-        for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
-            ssTable.cacheKey(entry.getKey(), entry.getValue());
+        cfs.replaceCompactedSSTables(toCompact, sstables);
+        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
+        for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
+        {
+            SSTableReader key = ssTableReaderMapEntry.getKey();
+            for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
+               key.cacheKey(entry.getKey(), entry.getValue());
+        }
+
         CompactionManager.instance.submitBackground(cfs);
 
         long dTime = System.currentTimeMillis() - startTime;
         long startsize = SSTable.getTotalBytes(toCompact);
-        long endsize = ssTable.length();
+        long endsize = SSTable.getTotalBytes(sstables);
         double ratio = (double)endsize / (double)startsize;
-        logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.",
-                ssTable.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("[");
+        for (SSTableReader reader : sstables)
+            builder.append(reader.getFilename()).append(",");
+        builder.append("]");
+
+        double mbps = dTime > 0 ? (double)endsize/(1024*1024)/((double)dTime/1000) : 0;
+        logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys at %fMBPS.  Time: %,dms.",
+                                  builder.toString(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, mbps, dTime));
+        logger.info(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
         return toCompact.size();
     }
 
+    //extensibility point for other strategies that may want to limit the upper bounds of the sstable segment size
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position)
+    {
+        return false;
+    }
+
+    /**
+     * extend this if the overridden compaction strategy requires single files to be compacted to function properly
+     * @return boolean
+     */
+    protected boolean allowSingletonCompaction()
+    {
+        return false;
+    }
+
     public static long getMaxDataAge(Collection<SSTableReader> sstables)
     {
         long max = 0;

Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,133 @@
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.service.StorageService;
+
+public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
+{
+    private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
+
+    private LeveledManifest manifest;
+    private final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
+    private final int maxSSTableSize;
+
+    public class ScheduledBackgroundCompaction implements Runnable
+    {
+        ColumnFamilyStore cfs;
+
+        public ScheduledBackgroundCompaction(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+        }
+
+        public void run()
+        {
+            if (CompactionManager.instance.getActiveCompactions() == 0)
+            {
+                CompactionManager.instance.submitBackground(cfs);
+            }
+        }
+    }
+
+    public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        super(cfs, options);
+        int configuredMaxSSTableSize = 5;
+        if (options != null)
+        {
+            String value = options.containsKey(SSTABLE_SIZE_OPTION) ? options.get(SSTABLE_SIZE_OPTION) : null;
+            if (null != value)
+            {
+                try
+                {
+                    configuredMaxSSTableSize = Integer.parseInt(value);
+                }
+                catch (NumberFormatException ex)
+                {
+                    logger.warn(String.format("%s is not a parsable int (base10) for %s using default value",
+                                              value, SSTABLE_SIZE_OPTION));
+                }
+            }
+        }
+        maxSSTableSize = configuredMaxSSTableSize;
+
+        cfs.getDataTracker().subscribe(this);
+        logger.info(this + " subscribed to the data tracker.");
+
+        manifest = LeveledManifest.create(cfs, this.maxSSTableSize);
+        // override min/max for this strategy
+        cfs.setMaximumCompactionThreshold(Integer.MAX_VALUE);
+        cfs.setMinimumCompactionThreshold(1);
+
+        DebuggableScheduledThreadPoolExecutor st = StorageService.scheduledTasks;
+        st.scheduleAtFixedRate(new ScheduledBackgroundCompaction(cfs), 10000, 3000, TimeUnit.MILLISECONDS);
+    }
+
+    public void shutdown()
+    {
+        cfs.getDataTracker().unsubscribe(this);
+    }
+
+    public int getLevelSize(int i)
+    {
+        return manifest.getLevelSize(i);
+    }
+
+    public synchronized List<AbstractCompactionTask> getBackgroundTasks(int gcBefore)
+    {
+        Collection<SSTableReader> sstables = manifest.getCompactionCandidates();
+        logger.debug("CompactionManager candidates are {}", StringUtils.join(sstables, ","));
+        if (sstables.isEmpty())
+            return Collections.emptyList();
+        LeveledCompactionTask task = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSize);
+        return Collections.<AbstractCompactionTask>singletonList(task);
+    }
+
+    public List<AbstractCompactionTask> getMaximalTasks(int gcBefore)
+    {
+        return Collections.emptyList();
+    }
+
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    {
+        throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+    }
+
+    public int getEstimatedRemainingTasks()
+    {
+        return 0;
+    }
+
+    public void handleNotification(INotification notification, Object sender)
+    {
+        if (notification instanceof SSTableAddedNotification)
+        {
+            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+            manifest.add(flushedNotification.added);
+            manifest.logDistribution();
+        }
+        else if (notification instanceof SSTableListChangedNotification)
+        {
+            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+            manifest.promote(listChangedNotification.removed, listChangedNotification.added);
+            manifest.logDistribution();
+        }
+    }
+}
\ No newline at end of file

Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,31 @@
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collection;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionTask;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
+public class LeveledCompactionTask extends CompactionTask
+{
+    private final int sstableSizeInMB;
+
+    public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore, int sstableSizeInMB)
+    {
+        super(cfs, sstables, gcBefore);
+        this.sstableSizeInMB = sstableSizeInMB;
+    }
+
+    @Override
+    protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer, long position)
+    {
+        return position > sstableSizeInMB * 1024L * 1024L;
+    }
+
+    @Override
+    protected boolean allowSingletonCompaction()
+    {
+        return true;
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,340 @@
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class LeveledManifest
+{
+    private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
+
+    private final ColumnFamilyStore cfs;
+    private final List<SSTableReader>[] generations;
+    private final DecoratedKey[] lastCompactedKeys;
+    private final int maxSSTableSizeInMB;
+    private static int MAX_COMPACTING_L0 = 32;
+
+    private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB)
+    {
+        this.cfs = cfs;
+        this.maxSSTableSizeInMB = maxSSTableSizeInMB;
+
+        // allocate enough generations for a PB of data
+        int n = (int) Math.log10(1000 * 1000 * 1000 / maxSSTableSizeInMB);
+        generations = new List[n];
+        lastCompactedKeys = new DecoratedKey[n];
+        for (int i = 0; i < generations.length; i++)
+        {
+            generations[i] = new ArrayList<SSTableReader>();
+            lastCompactedKeys[i] = new DecoratedKey(cfs.partitioner.getMinimumToken(), null);
+        }
+    }
+
+    static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize)
+    {
+        LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize);
+        load(cfs, manifest);
+
+        // ensure all SSTables are in the manifest
+        for (SSTableReader ssTableReader : cfs.getSSTables())
+        {
+            if (manifest.levelOf(ssTableReader) < 0)
+                manifest.add(ssTableReader);
+        }
+
+        return manifest;
+    }
+
+    private static void load(ColumnFamilyStore cfs, LeveledManifest manifest)
+    {
+        ObjectMapper m = new ObjectMapper();
+        try
+        {
+            File manifestFile = tryGetManifest(cfs);
+
+            if (manifestFile != null && manifestFile.exists())
+            {
+                JsonNode rootNode = m.readValue(manifestFile, JsonNode.class);
+                JsonNode generations = rootNode.get("generations");
+                assert generations.isArray();
+                for (JsonNode generation : generations)
+                {
+                    int level = generation.get("generation").getIntValue();
+                    JsonNode generationValues = generation.get("members");
+                    for (JsonNode generationValue : generationValues)
+                    {
+                        for (SSTableReader ssTableReader : cfs.getSSTables())
+                        {
+                            if (ssTableReader.descriptor.generation == generationValue.getIntValue())
+                            {
+                                logger.debug("Loading {} at L{}", ssTableReader, level);
+                                manifest.add(ssTableReader, level);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public synchronized void add(SSTableReader reader)
+    {
+        logger.debug("Adding {} to L0", reader);
+        add(reader, 0);
+        serialize();
+    }
+
+    // if the number of SSTables in the current compacted set exeeds the target level, find an empty level
+    private int skipLevels(int newLevel, Iterable<SSTableReader> added)
+    {
+        // skip newlevel if the resulting sstables exceed newlevel threshold
+        if (maxBytesForLevel(newLevel) < SSTableReader.getTotalBytes(added)
+            && SSTableReader.getTotalBytes(generations[(newLevel + 1)]) == 0)
+        {
+            newLevel = skipLevels(newLevel + 1, added);
+        }
+        return newLevel;
+    }
+
+    public synchronized void promote(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
+    {
+        logger.debug("Replacing [{}] with [{}]", StringUtils.join(removed.iterator(), ", "), StringUtils.join(added.iterator(), ", "));
+        
+        // the level for the added sstables is the max of the removed ones,
+        // plus one if the removed were all on the same level
+        int minimumLevel = Integer.MAX_VALUE;
+        int maximumLevel = 0;
+        for (SSTableReader sstable : removed)
+        {
+            int thisLevel = levelOf(sstable);
+            maximumLevel = Math.max(maximumLevel, thisLevel);
+            minimumLevel = Math.min(minimumLevel, thisLevel);
+            remove(sstable);
+        }
+
+        // it's valid to do a remove w/o an add (e.g. on truncate)
+        if (!added.iterator().hasNext())
+            return;
+
+        int newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel;
+        newLevel = skipLevels(newLevel, added);
+
+        lastCompactedKeys[minimumLevel] = SSTable.sstableOrdering.max(added).last;
+        logger.debug("Adding [{}] to L{}",
+                     StringUtils.join(added.iterator(), ", "), newLevel);
+        for (SSTableReader ssTableReader : added)
+            add(ssTableReader, newLevel);
+
+        serialize();
+    }
+
+
+    private double maxBytesForLevel (int level)
+    {
+        return level == 0
+               ? 4L * maxSSTableSizeInMB * 1024 * 1024
+               : Math.pow(10, level) * maxSSTableSizeInMB * 1024 * 1024;
+    }
+
+    public synchronized Collection<SSTableReader> getCompactionCandidates()
+    {
+        logDistribution();
+
+        double bestScore = -1;
+        int bestLevel = -1;
+        for (int level = 0; level < generations.length; level++)
+        {
+            List<SSTableReader> sstables = generations[level];
+            if (sstables.isEmpty())
+                continue;
+
+            double score = SSTableReader.getTotalBytes(sstables) / maxBytesForLevel(level);
+            //if we're idle and we don't have anything better to do schedule a compaction for L0
+            //by setting its threshold to some very low value
+            score = (level == 0 && score < 1) ? 1.001 : score;
+            logger.debug("Compaction score for level {} is {}", level, score);
+            if (score > bestScore)
+            {
+                bestScore = score;
+                bestLevel = level;
+            }
+        }
+
+        // if we have met at least one of our thresholds then trigger a compaction
+        return bestScore > 1 ? getCandidatesFor(bestLevel) : Collections.<SSTableReader>emptyList();
+    }
+
+    public int getLevelSize(int i)
+    {
+
+        return generations.length > i ? generations[i].size() : 0;
+    }
+
+    public void logDistribution()
+    {
+        for (int i = 0; i < generations.length; i++)
+            logger.debug("Level {} contains {} SSTables", i, generations[i].size());
+    }
+
+    private int levelOf(SSTableReader sstable)
+    {
+        for (int level = 0; level < generations.length; level++)
+        {
+            if (generations[level].contains(sstable))
+                return level;
+        }
+        return -1;
+    }
+
+    private void remove(SSTableReader reader)
+    {
+        int level = levelOf(reader);
+        assert level >= 0 : reader + " not present in manifest";
+        generations[level].remove(reader);
+    }
+
+    private void add(SSTableReader sstable, int level)
+    {
+        generations[level].add(sstable);
+    }
+
+    private static List<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> candidates)
+    {
+        List<SSTableReader> overlapped = new ArrayList<SSTableReader>();
+        overlapped.add(sstable);
+
+        Range promotedRange = new Range(sstable.first.token, sstable.last.token);
+        for (SSTableReader candidate : candidates)
+        {
+            Range candidateRange = new Range(candidate.first.token, candidate.last.token);
+            if (candidateRange.intersects(promotedRange))
+                overlapped.add(candidate);
+        }
+        return overlapped;
+    }
+
+    private Collection<SSTableReader> getCandidatesFor(int level)
+    {
+        assert !generations[level].isEmpty();
+
+        if (level == 0)
+        {
+            // because L0 files may overlap each other, we treat compactions there specially:
+            // a L0 compaction also checks other L0 files for overlap.
+            Set<SSTableReader> candidates = new HashSet<SSTableReader>();
+            Set<SSTableReader> remaining = new HashSet<SSTableReader>(generations[0]);
+
+            while (!remaining.isEmpty())
+            {
+                // pick a random sstable from L0, and any that overlap with it
+                List<SSTableReader> L0 = overlapping(remaining.iterator().next(), remaining);
+                //but not too many, as we could run out of memory
+                L0 = L0.size() > MAX_COMPACTING_L0 ? L0.subList(0, MAX_COMPACTING_L0) : L0;
+                // add the overlapping ones from L1
+                for (SSTableReader sstable : L0)
+                {
+                    candidates.addAll(overlapping(sstable, generations[1]));
+                    remaining.remove(sstable);
+                }
+            }
+            return candidates;
+        }
+
+        // for non-L0 compactions, pick up where we left off last time
+        Collections.sort(generations[level], SSTable.sstableComparator);
+        for (SSTableReader sstable : generations[level])
+        {
+            // the first sstable that is > than the marked
+            if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
+                return overlapping(sstable, generations[(level + 1)]);
+        }
+        // or if there was no last time, start with the first sstable
+        return overlapping(generations[level].get(0), generations[(level + 1)]);
+    }
+
+    public synchronized void serialize()
+    {
+        String dataFileLocation = getDataFilePrefix(cfs);
+        String tempManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "tmp.json";
+        String manifestFileName = dataFileLocation + cfs.getColumnFamilyName() + ".json";
+        String oldManifestFileName = dataFileLocation + cfs.getColumnFamilyName() + "-" + "old.json";
+
+        File tmpManifest = new File(tempManifestFileName);
+
+        JsonFactory f = new JsonFactory();
+
+        try
+        {
+            JsonGenerator g = f.createJsonGenerator(tmpManifest, JsonEncoding.UTF8);
+            g.useDefaultPrettyPrinter();
+            g.writeStartObject();
+            g.writeArrayFieldStart("generations");
+            for (int level = 0; level < generations.length; level++)
+            {
+                g.writeStartObject();
+                g.writeNumberField("generation", level);
+                g.writeArrayFieldStart("members");
+                for (SSTableReader ssTableReader : generations[level])
+                    g.writeNumber(ssTableReader.descriptor.generation);
+                g.writeEndArray(); // members
+
+                g.writeEndObject(); // generation
+            }
+            g.writeEndArray(); // for field generations
+            g.writeEndObject(); // write global object
+            g.close();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+        File oldFile = new File(oldManifestFileName);
+        if (oldFile.exists())
+            oldFile.delete();
+        File currentManifest = new File(manifestFileName);
+        if (currentManifest.exists())
+            currentManifest.renameTo(new File(oldManifestFileName));
+        if (tmpManifest.exists())
+            tmpManifest.renameTo(new File(manifestFileName));
+    }
+
+    public static File tryGetManifest(ColumnFamilyStore cfs)
+    {
+        for (String dataFileLocation : DatabaseDescriptor.getAllDataFileLocations())
+        {
+            dataFileLocation = getDataFilePrefix(cfs);
+            String manifestFileName = dataFileLocation + cfs.getColumnFamilyName() + ".json";
+            File manifestFile = new File(manifestFileName);
+            if (manifestFile.exists())
+                return manifestFile;
+        }
+        return null;
+    }
+
+    public static String getDataFilePrefix(ColumnFamilyStore cfs)
+    {
+        return DatabaseDescriptor.getAllDataFileLocations()[0] + System.getProperty("file.separator") + cfs.table.name + System.getProperty("file.separator");
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Fri Aug 26 19:55:25 2011
@@ -24,6 +24,8 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.util.*;
 
+import com.google.common.collect.Ordering;
+import org.apache.cassandra.db.DecoratedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +76,9 @@ public abstract class SSTable
     public final IPartitioner partitioner;
     public final boolean compression;
 
+    public DecoratedKey first;
+    public DecoratedKey last;
+
     protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
     {
         this(descriptor, new HashSet<Component>(), metadata, partitioner);
@@ -98,6 +103,16 @@ public abstract class SSTable
         this.partitioner = partitioner;
     }
 
+    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
+    {
+        public int compare(SSTableReader o1, SSTableReader o2)
+        {
+            return o1.first.compareTo(o2.first);
+        }
+    };
+
+    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
+
     /**
      * We use a ReferenceQueue to manage deleting files that have been compacted
      * and for which no more SSTable references exist.  But this is not guaranteed
@@ -154,7 +169,8 @@ public abstract class SSTable
         }
         catch (Exception e)
         {
-            if (!"snapshots".equals(name) && !"backups".equals(name))
+            if (!"snapshots".equals(name) && !"backups".equals(name)
+                    && !name.contains(".json"))
                 logger.warn("Invalid file '{}' in data directory {}.", name, dir);
             return null;
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Aug 26 19:55:25 2011
@@ -261,6 +261,7 @@ public class SSTableReader extends SSTab
 
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
         RandomAccessReader input = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+        DecoratedKey left = null, right = null;
         try
         {
             if (keyCache != null && keyCache.getCapacity() - keyCache.size() < keysToLoadInCache.size())
@@ -278,10 +279,19 @@ public class SSTableReader extends SSTab
                 if (indexPosition == indexSize)
                     break;
 
+                ByteBuffer key = null, skippedKey;
+                skippedKey = ByteBufferUtil.readWithShortLength(input);
+
                 boolean shouldAddEntry = indexSummary.shouldAddEntry();
-                ByteBuffer key = (shouldAddEntry || cacheLoading || recreatebloom)
-                             ? ByteBufferUtil.readWithShortLength(input)
-                             : ByteBufferUtil.skipShortLength(input);
+                if (shouldAddEntry || cacheLoading || recreatebloom)
+                {
+                    key = skippedKey;
+                }
+
+                if(null == left)
+                    left = decodeKey(partitioner, descriptor, skippedKey);
+                right = decodeKey(partitioner, descriptor, skippedKey);
+
                 long dataPosition = input.readLong();
                 if (key != null)
                 {
@@ -304,6 +314,8 @@ public class SSTableReader extends SSTab
         {
             FileUtils.closeQuietly(input);
         }
+        this.first = left;
+        this.last = right;
 
         // finalize the state of the reader
         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1162223&r1=1162222&r2=1162223&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Aug 26 19:55:25 2011
@@ -136,6 +136,9 @@ public class SSTableWriter extends SSTab
     private void afterAppend(DecoratedKey decoratedKey, long dataPosition) throws IOException
     {
         lastWrittenKey = decoratedKey;
+        this.last = lastWrittenKey;
+        if(null == this.first)
+            this.first = lastWrittenKey;
 
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
@@ -248,6 +251,8 @@ public class SSTableWriter extends SSTab
                                                            iwriter.bf,
                                                            maxDataAge,
                                                            sstableMetadata);
+        sstable.first = this.first;
+        sstable.last = this.last;
         iwriter = null;
         dbuilder = null;
         return sstable;

Added: cassandra/trunk/src/java/org/apache/cassandra/notifications/INotification.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/notifications/INotification.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/notifications/INotification.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/notifications/INotification.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,13 @@
+package org.apache.cassandra.notifications;
+
/**
 * Marker interface for events published by the sstable tracker and compaction
 * internals and delivered to {@link INotificationConsumer} subscribers
 * (e.g. an sstable being added, or the sstable list being replaced).
 */
public interface INotification
{

}

Added: cassandra/trunk/src/java/org/apache/cassandra/notifications/INotificationConsumer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/notifications/INotificationConsumer.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/notifications/INotificationConsumer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/notifications/INotificationConsumer.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,6 @@
+package org.apache.cassandra.notifications;
+
/**
 * Implemented by components that want to be told about sstable state changes
 * via {@link INotification} events.
 */
public interface INotificationConsumer
{
    /**
     * Called for each published event.
     *
     * @param notification the event that occurred
     * @param sender the object that published the event
     */
    void handleNotification(INotification notification, Object sender);
}

Added: cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,14 @@
+package org.apache.cassandra.notifications;
+
+import org.apache.cassandra.io.sstable.SSTableReader;
+
+import java.util.List;
+
+public class SSTableAddedNotification implements INotification
+{
+    public SSTableReader added;
+    public SSTableAddedNotification(SSTableReader added)
+    {
+        this.added = added;
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,16 @@
+package org.apache.cassandra.notifications;
+
+import org.apache.cassandra.io.sstable.SSTableReader;
+
+import java.util.List;
+
+public class SSTableListChangedNotification implements INotification
+{
+    public Iterable<SSTableReader> removed;
+    public Iterable<SSTableReader> added;
+    public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
+    {
+        this.removed = removed;
+        this.added = added;
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/Interval.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,62 @@
+package org.apache.cassandra.utils.IntervalTree;
+
+import com.google.common.collect.Ordering;
+
+public class Interval<T>
+{
+    public Comparable min;
+    public Comparable max;
+    public final T Data;
+
+
+    public Interval(Comparable min, Comparable max)
+    {
+        this.min = min;
+        this.max = max;
+        this.Data = null;
+    }
+
+    public Interval(Comparable min, Comparable max, T data)
+    {
+        this.min = min;
+        this.max = max;
+        this.Data = data;
+    }
+
+    public boolean encloses(Interval interval)
+    {
+        return (this.min.compareTo(interval.min) <= 0
+                && this.max.compareTo(interval.max) >= 0);
+    }
+
+    public boolean contains(Comparable point)
+    {
+        return (this.min.compareTo(point) <= 0
+                && this.max.compareTo(point) >= 0);
+    }
+
+    public boolean intersects(Interval interval)
+    {
+        return this.contains(interval.min) || this.contains(interval.max);
+    }
+
+
+    public static Ordering<Interval> minOrdering = new Ordering<Interval>()
+    {
+        @Override
+        public int compare(Interval interval, Interval interval1)
+        {
+            return interval.min.compareTo(interval1.min);
+        }
+    };
+
+    public static Ordering<Interval> maxOrdering = new Ordering<Interval>()
+    {
+        @Override
+        public int compare(Interval interval, Interval interval1)
+        {
+            return interval.max.compareTo(interval1.max);
+        }
+    };
+
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalNode.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,85 @@
+package org.apache.cassandra.utils.IntervalTree;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+public class IntervalNode
+{
+    Interval interval;
+    Comparable v_pt;
+    List<Interval> v_left;
+    List<Interval> v_right;
+    IntervalNode left = null;
+    IntervalNode right = null;
+
+    public IntervalNode(List<Interval> toBisect)
+    {
+        if (toBisect.size() > 0)
+        {
+            v_pt = findMedianEndpoint(toBisect);
+            v_left = interval.minOrdering.sortedCopy(getIntersectingIntervals(toBisect));
+            v_right = interval.maxOrdering.reverse().sortedCopy(getIntersectingIntervals(toBisect));
+            //if i.min < v_pt then it goes to the left subtree
+            List<Interval> leftSegment = getLeftIntervals(toBisect);
+            List<Interval> rightSegment = getRightIntervals(toBisect);
+            if (leftSegment.size() > 0)
+                this.left = new IntervalNode(leftSegment);
+            if (rightSegment.size() > 0)
+                this.right = new IntervalNode(rightSegment);
+        }
+    }
+
+    public List<Interval> getLeftIntervals(List<Interval> candidates)
+    {
+        List<Interval> retval = new ArrayList<Interval>();
+        for (Interval candidate : candidates)
+        {
+            if (candidate.max.compareTo(v_pt) < 0)
+                retval.add(candidate);
+        }
+        return retval;
+    }
+
+    public List<Interval> getRightIntervals(List<Interval> candidates)
+    {
+        List<Interval> retval = new ArrayList<Interval>();
+        for (Interval candidate : candidates)
+        {
+            if (candidate.min.compareTo(v_pt) > 0)
+                retval.add(candidate);
+        }
+        return retval;
+    }
+
+    public List<Interval> getIntersectingIntervals(List<Interval> candidates)
+    {
+        List<Interval> retval = new ArrayList<Interval>();
+        for (Interval candidate : candidates)
+        {
+            if (candidate.min.compareTo(v_pt) <= 0
+                && candidate.max.compareTo(v_pt) >= 0)
+                retval.add(candidate);
+        }
+        return retval;
+    }
+
+    public Comparable findMedianEndpoint(List<Interval> intervals)
+    {
+
+        ConcurrentSkipListSet<Comparable> sortedSet = new ConcurrentSkipListSet<Comparable>();
+
+        for (Interval interval : intervals)
+        {
+            sortedSet.add(interval.min);
+            sortedSet.add(interval.max);
+        }
+        int medianIndex = sortedSet.size() / 2;
+        if (sortedSet.size() > 0)
+        {
+            return (Comparable) sortedSet.toArray()[medianIndex];
+        }
+        return null;
+    }
+
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/IntervalTree/IntervalTree.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,95 @@
+package org.apache.cassandra.utils.IntervalTree;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class IntervalTree<T>
+{
+    private final IntervalNode head;
+
+    public Comparable max = null;
+    public Comparable min = null;
+
+    public IntervalTree()
+    {
+        head = null;
+    }
+
+    public IntervalTree(List<Interval> intervals)
+    {
+        if (intervals.size() > 0)
+        {
+            min = intervals.get(0).min;
+            max = intervals.get(intervals.size() - 1).max;
+        }
+        head = new IntervalNode(intervals);
+    }
+
+    public List<T> search(Interval searchInterval)
+    {
+        List<T> retlist = new LinkedList<T>();
+        searchInternal(head, searchInterval, retlist);
+        return retlist;
+    }
+
+    protected void searchInternal(IntervalNode node, Interval<T> searchInterval, List<T> retList)
+    {
+        if (null == head)
+            return;
+        if (null == node || node.v_pt == null)
+            return;
+        if (null == node)
+            return;
+        //if searchInterval.contains(node.v_pt)
+        //then add every interval contained in this node to the result set then search left and right for further
+        //overlapping intervals
+        if (searchInterval.contains(node.v_pt))
+        {
+            for (Interval<T> interval : node.v_left)
+            {
+                retList.add(interval.Data);
+            }
+
+            searchInternal(node.left, searchInterval, retList);
+            searchInternal(node.right, searchInterval, retList);
+            return;
+        }
+
+        //if v.pt < searchInterval.left
+        //add intervals in v with v[i].right >= searchitnerval.left
+        //L contains no overlaps
+        //R May
+        if (node.v_pt.compareTo(searchInterval.min) < 0)
+        {
+            for (Interval<T> interval : node.v_right)
+            {
+                if (interval.max.compareTo(searchInterval.min) >= 0)
+                {
+                    retList.add(interval.Data);
+                }
+                else break;
+            }
+            searchInternal(node.right, searchInterval, retList);
+            return;
+        }
+
+        //if v.pt > searchInterval.right
+        //add intervals in v with [i].left <= searchitnerval.right
+        //R contains no overlaps
+        //L May
+        if (node.v_pt.compareTo(searchInterval.max) > 0)
+        {
+            for (Interval<T> interval : node.v_left)
+            {
+                if (interval.min.compareTo(searchInterval.max) <= 0)
+                {
+                    retList.add(interval.Data);
+                }
+                else break;
+            }
+            searchInternal(node.left, searchInterval, retList);
+            return;
+        }
+    }
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTest.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTest.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,48 @@
+package org.apache.cassandra.utils;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.cassandra.utils.IntervalTree.Interval;
+import org.junit.Test;
+
+public class IntervalTest extends TestCase
+{
+    @Test
+    public void testEncloses() throws Exception
+    {
+        Interval interval = new Interval(0,5,null);
+        Interval interval1 = new Interval(0, 10, null);
+        Interval interval2 = new Interval(5,10,null);
+        Interval interval3 = new Interval(0, 11, null);
+
+
+        assertTrue(interval.encloses(interval));
+        assertTrue(interval1.encloses(interval));
+        assertFalse(interval.encloses(interval2));
+        assertTrue(interval1.encloses(interval2));
+        assertFalse(interval1.encloses(interval3));
+    }
+    @Test
+    public void testContains() throws Exception
+    {
+        Interval interval = new Interval(0, 5, null);
+        assertTrue(interval.contains(0));
+        assertTrue(interval.contains(5));
+        assertFalse(interval.contains(-1));
+        assertFalse(interval.contains(6));
+    }
+    @Test
+    public void testIntersects() throws Exception
+    {
+        Interval interval = new Interval(0,5,null);
+        Interval interval1 = new Interval(0, 10, null);
+        Interval interval2 = new Interval(5,10,null);
+        Interval interval3 = new Interval(0, 11, null);
+        Interval interval5 = new Interval(6,12,null);
+
+        assertTrue(interval.intersects(interval1));
+        assertTrue(interval.intersects(interval2));
+        assertTrue(interval.intersects(interval3));
+        assertFalse(interval.intersects(interval5));
+    }
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java?rev=1162223&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/IntervalTreeTest.java Fri Aug 26 19:55:25 2011
@@ -0,0 +1,73 @@
+package org.apache.cassandra.utils;
+
+import junit.framework.TestCase;
+import org.apache.cassandra.utils.IntervalTree.*;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class IntervalTreeTest extends TestCase
+{
+    @Test
+    public void testSearch() throws Exception
+    {
+        List<Interval> intervals = new ArrayList<Interval>();
+
+        intervals.add(new Interval(-300, -200));
+        intervals.add(new Interval(-3, -2));
+        intervals.add(new Interval(1,2));
+        intervals.add(new Interval(3,6));
+        intervals.add(new Interval(2,4));
+        intervals.add(new Interval(5,7));
+        intervals.add(new Interval(1,3));
+        intervals.add(new Interval(4,6));
+        intervals.add(new Interval(8,9));
+        intervals.add(new Interval(15,20));
+        intervals.add(new Interval(40,50));
+        intervals.add(new Interval(49,60));
+
+
+        IntervalTree it = new IntervalTree(intervals);
+
+        assertEquals(3,it.search(new Interval(4,4)).size());
+
+        assertEquals(4, it.search(new Interval(4, 5)).size());
+
+        assertEquals(7, it.search(new Interval(-1,10)).size());
+
+        assertEquals(0, it.search(new Interval(-1,-1)).size());
+
+        assertEquals(5, it.search(new Interval(1,4)).size());
+
+        assertEquals(2, it.search(new Interval(0,1)).size());
+
+        assertEquals(0, it.search(new Interval(10,12)).size());
+
+        List<Interval> intervals2 = new ArrayList<Interval>();
+
+        //stravinsky 1880-1971
+        intervals2.add(new Interval(1880, 1971));
+        //Schoenberg
+        intervals2.add(new Interval(1874, 1951));
+        //Grieg
+        intervals2.add(new Interval(1843, 1907));
+        //Schubert
+        intervals2.add(new Interval(1779, 1828));
+        //Mozart
+        intervals2.add(new Interval(1756, 1828));
+        //Schuetz
+        intervals2.add(new Interval(1585, 1672));
+
+        IntervalTree it2 = new IntervalTree(intervals2);
+
+        assertEquals(0, it2.search(new Interval(1829, 1842)).size());
+
+        List<Interval> intersection1 = it2.search(new Interval(1907, 1907));
+        assertEquals(3, intersection1.size());
+
+        intersection1 = it2.search(new Interval(1780, 1790));
+        assertEquals(2, intersection1.size());
+
+    }
+}



Mime
View raw message