cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject git commit: Track max/min column names in sstables to be able to optimize slice queries.
Date Fri, 24 May 2013 15:42:54 GMT
Updated Branches:
  refs/heads/trunk 778b6a0eb -> 0ad499e9a


Track max/min column names in sstables to be able to optimize slice queries.

Patch by marcuse and jbellis, reviewed by jbellis for CASSANDRA-5514


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ad499e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ad499e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ad499e9

Branch: refs/heads/trunk
Commit: 0ad499e9a82afa23f17672e25678d065b421e0d8
Parents: 778b6a0
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Thu May 23 15:08:58 2013 -0500
Committer: Marcus Eriksson <marcuse@spotify.com>
Committed: Fri May 24 17:36:27 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 .../apache/cassandra/db/CollationController.java   |   40 +++-
 src/java/org/apache/cassandra/db/ColumnFamily.java |   10 +-
 src/java/org/apache/cassandra/db/Memtable.java     |    2 +-
 .../cassandra/db/compaction/CompactionManager.java |    2 +-
 .../cassandra/db/compaction/CompactionTask.java    |    2 +-
 .../db/compaction/LazilyCompactedRow.java          |   13 +-
 .../cassandra/db/filter/IDiskAtomFilter.java       |    2 +
 .../cassandra/db/filter/NamesQueryFilter.java      |    6 +
 .../apache/cassandra/db/filter/QueryFilter.java    |    5 +
 .../cassandra/db/filter/SliceQueryFilter.java      |   17 ++
 .../db/marshal/AbstractCompositeType.java          |    4 +-
 .../apache/cassandra/db/marshal/AbstractType.java  |   26 ++
 .../apache/cassandra/db/marshal/CompositeType.java |   34 +++
 .../io/sstable/AbstractSSTableSimpleWriter.java    |    2 +-
 .../cassandra/io/sstable/ColumnNameHelper.java     |  217 +++++++++++++++
 .../apache/cassandra/io/sstable/ColumnStats.java   |   11 +-
 .../apache/cassandra/io/sstable/Descriptor.java    |    3 +
 .../cassandra/io/sstable/SSTableMetadata.java      |   91 ++++++-
 .../apache/cassandra/io/sstable/SSTableWriter.java |    9 +-
 test/unit/org/apache/cassandra/SchemaLoader.java   |    6 +
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |    4 +-
 test/unit/org/apache/cassandra/db/TableTest.java   |  102 +++++++
 .../cassandra/db/marshal/CompositeTypeTest.java    |    3 +
 .../compress/CompressedRandomAccessReaderTest.java |    5 +-
 .../io/sstable/SSTableMetadataSerializerTest.java  |    3 +-
 .../cassandra/io/sstable/SSTableMetadataTest.java  |   99 +++++++
 .../compress/CompressedInputStreamTest.java        |    3 +-
 28 files changed, 696 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b6e38f..2118778 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -50,7 +50,8 @@
  * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
  * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
  * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
-
+ * Track max/min column names in sstables to be able to optimize slice
+   queries (CASSANDRA-5514)
 
 1.2.6
  * Fix dealing with ridiculously large max sstable sizes in LCS (CASSANDRA-5589)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 4bba554..a634049 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -244,15 +244,30 @@ public class CollationController
              * in one pass, and minimize the number of sstables for which we read a rowTombstone.
              */
             Collections.sort(view.sstables, SSTable.maxTimestampComparator);
-
+            List<SSTableReader> skippedSSTables = null;
             long mostRecentRowTombstone = Long.MIN_VALUE;
+            long minTimestamp = Long.MAX_VALUE;
+
             for (SSTableReader sstable : view.sstables)
             {
+                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
                 // if we've already seen a row tombstone with a timestamp greater
                 // than the most recent update to this sstable, we can skip it
                 if (sstable.getMaxTimestamp() < mostRecentRowTombstone)
                     break;
 
+                if (!filter.shouldInclude(sstable))
+                {
+                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+                    if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
+                    {
+                        if (skippedSSTables == null)
+                            skippedSSTables = new ArrayList<SSTableReader>();
+                        skippedSSTables.add(sstable);
+                    }
+                    continue;
+                }
+
                 OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
                 iterators.add(iter);
                 if (iter.getColumnFamily() != null)
@@ -266,6 +281,29 @@ public class CollationController
                 }
             }
 
+            // Check for row tombstone in the skipped sstables
+            if (skippedSSTables != null)
+            {
+                for (SSTableReader sstable : skippedSSTables)
+                {
+                    if (sstable.getMaxTimestamp() <= minTimestamp)
+                        continue;
+
+                    OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable);
+                    if (iter.getColumnFamily() == null)
+                        continue;
+
+                    ColumnFamily cf = iter.getColumnFamily();
+                    // we are only interested in row-level tombstones here, and only if markedForDeleteAt is larger than minTimestamp
+                    if (cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt > minTimestamp)
+                    {
+                        iterators.add(iter);
+                        returnCF.delete(cf.deletionInfo().getTopLevelDeletion());
+                        sstablesIterated++;
+                    }
+                }
+            }
+
             // we need to distinguish between "there is no data at all for this row" (BF will let us rebuild that efficiently)
             // and "there used to be data, but it's gone now" (we should cache the empty CF so we don't need to rebuild that slower)
             if (iterators.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 3affb95..cda2d9b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -37,6 +39,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -394,7 +397,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         long maxTimestampSeen = deletionInfo().maxTimestamp();
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
         int maxLocalDeletionTime = Integer.MIN_VALUE;
-
+        List<ByteBuffer> minColumnNamesSeen = Collections.emptyList();
+        List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList();
         for (Column column : this)
         {
             minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
@@ -403,8 +407,10 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
             int deletionTime = column.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)
                 tombstones.update(deletionTime);
+            minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, column.name, metadata.comparator);
+            maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, column.name, metadata.comparator);
         }
-        return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime, tombstones);
+        return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime, tombstones, minColumnNamesSeen, maxColumnNamesSeen);
     }
 
     public boolean isMarkedForDelete()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a19c4a7..ad6258a 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -531,7 +531,7 @@ public class Memtable
 
         public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
         {
-            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(context.get());
+            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(cfs.metadata.comparator).replayPosition(context.get());
             return new SSTableWriter(filename,
                                      rows.size(),
                                      cfs.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1e44a07..90f6dfa 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -600,7 +600,7 @@ public class CompactionManager implements CompactionManagerMBean
                                  expectedBloomFilterSize,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
+                                 SSTableMetadata.createCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d6fd171..f07a1e7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -280,7 +280,7 @@ public class CompactionTask extends AbstractCompactionTask
                                  keysPerSSTable,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 SSTableMetadata.createCollector(toCompact, getLevel()));
+                                 SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
     }
 
     protected int getLevel()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index a94db88..5635652 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -30,10 +32,12 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.StreamingHistogram;
 
@@ -106,7 +110,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
                                       reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
-                                      reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones
+                                      reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
+                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
+                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
         );
         reducer = null;
 
@@ -190,6 +196,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         long maxTimestampSeen = Long.MIN_VALUE;
         int maxLocalDeletionTimeSeen = Integer.MIN_VALUE;
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
+        List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
+        List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
 
         public void reduce(OnDiskAtom current)
         {
@@ -247,6 +255,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp());
                 maxTimestampSeen = Math.max(maxTimestampSeen, reduced.maxTimestamp());
                 maxLocalDeletionTimeSeen = Math.max(maxLocalDeletionTimeSeen, reduced.getLocalDeletionTime());
+                minColumnNameSeen = ColumnNameHelper.minComponents(minColumnNameSeen, reduced.name(), controller.cfs.metadata.comparator);
+                maxColumnNameSeen = ColumnNameHelper.maxComponents(maxColumnNameSeen, reduced.name(), controller.cfs.metadata.comparator);
+
                 int deletionTime = reduced.getLocalDeletionTime();
                 if (deletionTime < Integer.MAX_VALUE)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index b3cec4b..e032f1d 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -78,6 +78,8 @@ public interface IDiskAtomFilter
     public IDiskAtomFilter cloneShallow();
     public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);
 
+    boolean shouldInclude(SSTableReader sstable);
+
     public static class Serializer implements IVersionedSerializer<IDiskAtomFilter>
     {
         public static Serializer instance = new Serializer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 094d856..caddb0e 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -142,6 +142,12 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return false;
     }
 
+    public boolean shouldInclude(SSTableReader sstable)
+    {
+        // only called by collationcontroller for slice queries
+        throw new UnsupportedOperationException();
+    }
+
     public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
     {
         public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index ab4a64e..126b240 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -217,4 +217,9 @@ public class QueryFilter
     {
         return getClass().getSimpleName() + "(key=" + key + ", cfName=" + cfName + (filter == null ? "" : ", filter=" + filter) + ")";
     }
+
+    public boolean shouldInclude(SSTableReader sstable)
+    {
+        return filter.shouldInclude(sstable);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index bfe2337..2a83960 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -21,10 +21,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,12 +34,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.columniterator.SSTableSliceIterator;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SliceQueryFilter implements IDiskAtomFilter
 {
@@ -230,6 +234,19 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return false;
     }
 
+    public boolean shouldInclude(SSTableReader sstable)
+    {
+        List<ByteBuffer> minColumnNames = sstable.getSSTableMetadata().minColumnNames;
+        List<ByteBuffer> maxColumnNames = sstable.getSSTableMetadata().maxColumnNames;
+        assert minColumnNames.size() == maxColumnNames.size();
+        AbstractType<?> comparator = sstable.metadata.comparator;
+
+        if (minColumnNames.isEmpty() || maxColumnNames.isEmpty())
+            return true;
+
+        return comparator.intersects(minColumnNames, maxColumnNames, this);
+    }
+
     public static class Serializer implements IVersionedSerializer<SliceQueryFilter>
     {
         public void serialize(SliceQueryFilter f, DataOutput out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 2aac62a..06a0a82 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -131,10 +131,10 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
 
     public static class CompositeComponent
     {
-        public AbstractType comparator;
+        public AbstractType<?> comparator;
         public ByteBuffer   value;
 
-        public CompositeComponent( AbstractType comparator, ByteBuffer value )
+        public CompositeComponent( AbstractType<?> comparator, ByteBuffer value )
         {
             this.comparator = comparator;
             this.value      = value;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 93e9309..a6e7b98 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -25,10 +25,14 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.cassandra.cql3.CQL3Type;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.OnDiskAtom;
 import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 
 /**
@@ -276,4 +280,26 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     {
         return getClass().getName();
     }
+
+    protected boolean intersects(ByteBuffer minColName, ByteBuffer maxColName, ByteBuffer sliceStart, ByteBuffer sliceEnd)
+    {
+        return (sliceStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || compare(maxColName, sliceStart) >= 0)
+               && (sliceEnd.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || compare(sliceEnd, minColName) >= 0);
+    }
+
+    public boolean intersects(List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames, SliceQueryFilter filter)
+    {
+        assert minColumnNames.size() == 1;
+
+        for (ColumnSlice slice : filter.slices)
+        {
+            ByteBuffer start = filter.isReversed() ? slice.finish : slice.start;
+            ByteBuffer finish = filter.isReversed() ? slice.start : slice.finish;
+
+            if (intersects(minColumnNames.get(0), maxColumnNames.get(0), start, finish))
+                return true;
+        }
+        return false;
+    }
 }
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 7333e0e..ccc3b20 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -27,6 +27,8 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
@@ -189,6 +191,38 @@ public class CompositeType extends AbstractCompositeType
         return true;
     }
 
+    /**
+     * Deconstructs the composite and fills out any missing components with EMPTY_BYTE_BUFFER.
+     */
+    public List<AbstractCompositeType.CompositeComponent> deconstructAndExpand(ByteBuffer composite)
+    {
+        List<AbstractCompositeType.CompositeComponent> components = deconstruct(composite);
+        for (int i = components.size(); i < types.size(); i++)
+            components.add(new AbstractCompositeType.CompositeComponent(this.types.get(i), ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        return components;
+    }
+
+    @Override
+    public boolean intersects(List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames, SliceQueryFilter filter)
+    {
+        int typeCount = types.get(types.size() - 1) instanceof ColumnToCollectionType ? types.size() - 1 : types.size();
+
+        assert minColumnNames.size() == typeCount;
+
+        for (ColumnSlice slice : filter.slices)
+        {
+            List<AbstractCompositeType.CompositeComponent> start = deconstructAndExpand(filter.isReversed() ? slice.finish : slice.start);
+            List<AbstractCompositeType.CompositeComponent> finish = deconstructAndExpand(filter.isReversed() ? slice.start : slice.finish);
+            for (int i = 0; i < typeCount; i++)
+            {
+                AbstractType<?> t = types.get(i);
+                if (!t.intersects(minColumnNames.get(i), maxColumnNames.get(i), start.get(i).value, finish.get(i).value))
+                    return false;
+            }
+        }
+        return true;
+    }
+
     private static class StaticParsedComparator implements ParsedComparator
     {
         final AbstractType<?> type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 72eeac9..23f5c85 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -56,7 +56,7 @@ public abstract class AbstractSSTableSimpleWriter
             0, // We don't care about the bloom filter
             metadata,
             DatabaseDescriptor.getPartitioner(),
-            SSTableMetadata.createCollector());
+            SSTableMetadata.createCollector(metadata.comparator));
     }
 
     // find available generation and pick up filename from that

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
new file mode 100644
index 0000000..d9dc4b8
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+
+public class ColumnNameHelper
+{
+    /**
+     * finds the max column name(s)
+     *
+     * if comparator is of CompositeType, candidate will be split into its components, and each
+     * component is compared to the component on the same place in maxSeen, and then returning the list
+     * with the max columns.
+     *
+     * if comparator is not CompositeType, maxSeen is assumed to be of size 1 and the item there is
+     * compared to the candidate.
+     *
+     * @param maxSeen the max columns seen so far
+     * @param candidate the candidate column(s)
+     * @param comparator the comparator to use
+     * @return a list with the max column(s)
+     */
+    public static List<ByteBuffer> maxComponents(List<ByteBuffer> maxSeen, ByteBuffer candidate, AbstractType<?> comparator)
+    {
+        if (comparator instanceof CompositeType) // CompositeType (not AbstractCompositeType): keeps dispatch consistent with minComponents/mergeMin/mergeMax, which need CompositeType.types
+        {
+            if (maxSeen.size() == 0)
+                return Arrays.asList(((CompositeType)comparator).split(candidate));
+
+            List<AbstractCompositeType.CompositeComponent> components = ((CompositeType)comparator).deconstruct(candidate);
+            List<ByteBuffer> retList = new ArrayList<ByteBuffer>(components.size());
+            for (int i = 0; i < maxSeen.size(); i++) // NOTE(review): assumes candidate has >= maxSeen.size() components -- confirm for prefix/range-tombstone names
+            {
+                AbstractCompositeType.CompositeComponent component = components.get(i);
+                retList.add(ColumnNameHelper.max(maxSeen.get(i), component.value, component.comparator));
+            }
+            return retList;
+        }
+        else
+        {
+            if (maxSeen.size() == 0)
+                return Collections.singletonList(candidate);
+            return Collections.singletonList(ColumnNameHelper.max(maxSeen.get(0), candidate, comparator));
+
+        }
+    }
+    /**
+     * finds the min column name(s)
+     *
+     * if comparator is of CompositeType, candidate will be split into its components, and each
+     * component is compared to the component on the same place in minSeen, and then returning the list
+     * with the min columns.
+     *
+     * if comparator is not CompositeType, minSeen is assumed to be of size 1 and the item there is
+     * compared to the candidate.
+     *
+     * @param minSeen the min columns seen so far
+     * @param candidate the candidate column(s)
+     * @param comparator the comparator to use
+     * @return a list with the min column(s)
+     */
+    public static List<ByteBuffer> minComponents(List<ByteBuffer> minSeen, ByteBuffer candidate, AbstractType<?> comparator)
+    {
+        if (comparator instanceof CompositeType)
+        {
+            if (minSeen.size() == 0)
+                return Arrays.asList(((CompositeType)comparator).split(candidate));
+
+            List<AbstractCompositeType.CompositeComponent> components = ((CompositeType)comparator).deconstruct(candidate);
+            List<ByteBuffer> retList = new ArrayList<ByteBuffer>(components.size());
+            for (int i = 0; i < minSeen.size(); i++) // NOTE(review): assumes candidate has >= minSeen.size() components -- confirm for prefix/range-tombstone names
+            {
+                AbstractCompositeType.CompositeComponent component = components.get(i);
+                retList.add(ColumnNameHelper.min(minSeen.get(i), component.value, component.comparator));
+            }
+            return retList;
+        }
+        else
+        {
+            if (minSeen.size() == 0)
+                return Collections.singletonList(candidate);
+            return Collections.singletonList(ColumnNameHelper.min(minSeen.get(0), candidate, comparator));
+
+        }
+    }
+
+    /**
+     * return the min column
+     *
+     * note that comparator should not be of CompositeType!
+     *
+     * @param b1 lhs
+     * @param b2 rhs
+     * @param comparator the comparator to use
+     * @return the smallest column according to comparator
+     */
+    private static ByteBuffer min(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
+    {
+        if (comparator.compare(b1, b2) >= 0)
+            return b2;
+        return b1;
+    }
+
+    /**
+     * return the max column
+     *
+     * note that comparator should not be of CompositeType!
+     *
+     * @param b1 lhs
+     * @param b2 rhs
+     * @param comparator the comparator to use
+     * @return the biggest column according to comparator
+     */
+    private static ByteBuffer max(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
+    {
+        if (comparator.compare(b1, b2) >= 0)
+            return b1;
+        return b2;
+    }
+
+    /**
+     * if columnNameComparator is CompositeType the columns are compared by components using the subcomparator
+     * on the same position.
+     *
+     * if comparator is not CompositeType, the lists are assumed to be of max size 1 and compared using the comparator
+     * directly.
+     *
+     * @param minColumnNames lhs
+     * @param candidates rhs
+     * @param columnNameComparator comparator to use
+     * @return a list with smallest column names according to (sub)comparator
+     */
+    public static List<ByteBuffer> mergeMin(List<ByteBuffer> minColumnNames, List<ByteBuffer> candidates, AbstractType<?> columnNameComparator)
+    {
+        if (minColumnNames.size() == 0)
+            return candidates;
+
+        if (candidates.size() == 0)
+            return minColumnNames;
+
+        if (columnNameComparator instanceof CompositeType)
+        {
+            CompositeType ct = (CompositeType)columnNameComparator;
+            List<ByteBuffer> retList = new ArrayList<ByteBuffer>(ct.types.size());
+            for (int i = 0; i < minColumnNames.size(); i++) // NOTE(review): assumes both lists have minColumnNames.size() elements -- confirm
+            {
+                retList.add(min(minColumnNames.get(i), candidates.get(i), ct.types.get(i)));
+            }
+            return retList;
+        }
+        else
+        {
+            return Collections.singletonList(min(minColumnNames.get(0), candidates.get(0), columnNameComparator));
+        }
+    }
+
+    /**
+     * if columnNameComparator is CompositeType the columns are compared by components using the subcomparator
+     * on the same position.
+     *
+     * if comparator is not CompositeType, the lists are assumed to be of max size 1 and compared using the comparator
+     * directly.
+     *
+     * @param maxColumnNames lhs
+     * @param candidates rhs
+     * @param columnNameComparator comparator to use
+     * @return a list with biggest column names according to (sub)comparator
+     */
+    public static List<ByteBuffer> mergeMax(List<ByteBuffer> maxColumnNames, List<ByteBuffer> candidates, AbstractType<?> columnNameComparator)
+    {
+        if (maxColumnNames.size() == 0)
+            return candidates;
+
+        if (candidates.size() == 0)
+            return maxColumnNames;
+
+        if (columnNameComparator instanceof CompositeType)
+        {
+            CompositeType ct = (CompositeType)columnNameComparator;
+            List<ByteBuffer> retList = new ArrayList<ByteBuffer>(ct.types.size());
+            for (int i = 0; i < maxColumnNames.size(); i++) // NOTE(review): assumes both lists have maxColumnNames.size() elements -- confirm
+            {
+                retList.add(max(maxColumnNames.get(i), candidates.get(i), ct.types.get(i)));
+            }
+            return retList;
+        }
+        else
+        {
+            return Collections.singletonList(max(maxColumnNames.get(0), candidates.get(0), columnNameComparator));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
index 3107e2e..bd3bd1c 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.apache.cassandra.utils.StreamingHistogram;
 
 /**
@@ -34,12 +37,18 @@ public class ColumnStats
     /** histogram of tombstone drop time */
     public final StreamingHistogram tombstoneHistogram;
 
-    public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, int maxLocalDeletionTime, StreamingHistogram tombstoneHistogram)
+    /** max and min column names according to comparator */
+    public final List<ByteBuffer> minColumnNames;
+    public final List<ByteBuffer> maxColumnNames;
+
+    public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, int maxLocalDeletionTime, StreamingHistogram tombstoneHistogram, List<ByteBuffer> minColumnNames, List<ByteBuffer> maxColumnNames)
     {
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
         this.maxLocalDeletionTime = maxLocalDeletionTime;
         this.columnCount = columnCount;
         this.tombstoneHistogram = tombstoneHistogram;
+        this.minColumnNames = minColumnNames;
+        this.maxColumnNames = maxColumnNames;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index fdc99b9..ac1e55f 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -54,6 +54,7 @@ public class Descriptor
         //             tracks max local deletiontime in sstable metadata
         //             records bloom_filter_fp_chance in metadata component
         //             remove data size and column count from data file (CASSANDRA-4180)
+        //             tracks max/min column values (according to comparator)
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -65,6 +66,7 @@ public class Descriptor
         public final boolean hasBloomFilterFPChance;
         public final boolean offHeapSummaries;
         public final boolean hasRowSizeAndColumnCount;
+        public final boolean tracksMaxMinColumnNames;
 
         public Version(String version)
         {
@@ -75,6 +77,7 @@ public class Descriptor
             hasBloomFilterFPChance = version.compareTo("ja") >= 0;
             offHeapSummaries = version.compareTo("ja") >= 0;
             hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
+            tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
index ba6bfb5..ff068e8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.StreamingHistogram;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +66,8 @@ public class SSTableMetadata
     public final Set<Integer> ancestors;
     public final StreamingHistogram estimatedTombstoneDropTime;
     public final int sstableLevel;
+    public final List<ByteBuffer> maxColumnNames;
+    public final List<ByteBuffer> minColumnNames;
 
     private SSTableMetadata()
     {
@@ -77,7 +82,9 @@ public class SSTableMetadata
              null,
              Collections.<Integer>emptySet(),
              defaultTombstoneDropTimeHistogram(),
-             0);
+             0,
+             Collections.<ByteBuffer>emptyList(),
+             Collections.<ByteBuffer>emptyList());
     }
 
     private SSTableMetadata(EstimatedHistogram rowSizes,
@@ -91,7 +98,9 @@ public class SSTableMetadata
                             String partitioner,
                             Set<Integer> ancestors,
                             StreamingHistogram estimatedTombstoneDropTime,
-                            int sstableLevel)
+                            int sstableLevel,
+                            List<ByteBuffer> minColumnNames,
+                            List<ByteBuffer> maxColumnNames)
     {
         this.estimatedRowSize = rowSizes;
         this.estimatedColumnCount = columnCounts;
@@ -105,16 +114,18 @@ public class SSTableMetadata
         this.ancestors = ancestors;
         this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
         this.sstableLevel = sstableLevel;
+        this.minColumnNames = minColumnNames;
+        this.maxColumnNames = maxColumnNames;
     }
 
-    public static Collector createCollector()
+    public static Collector createCollector(AbstractType<?> columnNameComparator)
     {
-        return new Collector();
+        return new Collector(columnNameComparator);
     }
 
-    public static Collector createCollector(Collection<SSTableReader> sstables, int level)
+    public static Collector createCollector(Collection<SSTableReader> sstables, AbstractType<?> columnNameComparator, int level)
     {
-        Collector collector = new Collector();
+        Collector collector = new Collector(columnNameComparator);
 
         collector.replayPosition(ReplayPosition.getReplayPosition(sstables));
         collector.sstableLevel(level);
@@ -153,7 +164,9 @@ public class SSTableMetadata
                                    metadata.partitioner,
                                    metadata.ancestors,
                                    metadata.estimatedTombstoneDropTime,
-                                   sstableLevel);
+                                   sstableLevel,
+                                   metadata.minColumnNames,
+                                   metadata.maxColumnNames);
 
     }
 
@@ -211,7 +224,14 @@ public class SSTableMetadata
         protected Set<Integer> ancestors = new HashSet<Integer>();
         protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
         protected int sstableLevel;
+        protected List<ByteBuffer> minColumnNames = Collections.emptyList();
+        protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+        private final AbstractType<?> columnNameComparator;
 
+        private Collector(AbstractType<?> columnNameComparator)
+        {
+            this.columnNameComparator = columnNameComparator;
+        }
         public void addRowSize(long rowSize)
         {
             estimatedRowSize.add(rowSize);
@@ -264,7 +284,9 @@ public class SSTableMetadata
                                        partitioner,
                                        ancestors,
                                        estimatedTombstoneDropTime,
-                                       sstableLevel);
+                                       sstableLevel,
+                                       minColumnNames,
+                                       maxColumnNames);
         }
 
         public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -306,6 +328,8 @@ public class SSTableMetadata
             addRowSize(size);
             addColumnCount(stats.columnCount);
             mergeTombstoneHistogram(stats.tombstoneHistogram);
+            updateMinColumnNames(stats.minColumnNames);
+            updateMaxColumnNames(stats.maxColumnNames);
         }
 
         public Collector sstableLevel(int sstableLevel)
@@ -314,6 +338,19 @@ public class SSTableMetadata
             return this;
         }
 
+        public Collector updateMinColumnNames(List<ByteBuffer> minColumnNames)
+        {
+            if (minColumnNames.size() > 0)
+                this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
+            return this;
+        }
+
+        public Collector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
+        {
+            if (maxColumnNames.size() > 0)
+                this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
+            return this;
+        }
     }
 
     public static class SSTableMetadataSerializer
@@ -338,8 +375,18 @@ public class SSTableMetadata
                 out.writeInt(g);
             StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
             out.writeInt(sstableStats.sstableLevel);
+            serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
         }
 
+        private void serializeMinMaxColumnNames(List<ByteBuffer> minColNames, List<ByteBuffer> maxColNames, DataOutput out) throws IOException
+        {
+            out.writeInt(minColNames.size());
+            for (ByteBuffer columnName : minColNames)
+                ByteBufferUtil.writeWithShortLength(columnName, out);
+            out.writeInt(maxColNames.size());
+            for (ByteBuffer columnName : maxColNames)
+                ByteBufferUtil.writeWithShortLength(columnName, out);
+        }
         /**
          * Used to serialize to an old version - needed to be able to update sstable level without a full compaction.
          *
@@ -370,6 +417,8 @@ public class SSTableMetadata
                 out.writeInt(g);
             StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
             out.writeInt(sstableStats.sstableLevel);
+            if (legacyDesc.version.tracksMaxMinColumnNames)
+                serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
         }
 
         public SSTableMetadata deserialize(Descriptor descriptor) throws IOException
@@ -423,6 +472,28 @@ public class SSTableMetadata
             if (loadSSTableLevel && in.available() > 0)
                 sstableLevel = in.readInt();
 
+            List<ByteBuffer> minColumnNames;
+            List<ByteBuffer> maxColumnNames;
+            if (desc.version.tracksMaxMinColumnNames)
+            {
+                int colCount = in.readInt();
+                minColumnNames = new ArrayList<ByteBuffer>(colCount);
+                for (int i = 0; i < colCount; i++)
+                {
+                    minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                }
+                colCount = in.readInt();
+                maxColumnNames = new ArrayList<ByteBuffer>(colCount);
+                for (int i = 0; i < colCount; i++)
+                {
+                    maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+                }
+            }
+            else
+            {
+                minColumnNames = Collections.emptyList();
+                maxColumnNames = Collections.emptyList();
+            }
             return new SSTableMetadata(rowSizes,
                                        columnCounts,
                                        replayPosition,
@@ -434,7 +505,9 @@ public class SSTableMetadata
                                        partitioner,
                                        ancestors,
                                        tombstoneHistogram,
-                                       sstableLevel);
+                                       sstableLevel,
+                                       minColumnNames,
+                                       maxColumnNames);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 43ea5a8..879c9bc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Sets;
@@ -56,7 +57,7 @@ public class SSTableWriter extends SSTable
              keyCount,
              Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
              StorageService.getPartitioner(),
-             SSTableMetadata.createCollector());
+             SSTableMetadata.createCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
     }
 
     private static Set<Component> components(CFMetaData metadata)
@@ -214,6 +215,8 @@ public class SSTableWriter extends SSTable
         long minTimestamp = Long.MAX_VALUE;
         long maxTimestamp = Long.MIN_VALUE;
         int maxLocalDeletionTime = Integer.MIN_VALUE;
+        List<ByteBuffer> minColumnNames = Collections.emptyList();
+        List<ByteBuffer> maxColumnNames = Collections.emptyList();
         StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
         cf.delete(deletionInfo);
@@ -239,6 +242,8 @@ public class SSTableWriter extends SSTable
                 }
                 minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
                 maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
+                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
+                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
                 maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
 
                 columnIndexer.add(atom); // This write the atom on disk too
@@ -258,6 +263,8 @@ public class SSTableWriter extends SSTable
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
         sstableMetadataCollector.addColumnCount(columnIndexer.writtenAtomCount());
         sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
+        sstableMetadataCollector.updateMinColumnNames(minColumnNames);
+        sstableMetadataCollector.updateMaxColumnNames(maxColumnNames);
         afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, deletionInfo.getTopLevelDeletion(), columnIndexer.build()));
         return currentPosition;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 015ece0..1ba45e6 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -121,6 +121,7 @@ public class SchemaLoader
         AbstractType bytes = BytesType.instance;
 
         AbstractType<?> composite = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{BytesType.instance, TimeUUIDType.instance, IntegerType.instance}));
+        AbstractType<?> compositeMaxMin = CompositeType.getInstance(Arrays.asList(new AbstractType<?>[]{BytesType.instance, IntegerType.instance}));
         Map<Byte, AbstractType<?>> aliases = new HashMap<Byte, AbstractType<?>>();
         aliases.put((byte)'b', BytesType.instance);
         aliases.put((byte)'t', TimeUUIDType.instance);
@@ -193,6 +194,11 @@ public class SchemaLoader
                                                           composite,
                                                           null),
                                            new CFMetaData(ks1,
+                                                          "StandardComposite2",
+                                                          st,
+                                                          compositeMaxMin,
+                                                          null),
+                                           new CFMetaData(ks1,
                                                           "StandardDynamicComposite",
                                                           st,
                                                           dynamicComposite,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index b029e29..a0f64de 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -1203,7 +1203,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         String ks = "Keyspace1";
         String cf = "Standard3"; // should be empty
 
-        CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
         Directories dir = Directories.create(ks, cf);
         ByteBuffer key = bytes("key");
 
@@ -1226,7 +1226,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         {
             protected SSTableWriter getWriter()
             {
-                SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
+                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
                 collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
                 return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
                                          0,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/db/TableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java
index 60274a5..8db1769 100644
--- a/test/unit/org/apache/cassandra/db/TableTest.java
+++ b/test/unit/org/apache/cassandra/db/TableTest.java
@@ -35,12 +35,18 @@ import org.junit.Test;
 
 import static junit.framework.Assert.*;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.Relation;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.utils.WrappedRunnable;
 import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.Util.expiringColumn;
 import static org.apache.cassandra.Util.getBytes;
+import static org.junit.Assert.assertEquals;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -429,6 +435,102 @@ public class TableTest extends SchemaLoader
         validateSliceLarge(cfStore);
     }
 
+    @Test
+    public void testLimitSSTables() throws CharacterCodingException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore cfStore = table.getColumnFamilyStore("Standard1");
+        cfStore.disableAutoCompaction();
+        DecoratedKey key = Util.dk("row_maxmin");
+        for (int j = 0; j < 10; j++)
+        {
+            ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
+            for (int i = 1000 + (j*100); i < 1000 + ((j+1)*100); i++)
+            {
+                cf.addColumn(column("col" + i, ("v" + i), i));
+            }
+            RowMutation rm = new RowMutation("Keyspace1", key.key, cf);
+            rm.apply();
+            cfStore.forceBlockingFlush();
+        }
+        cfStore.metric.sstablesPerReadHistogram.clear();
+        ColumnFamily cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes(""), ByteBufferUtil.bytes("col1499"), false, 1000);
+        assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
+        int i = 0;
+        for (Column c : cf.getSortedColumns())
+        {
+            assertEquals(ByteBufferUtil.string(c.name), "col" + (1000 + i++));
+        }
+        assertEquals(i, 500);
+        cfStore.metric.sstablesPerReadHistogram.clear();
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col1500"), ByteBufferUtil.bytes("col2000"), false, 1000);
+        assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
+
+        for (Column c : cf.getSortedColumns())
+        {
+            assertEquals(ByteBufferUtil.string(c.name), "col"+(1000 + i++));
+        }
+        assertEquals(i, 1000);
+
+        // reverse
+        cfStore.metric.sstablesPerReadHistogram.clear();
+        cf = cfStore.getColumnFamily(key, ByteBufferUtil.bytes("col2000"), ByteBufferUtil.bytes("col1500"), true, 1000);
+        assertEquals(cfStore.metric.sstablesPerReadHistogram.max(), 5, 0.1);
+        i = 500;
+        for (Column c : cf.getSortedColumns())
+        {
+            assertEquals(ByteBufferUtil.string(c.name), "col"+(1000 + i++));
+        }
+        assertEquals(i, 1000);
+
+    }
+
+    @Test
+    public void testLimitSSTablesComposites() throws CharacterCodingException, ExecutionException, InterruptedException
+    {
+        /*
+        creates 10 sstables, composite columns like this:
+        ---------------------
+        k   |a0:0|a1:1|..|a9:9
+        ---------------------
+        ---------------------
+        k   |a0:10|a1:11|..|a9:19
+        ---------------------
+        ...
+        ---------------------
+        k   |a0:90|a1:91|..|a9:99
+        ---------------------
+        then we slice out col1 = a5 and col2 > 85 -> which should let us just check 2 sstables and get 2 columns
+         */
+        Table table = Table.open("Keyspace1");
+
+        ColumnFamilyStore cfs = table.getColumnFamilyStore("StandardComposite2");
+        cfs.disableAutoCompaction();
+
+        CompositeType ct = CompositeType.getInstance(BytesType.instance, IntegerType.instance);
+        DecoratedKey key = Util.dk("k");
+        for (int j = 0; j < 10; j++)
+        {
+            for (int i = 0; i < 10; i++)
+            {
+                RowMutation rm = new RowMutation("Keyspace1", key.key);
+                ByteBuffer colName = ct.builder().add(ByteBufferUtil.bytes("a" + i)).add(ByteBufferUtil.bytes(j*10 + i)).build();
+                rm.add("StandardComposite2", colName, ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+                rm.apply();
+            }
+            cfs.forceBlockingFlush();
+        }
+        ByteBuffer start = ct.builder().add(ByteBufferUtil.bytes("a5")).add(ByteBufferUtil.bytes(85)).build();
+        ByteBuffer finish = ct.builder().add(ByteBufferUtil.bytes("a5")).buildAsEndOfRange();
+        cfs.metric.sstablesPerReadHistogram.clear();
+        ColumnFamily cf = cfs.getColumnFamily(key, start, finish, false, 1000);
+        int colCount = 0;
+        for (Column c : cf)
+            colCount++;
+        assertEquals(2, colCount);
+        assertEquals(2, cfs.metric.sstablesPerReadHistogram.max(), 0.1);
+    }
+
     private void validateSliceLarge(ColumnFamilyStore cfStore) throws IOException
     {
         DecoratedKey key = Util.dk("row3");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
index 50009a0..3614980 100644
--- a/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
@@ -19,7 +19,9 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -34,6 +36,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.*;
 
 public class CompositeTypeTest extends SchemaLoader

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 437b778..8321ae6 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -23,6 +23,7 @@ import java.util.Random;
 
 import org.junit.Test;
 
+import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.util.*;
@@ -54,7 +55,7 @@ public class CompressedRandomAccessReaderTest
 
         try
         {
-            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(null);
+            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(BytesType.instance).replayPosition(null);
             SequentialWriter writer = compressed
                 ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
                 : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
@@ -105,7 +106,7 @@ public class CompressedRandomAccessReaderTest
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
-        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(null);
+        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(BytesType.instance).replayPosition(null);
         SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
 
         writer.write(CONTENT.getBytes());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
index fd9e9c5..889f468 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import org.junit.Test;
 
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
@@ -46,7 +47,7 @@ public class SSTableMetadataSerializerTest
         long minTimestamp = 2162517136L;
         long maxTimestamp = 4162517136L;
 
-        SSTableMetadata.Collector collector = SSTableMetadata.createCollector()
+        SSTableMetadata.Collector collector = SSTableMetadata.createCollector(BytesType.instance)
                                                              .estimatedRowSize(rowSizes)
                                                              .estimatedColumnCount(columnCounts)
                                                              .replayPosition(rp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
index 6ab995f..761422d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
@@ -1,5 +1,7 @@
 package org.apache.cassandra.io.sstable;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -11,6 +13,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
@@ -140,4 +145,98 @@ public class SSTableMetadataTest extends SchemaLoader
             assertEquals(ttltimestamp + 100, sstable.getSSTableMetadata().maxLocalDeletionTime, 10);
         }
     }
+
+    @Test
+    public void trackMaxMinColNames() throws CharacterCodingException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard3");
+        store.getCompactionStrategy();
+        for (int j = 0; j < 8; j++)
+        {
+            DecoratedKey key = Util.dk("row"+j);
+            RowMutation rm = new RowMutation("Keyspace1", key.key);
+            for (int i = 100; i<150; i++)
+            {
+                rm.add("Standard3", ByteBufferUtil.bytes(j+"col"+i),
+                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                   System.currentTimeMillis());
+            }
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        assertEquals(1, store.getSSTables().size());
+        for (SSTableReader sstable : store.getSSTables())
+        {
+            assertEquals(ByteBufferUtil.string(sstable.getSSTableMetadata().minColumnNames.get(0)), "0col100");
+            assertEquals(ByteBufferUtil.string(sstable.getSSTableMetadata().maxColumnNames.get(0)), "7col149");
+        }
+        DecoratedKey key = Util.dk("row2");
+        RowMutation rm = new RowMutation("Keyspace1", key.key);
+        for (int i = 101; i<299; i++)
+        {
+            rm.add("Standard3", ByteBufferUtil.bytes(9+"col"+i),
+               ByteBufferUtil.EMPTY_BYTE_BUFFER,
+               System.currentTimeMillis());
+        }
+        rm.apply();
+
+        store.forceBlockingFlush();
+        store.forceMajorCompaction();
+        assertEquals(1, store.getSSTables().size());
+        for (SSTableReader sstable : store.getSSTables())
+        {
+            assertEquals(ByteBufferUtil.string(sstable.getSSTableMetadata().minColumnNames.get(0)), "0col100");
+            assertEquals(ByteBufferUtil.string(sstable.getSSTableMetadata().maxColumnNames.get(0)), "9col298");
+        }
+    }
+    @Test
+    public void testMaxMinComposites() throws CharacterCodingException, ExecutionException, InterruptedException
+    {
+        /*
+        creates two sstables, columns like this:
+        ---------------------
+        k   |a0:9|a1:8|..|a9:0
+        ---------------------
+        and
+        ---------------------
+        k2  |b0:9|b1:8|..|b9:0
+        ---------------------
+        meaning max columns are b9 and 9, min is a0 and 0
+         */
+        Table table = Table.open("Keyspace1");
+
+        ColumnFamilyStore cfs = table.getColumnFamilyStore("StandardComposite2");
+
+        CompositeType ct = CompositeType.getInstance(BytesType.instance, IntegerType.instance);
+
+        ByteBuffer key = ByteBufferUtil.bytes("k");
+        for (int i = 0; i < 10; i++)
+        {
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            ByteBuffer colName = ct.builder().add(ByteBufferUtil.bytes("a"+(9-i))).add(ByteBufferUtil.bytes(i)).build();
+            rm.add("StandardComposite2", colName, ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+
+        key = ByteBufferUtil.bytes("k2");
+        for (int i = 0; i < 10; i++)
+        {
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            ByteBuffer colName = ct.builder().add(ByteBufferUtil.bytes("b"+(9-i))).add(ByteBufferUtil.bytes(i)).build();
+            rm.add("StandardComposite2", colName, ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+        cfs.forceMajorCompaction();
+        assertEquals(cfs.getSSTables().size(), 1);
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            assertEquals("b9", ByteBufferUtil.string(sstable.getSSTableMetadata().maxColumnNames.get(0)));
+            assertEquals(9, ByteBufferUtil.toInt(sstable.getSSTableMetadata().maxColumnNames.get(1)));
+            assertEquals("a0", ByteBufferUtil.string(sstable.getSSTableMetadata().minColumnNames.get(0)));
+            assertEquals(0, ByteBufferUtil.toInt(sstable.getSSTableMetadata().minColumnNames.get(1)));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ad499e9/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index d432ea5..95297b1 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -25,6 +25,7 @@ import java.util.*;
 
 import org.junit.Test;
 
+import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.compress.*;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -56,7 +57,7 @@ public class CompressedInputStreamTest
         // write compressed data file of longs
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
-        SSTableMetadata.Collector collector = SSTableMetadata.createCollector();
+        SSTableMetadata.Collector collector = SSTableMetadata.createCollector(BytesType.instance);
         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
         Map<Long, Long> index = new HashMap<Long, Long>();


Mime
View raw message