cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/3] git commit: Promote columns index to primary index file
Date Tue, 13 Mar 2012 13:28:23 GMT
Promote columns index to primary index file

patch by slebresne; reviewed by stuhood for CASSANDRA-2319


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e17ac46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e17ac46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e17ac46

Branch: refs/heads/trunk
Commit: 4e17ac4699e59637f20cd4cfcf2258eec5c42c5a
Parents: bc577ba
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Tue Feb 28 14:27:41 2012 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Tue Mar 13 14:26:58 2012 +0100

----------------------------------------------------------------------
 .../cassandra/db/AbstractColumnContainer.java      |   14 +-
 .../db/AbstractThreadUnsafeSortedColumns.java      |    4 +-
 .../apache/cassandra/db/AtomicSortedColumns.java   |    6 +-
 .../cassandra/db/ColumnFamilySerializer.java       |    6 -
 src/java/org/apache/cassandra/db/ColumnIndex.java  |  147 +++++++++++
 .../org/apache/cassandra/db/ColumnIndexer.java     |  195 ---------------
 src/java/org/apache/cassandra/db/DeletionInfo.java |   43 ++++
 src/java/org/apache/cassandra/db/EchoedRow.java    |   70 -----
 .../org/apache/cassandra/db/ISortedColumns.java    |   22 --
 .../org/apache/cassandra/db/RowIndexEntry.java     |  184 ++++++++++++++
 .../db/columniterator/IndexedSliceReader.java      |   82 +++++-
 .../db/columniterator/SSTableNamesIterator.java    |  133 ++++++++---
 .../db/columniterator/SSTableSliceIterator.java    |   44 +---
 .../db/columniterator/SimpleSliceReader.java       |   33 ++-
 .../db/compaction/AbstractCompactedRow.java        |   13 +
 .../db/compaction/CompactionController.java        |   12 -
 .../cassandra/db/compaction/CompactionManager.java |    6 +-
 .../cassandra/db/compaction/CompactionTask.java    |   17 +-
 .../db/compaction/LazilyCompactedRow.java          |   25 ++-
 .../cassandra/db/compaction/PrecompactedRow.java   |   23 ++-
 .../org/apache/cassandra/db/filter/IFilter.java    |    3 +-
 .../cassandra/db/filter/NamesQueryFilter.java      |    5 +-
 .../apache/cassandra/db/filter/QueryFilter.java    |    6 +-
 .../cassandra/db/filter/SliceQueryFilter.java      |    4 +-
 .../apache/cassandra/io/sstable/Descriptor.java    |    9 +-
 .../apache/cassandra/io/sstable/IndexHelper.java   |    6 +-
 .../apache/cassandra/io/sstable/KeyIterator.java   |    3 +-
 .../org/apache/cassandra/io/sstable/SSTable.java   |   10 +-
 .../io/sstable/SSTableBoundedScanner.java          |   13 +-
 .../io/sstable/SSTableIdentityIterator.java        |    9 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |   70 +++---
 .../cassandra/io/sstable/SSTableScanner.java       |  164 ++++++++++---
 .../apache/cassandra/io/sstable/SSTableWriter.java |   65 ++---
 .../apache/cassandra/io/util/FileDataInput.java    |    4 +
 .../org/apache/cassandra/io/util/FileUtils.java    |    6 +
 .../cassandra/io/util/MappedFileDataInput.java     |   23 ++-
 .../cassandra/io/util/MmappedSegmentedFile.java    |    2 +-
 .../org/apache/cassandra/service/CacheService.java |    9 +-
 .../org/apache/cassandra/utils/StatusLogger.java   |    3 +-
 test/unit/org/apache/cassandra/Util.java           |   16 +-
 .../unit/org/apache/cassandra/db/KeyCacheTest.java |    2 +-
 test/unit/org/apache/cassandra/db/TableTest.java   |   10 +-
 .../cassandra/io/LazilyCompactedRowTest.java       |   16 +-
 .../cassandra/io/sstable/SSTableReaderTest.java    |   12 +-
 .../apache/cassandra/io/sstable/SSTableTest.java   |   49 +++--
 .../apache/cassandra/io/sstable/SSTableUtils.java  |   35 +--
 46 files changed, 1001 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
index 4db9304..23ad180 100644
--- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
+++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
@@ -46,12 +46,17 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
     @Deprecated // TODO this is a hack to set initial value outside constructor
     public void delete(int localtime, long timestamp)
     {
-        columns.delete(new ISortedColumns.DeletionInfo(timestamp, localtime));
+        columns.delete(new DeletionInfo(timestamp, localtime));
     }
 
     public void delete(AbstractColumnContainer cc2)
     {
-        columns.delete(cc2.columns.getDeletionInfo());
+        delete(cc2.columns.getDeletionInfo());
+    }
+
+    public void delete(DeletionInfo delInfo)
+    {
+        columns.delete(delInfo);
     }
 
     public boolean isMarkedForDelete()
@@ -69,6 +74,11 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter
         return columns.getDeletionInfo().localDeletionTime;
     }
 
+    public DeletionInfo deletionInfo()
+    {
+        return columns.getDeletionInfo();
+    }
+
     public AbstractType<?> getComparator()
     {
         return columns.getComparator();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
index 504f42c..0839923 100644
--- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java
@@ -30,7 +30,7 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
 
     public AbstractThreadUnsafeSortedColumns()
     {
-        deletionInfo = new DeletionInfo();
+        deletionInfo = DeletionInfo.LIVE;
     }
 
     public DeletionInfo getDeletionInfo()
@@ -49,7 +49,7 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn
     {
         // Update if it's not MIN_VALUE anymore and it has expired
         if (deletionInfo.localDeletionTime <= gcBefore)
-            deletionInfo = new DeletionInfo();
+            deletionInfo = DeletionInfo.LIVE;
     }
 
     public void retainAll(ISortedColumns columns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index a1838d9..c959af4 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -125,7 +125,7 @@ public class AtomicSortedColumns implements ISortedColumns
             if (current.deletionInfo.localDeletionTime > gcBefore)
                 break;
         }
-        while (!ref.compareAndSet(current, current.with(new DeletionInfo())));
+        while (!ref.compareAndSet(current, current.with(DeletionInfo.LIVE)));
     }
 
     public void retainAll(ISortedColumns columns)
@@ -293,12 +293,12 @@ public class AtomicSortedColumns implements ISortedColumns
 
         Holder(AbstractType<?> comparator)
         {
-            this(new SnapTreeMap<ByteBuffer, IColumn>(comparator), new DeletionInfo());
+            this(new SnapTreeMap<ByteBuffer, IColumn>(comparator), DeletionInfo.LIVE);
         }
 
         Holder(SortedMap<ByteBuffer, IColumn> columns)
         {
-            this(new SnapTreeMap<ByteBuffer, IColumn>(columns), new DeletionInfo());
+            this(new SnapTreeMap<ByteBuffer, IColumn>(columns), DeletionInfo.LIVE);
         }
 
         Holder(SnapTreeMap<ByteBuffer, IColumn> map, DeletionInfo deletionInfo)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 31c8d4b..449ddbd 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -95,12 +95,6 @@ public class ColumnFamilySerializer implements ISerializer<ColumnFamily>
         dos.writeLong(columnFamily.getMarkedForDeleteAt());
     }
 
-    public void serializeWithIndexes(ColumnFamily columnFamily, ColumnIndexer.RowHeader index, DataOutput dos)
-    {
-        ColumnIndexer.serialize(index, dos);
-        serializeForSSTable(columnFamily, dos);
-    }
-
     public ColumnFamily deserialize(DataInput dis) throws IOException
     {
         return deserialize(dis, IColumnSerializer.Flag.LOCAL, TreeMapBackedSortedColumns.factory());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
new file mode 100644
index 0000000..ff3af19
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.IIterableColumns;
+import org.apache.cassandra.utils.BloomFilter;
+
+public class ColumnIndex
+{
+    public final List<IndexHelper.IndexInfo> columnsIndex;
+    public final BloomFilter bloomFilter;
+
+    private static final ColumnIndex EMPTY = new ColumnIndex(Collections.<IndexHelper.IndexInfo>emptyList(), BloomFilter.emptyFilter());
+
+    private ColumnIndex(int estimatedColumnCount)
+    {
+        this(new ArrayList<IndexHelper.IndexInfo>(), BloomFilter.getFilter(estimatedColumnCount, 4));
+    }
+
+    private ColumnIndex(List<IndexHelper.IndexInfo> columnsIndex, BloomFilter bloomFilter)
+    {
+        this.columnsIndex = columnsIndex;
+        this.bloomFilter = bloomFilter;
+    }
+
+    /**
+     * Helps create an index for a column family based on the size of its columns
+     */
+    public static class Builder
+    {
+        private final ColumnIndex result;
+        private final Comparator<ByteBuffer> comparator;
+        private final long indexOffset;
+        private long startPosition = -1;
+        private long endPosition = 0;
+        private IColumn firstColumn = null;
+        private IColumn lastColumn = null;
+
+        public Builder(Comparator<ByteBuffer> comparator, ByteBuffer key, int estimatedColumnCount)
+        {
+            this.comparator = comparator;
+            this.indexOffset = rowHeaderSize(key);
+            this.result = new ColumnIndex(estimatedColumnCount);
+        }
+
+        /**
+         * Returns the number of bytes between the beginning of the row and the
+         * first serialized column.
+         */
+        private static long rowHeaderSize(ByteBuffer key)
+        {
+            return DBConstants.SHORT_SIZE + key.remaining()     // Row key
+                 + DBConstants.LONG_SIZE                        // Row data size
+                 + DBConstants.INT_SIZE + DBConstants.LONG_SIZE // Deletion info
+                 + DBConstants.INT_SIZE;                        // Column count
+        }
+
+        /**
+         * Serializes the index into in-memory structure with all required components
+         * such as Bloom Filter, index block size, IndexInfo list
+         *
+         * @param columns Column family to create index for
+         *
+         * @return information about index - its Bloom Filter, block size and IndexInfo list
+         */
+        public ColumnIndex build(IIterableColumns columns)
+        {
+            int columnCount = columns.getEstimatedColumnCount();
+
+            if (columnCount == 0)
+                return ColumnIndex.EMPTY;
+
+            for (IColumn c : columns)
+                add(c);
+
+            return build();
+        }
+
+        public void add(IColumn column)
+        {
+            result.bloomFilter.add(column.name());
+
+            if (firstColumn == null)
+            {
+                firstColumn = column;
+                startPosition = endPosition;
+            }
+
+            endPosition += column.serializedSize();
+
+            // if we hit the column index size that we have to index after, go ahead and index it.
+            if (endPosition - startPosition >= DatabaseDescriptor.getColumnIndexSize())
+            {
+                IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), indexOffset + startPosition, endPosition - startPosition);
+                result.columnsIndex.add(cIndexInfo);
+                firstColumn = null;
+            }
+
+            lastColumn = column;
+        }
+
+        public ColumnIndex build()
+        {
+            // all columns were GC'd after all
+            if (lastColumn == null)
+                return ColumnIndex.EMPTY;
+
+            // the last column may have fallen on an index boundary already.  if not, index it explicitly.
+            if (result.columnsIndex.isEmpty() || comparator.compare(result.columnsIndex.get(result.columnsIndex.size() - 1).lastName, lastColumn.name()) != 0)
+            {
+                IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), indexOffset + startPosition, endPosition - startPosition);
+                result.columnsIndex.add(cIndexInfo);
+            }
+
+            // we should always have at least one computed index block, but we only write it out if there is more than that.
+            assert result.columnsIndex.size() > 0;
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/ColumnIndexer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndexer.java b/src/java/org/apache/cassandra/db/ColumnIndexer.java
deleted file mode 100644
index 98cdb4a..0000000
--- a/src/java/org/apache/cassandra/db/ColumnIndexer.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataOutput;
-import java.io.IOError;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.sstable.IndexHelper;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.IIterableColumns;
-import org.apache.cassandra.utils.BloomFilter;
-
-/**
- * Help to create an index for a column family based on size of columns
- */
-public class ColumnIndexer
-{
-    /**
-     * Given a column family this, function creates an in-memory structure that represents the
-     * column index for the column family, and subsequently writes it to disk.
-     *
-     * @param columns Column family to create index for
-     * @param dos data output stream
-     */
-    public static void serialize(IIterableColumns columns, DataOutput dos)
-    {
-        try
-        {
-            writeIndex(serialize(columns), dos);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-
-    public static void serialize(RowHeader indexInfo, DataOutput dos)
-    {
-        try
-        {
-            writeIndex(indexInfo, dos);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-
-    /**
-     * Serializes the index into in-memory structure with all required components
-     * such as Bloom Filter, index block size, IndexInfo list
-     *
-     * @param columns Column family to create index for
-     *
-     * @return information about index - it's Bloom Filter, block size and IndexInfo list
-     */
-    public static RowHeader serialize(IIterableColumns columns)
-    {
-        int columnCount = columns.getEstimatedColumnCount();
-
-        BloomFilter bf = BloomFilter.getFilter(columnCount, 4);
-
-        if (columnCount == 0)
-            return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList());
-
-        // update bloom filter and create a list of IndexInfo objects marking the first and last column
-        // in each block of ColumnIndexSize
-        List<IndexHelper.IndexInfo> indexList = new ArrayList<IndexHelper.IndexInfo>();
-        long endPosition = 0, startPosition = -1;
-        IColumn lastColumn = null, firstColumn = null;
-
-        for (IColumn column : columns)
-        {
-            bf.add(column.name());
-
-            if (firstColumn == null)
-            {
-                firstColumn = column;
-                startPosition = endPosition;
-            }
-
-            endPosition += column.serializedSize();
-
-            // if we hit the column index size that we have to index after, go ahead and index it.
-            if (endPosition - startPosition >= DatabaseDescriptor.getColumnIndexSize())
-            {
-                IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), column.name(), startPosition, endPosition - startPosition);
-                indexList.add(cIndexInfo);
-                firstColumn = null;
-            }
-
-            lastColumn = column;
-        }
-
-        // all columns were GC'd after all
-        if (lastColumn == null)
-            return new RowHeader(bf, Collections.<IndexHelper.IndexInfo>emptyList());
-
-        // the last column may have fallen on an index boundary already.  if not, index it explicitly.
-        if (indexList.isEmpty() || columns.getComparator().compare(indexList.get(indexList.size() - 1).lastName, lastColumn.name()) != 0)
-        {
-            IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstColumn.name(), lastColumn.name(), startPosition, endPosition - startPosition);
-            indexList.add(cIndexInfo);
-        }
-
-        // we should always have at least one computed index block, but we only write it out if there is more than that.
-        assert indexList.size() > 0;
-        return new RowHeader(bf, indexList);
-    }
-
-    private static void writeIndex(RowHeader indexInfo, DataOutput dos) throws IOException
-    {
-        assert indexInfo != null;
-
-        /* Write out the bloom filter. */
-        writeBloomFilter(dos, indexInfo.bloomFilter);
-
-        dos.writeInt(indexInfo.entriesSize);
-        if (indexInfo.indexEntries.size() > 1)
-        {
-            for (IndexHelper.IndexInfo cIndexInfo : indexInfo.indexEntries)
-                cIndexInfo.serialize(dos);
-        }
-    }
-
-    /**
-     * Write a Bloom filter into file
-     *
-     * @param dos file to serialize Bloom Filter
-     * @param bf Bloom Filter
-     *
-     * @throws IOException on any I/O error.
-     */
-    private static void writeBloomFilter(DataOutput dos, BloomFilter bf) throws IOException
-    {
-        DataOutputBuffer bufOut = new DataOutputBuffer();
-        BloomFilter.serializer().serialize(bf, bufOut);
-        dos.writeInt(bufOut.getLength());
-        dos.write(bufOut.getData(), 0, bufOut.getLength());
-        bufOut.flush();
-    }
-
-    /**
-     * Holds information about serialized index and bloom filter
-     */
-    public static class RowHeader
-    {
-        public final BloomFilter bloomFilter;
-        public final List<IndexHelper.IndexInfo> indexEntries;
-        public final int entriesSize;
-
-        public RowHeader(BloomFilter bf, List<IndexHelper.IndexInfo> indexes)
-        {
-            assert bf != null;
-            assert indexes != null;
-            bloomFilter = bf;
-            indexEntries = indexes;
-            int entriesSize = 0;
-            if (indexEntries.size() > 1)
-            {
-                for (IndexHelper.IndexInfo info : indexEntries)
-                    entriesSize += info.serializedSize();
-            }
-            this.entriesSize = entriesSize;
-        }
-
-        public long serializedSize()
-        {
-            return DBConstants.INT_SIZE  // length of Bloom Filter
-                   + bloomFilter.serializedSize() // BF data
-                   + DBConstants.INT_SIZE // length of index block
-                   + entriesSize; // index block
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
new file mode 100644
index 0000000..9c1e2bc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public class DeletionInfo
+{
+    public final long markedForDeleteAt;
+    public final int localDeletionTime;
+
+    public static final DeletionInfo LIVE = new DeletionInfo(Long.MIN_VALUE, Integer.MAX_VALUE);
+
+    public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
+    {
+        // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
+        // (see CASSANDRA-3872)
+        if (localDeletionTime == Integer.MIN_VALUE)
+            localDeletionTime = Integer.MAX_VALUE;
+
+        this.markedForDeleteAt = markedForDeleteAt;
+        this.localDeletionTime = localDeletionTime;
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("{deletedAt=%d, localDeletion=%d}", markedForDeleteAt, localDeletionTime);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/EchoedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EchoedRow.java b/src/java/org/apache/cassandra/db/EchoedRow.java
deleted file mode 100644
index 69ec3d7..0000000
--- a/src/java/org/apache/cassandra/db/EchoedRow.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.utils.StreamingHistogram;
-
-/**
- * A CompactedRow implementation that just echos the original row bytes without deserializing.
- * Currently only used by cleanup.
- */
-public class EchoedRow extends AbstractCompactedRow
-{
-    private final SSTableIdentityIterator row;
-
-    public EchoedRow(SSTableIdentityIterator row)
-    {
-        super(row.getKey());
-        this.row = row;
-        // Reset SSTableIdentityIterator because we have not guarantee the filePointer hasn't moved since the Iterator was built
-        row.reset();
-    }
-
-    public long write(DataOutput out) throws IOException
-    {
-        assert row.dataSize > 0;
-        out.writeLong(row.dataSize);
-        row.echoData(out);
-        return row.dataSize;
-    }
-
-    public void update(MessageDigest digest)
-    {
-        // EchoedRow is not used in anti-entropy validation
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean isEmpty()
-    {
-        // never okay to purge a EchoedRow -- if it were, we'd need to deserialize instead of echoing
-        return false;
-    }
-
-    public ColumnStats columnStats()
-    {
-        return new ColumnStats(row.getColumnCount(), Long.MIN_VALUE, new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/ISortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java
index 60f9d5b..2e477ce 100644
--- a/src/java/org/apache/cassandra/db/ISortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ISortedColumns.java
@@ -168,26 +168,4 @@ public interface ISortedColumns extends IIterableColumns
          */
         public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sm, boolean insertReversed);
     }
-
-    public static class DeletionInfo
-    {
-        public final long markedForDeleteAt;
-        public final int localDeletionTime;
-
-        public DeletionInfo()
-        {
-            this(Long.MIN_VALUE, Integer.MAX_VALUE);
-        }
-
-        public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
-        {
-            // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
-            // (see CASSANDRA-3872)
-            if (localDeletionTime == Integer.MIN_VALUE)
-                localDeletionTime = Integer.MAX_VALUE;
-
-            this.markedForDeleteAt = markedForDeleteAt;
-            this.localDeletionTime = localDeletionTime;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
new file mode 100644
index 0000000..46c6604
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class RowIndexEntry
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final long position;
+
+    public RowIndexEntry(long position)
+    {
+        this.position = position;
+    }
+
+    public static RowIndexEntry create(long position, DeletionInfo deletionInfo, ColumnIndex index)
+    {
+        if (index != null && index.columnsIndex != null && index.columnsIndex.size() > 1)
+            return new IndexedEntry(position, deletionInfo, index.columnsIndex, index.bloomFilter);
+        else
+            return new RowIndexEntry(position);
+    }
+
+    public boolean isIndexed()
+    {
+        return !columnsIndex().isEmpty();
+    }
+
+    public DeletionInfo deletionInfo()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public List<IndexHelper.IndexInfo> columnsIndex()
+    {
+        return Collections.<IndexHelper.IndexInfo>emptyList();
+    }
+
+    public BloomFilter bloomFilter()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public static class Serializer
+    {
+        public void serialize(RowIndexEntry rie, DataOutput dos) throws IOException
+        {
+            dos.writeLong(rie.position);
+            if (rie.isIndexed())
+            {
+                dos.writeInt(((IndexedEntry)rie).serializedSize());
+                dos.writeInt(rie.deletionInfo().localDeletionTime);
+                dos.writeLong(rie.deletionInfo().markedForDeleteAt);
+                dos.writeInt(rie.columnsIndex().size());
+                for (IndexHelper.IndexInfo info : rie.columnsIndex())
+                    info.serialize(dos);
+                BloomFilter.serializer().serialize(rie.bloomFilter(), dos);
+            }
+            else
+            {
+                dos.writeInt(0);
+            }
+        }
+
+        public RowIndexEntry deserialize(DataInput dis, Descriptor descriptor) throws IOException
+        {
+            long position = dis.readLong();
+            if (descriptor.hasPromotedIndexes)
+            {
+                int size = dis.readInt();
+                if (size > 0)
+                {
+                    int ldt = dis.readInt();
+                    long mfda = dis.readLong();
+                    int entries = dis.readInt();
+                    List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<IndexHelper.IndexInfo>(entries);
+                    for (int i = 0; i < entries; i++)
+                        columnsIndex.add(IndexHelper.IndexInfo.deserialize(dis));
+                    BloomFilter bf = BloomFilter.serializer().deserialize(dis);
+                    return new IndexedEntry(position, new DeletionInfo(mfda, ldt), columnsIndex, bf);
+                }
+                else
+                {
+                    return new RowIndexEntry(position);
+                }
+            }
+            else
+            {
+                return new RowIndexEntry(position);
+            }
+        }
+
+        public void skip(DataInput dis, Descriptor descriptor) throws IOException
+        {
+            dis.readLong();
+            if (!descriptor.hasPromotedIndexes)
+                return;
+
+            int size = dis.readInt();
+            if (size <= 0)
+                return;
+
+            FileUtils.skipBytesFully(dis, size);
+        }
+    }
+
+    /**
+     * An entry in the row index for a row whose columns are indexed.
+     */
+    private static class IndexedEntry extends RowIndexEntry
+    {
+        private final DeletionInfo deletionInfo;
+        private final List<IndexHelper.IndexInfo> columnsIndex;
+        private final BloomFilter bloomFilter;
+
+        private IndexedEntry(long position, DeletionInfo deletionInfo, List<IndexHelper.IndexInfo> columnsIndex, BloomFilter bloomFilter)
+        {
+            super(position);
+            assert deletionInfo != null;
+            assert columnsIndex != null && columnsIndex.size() > 1;
+            this.deletionInfo = deletionInfo;
+            this.columnsIndex = columnsIndex;
+            this.bloomFilter = bloomFilter;
+        }
+
+        @Override
+        public DeletionInfo deletionInfo()
+        {
+            return deletionInfo;
+        }
+
+        @Override
+        public List<IndexHelper.IndexInfo> columnsIndex()
+        {
+            return columnsIndex;
+        }
+
+        @Override
+        public BloomFilter bloomFilter()
+        {
+            return bloomFilter;
+        }
+
+        public int serializedSize()
+        {
+            int size = DBConstants.LONG_SIZE + DBConstants.INT_SIZE; // deletion info
+            size += DBConstants.INT_SIZE; // number of entries
+            for (IndexHelper.IndexInfo info : columnsIndex)
+                size += info.serializedSize();
+            return size + (int)bloomFilter.serializedSize();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index d4c7a47..50ada51 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -29,12 +29,14 @@ import com.google.common.collect.AbstractIterator;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  *  This is a reader that finds the block for a starting column and returns
@@ -45,8 +47,10 @@ class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIte
 {
     private final ColumnFamily emptyColumnFamily;
 
+    private final SSTableReader sstable;
     private final List<IndexHelper.IndexInfo> indexes;
-    private final FileDataInput file;
+    private final FileDataInput originalInput;
+    private FileDataInput file;
     private final ByteBuffer startColumn;
     private final ByteBuffer finishColumn;
     private final boolean reversed;
@@ -55,20 +59,41 @@ class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIte
     private final Deque<IColumn> blockColumns = new ArrayDeque<IColumn>();
     private final AbstractType<?> comparator;
 
-    public IndexedSliceReader(SSTableReader sstable, FileDataInput input, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
+    public IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
     {
-        this.file = input;
+        this.sstable = sstable;
+        this.originalInput = input;
         this.startColumn = startColumn;
         this.finishColumn = finishColumn;
         this.reversed = reversed;
         this.comparator = sstable.metadata.comparator;
+
         try
         {
-            IndexHelper.skipBloomFilter(file);
-            indexes = IndexHelper.deserializeIndex(file);
-
-            emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
-            fetcher = indexes == null ? new SimpleBlockFetcher() : new IndexedBlockFetcher();
+            if (sstable.descriptor.hasPromotedIndexes)
+            {
+                this.indexes = indexEntry.columnsIndex();
+                if (indexes.isEmpty())
+                {
+                    setToRowStart(sstable, indexEntry, input);
+                    this.emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
+                    fetcher = new SimpleBlockFetcher();
+                }
+                else
+                {
+                    this.emptyColumnFamily = ColumnFamily.create(sstable.metadata);
+                    emptyColumnFamily.delete(indexEntry.deletionInfo());
+                    fetcher = new IndexedBlockFetcher(indexEntry);
+                }
+            }
+            else
+            {
+                setToRowStart(sstable, indexEntry, input);
+                IndexHelper.skipBloomFilter(file);
+                this.indexes = IndexHelper.deserializeIndex(file);
+                this.emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
+                fetcher = indexes.isEmpty() ? new SimpleBlockFetcher() : new IndexedBlockFetcher();
+            }
         }
         catch (IOException e)
         {
@@ -77,6 +102,21 @@ class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIte
         }
     }
 
+    private void setToRowStart(SSTableReader reader, RowIndexEntry indexEntry, FileDataInput input) throws IOException
+    {
+        if (input == null)
+        {
+            this.file = sstable.getFileDataInput(indexEntry.position);
+        }
+        else
+        {
+            this.file = input;
+            input.seek(indexEntry.position);
+        }
+        sstable.decodeKey(ByteBufferUtil.readWithShortLength(file));
+        SSTableReader.readRowSize(file, sstable.descriptor);
+    }
+
     public ColumnFamily getColumnFamily()
     {
         return emptyColumnFamily;
@@ -124,8 +164,10 @@ class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIte
         }
     }
 
-    public void close()
+    public void close() throws IOException
     {
+        if (originalInput == null && file != null)
+            file.close();
     }
 
     interface BlockFetcher
@@ -135,13 +177,19 @@ class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIte
 
     private class IndexedBlockFetcher implements BlockFetcher
     {
-        private final FileMark mark;
+        private final long basePosition;
         private int curRangeIndex;
 
         IndexedBlockFetcher() throws IOException
         {
             file.readInt(); // column count
-            this.mark = file.mark();
+            basePosition = file.getFilePointer();
+            curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
+        }
+
+        IndexedBlockFetcher(RowIndexEntry indexEntry)
+        {
+            basePosition = indexEntry.position;
             curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
         }
 
@@ -168,9 +216,15 @@ class IndexedSliceReader extends AbstractIterator<IColumn> implements IColumnIte
             }
 
             boolean outOfBounds = false;
-            file.reset(mark);
-            FileUtils.skipBytesFully(file, curColPosition.offset);
-            while (file.bytesPastMark(mark) < curColPosition.offset + curColPosition.width && !outOfBounds)
+            long positionToSeek = basePosition + curColPosition.offset;
+
+            // With new promoted indexes, our first seek in the data file will happen at that point.
+            if (file == null)
+                file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
+
+            file.seek(positionToSeek);
+            FileMark mark = file.mark();
+            while (file.bytesPastMark(mark) < curColPosition.width && !outOfBounds)
             {
                 IColumn column = emptyColumnFamily.getColumnSerializer().deserialize(file);
                 if (reversed)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index 7471778..4d3a148 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -31,20 +31,25 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Filter;
+import org.apache.cassandra.utils.Pair;
 
 public class SSTableNamesIterator extends SimpleAbstractColumnIterator implements IColumnIterator
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableNamesIterator.class);
 
     private ColumnFamily cf;
+    private final SSTableReader sstable;
+    private FileDataInput fileToClose;
     private Iterator<IColumn> iter;
     public final SortedSet<ByteBuffer> columns;
     public final DecoratedKey key;
@@ -52,21 +57,17 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
     public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<ByteBuffer> columns)
     {
         assert columns != null;
+        this.sstable = sstable;
         this.columns = columns;
         this.key = key;
 
-        FileDataInput file = sstable.getFileDataInput(key, DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024);
-        if (file == null)
+        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
+        if (indexEntry == null)
             return;
 
         try
         {
-            DecoratedKey keyInDisk = SSTableReader.decodeKey(sstable.partitioner,
-                                                             sstable.descriptor,
-                                                             ByteBufferUtil.readWithShortLength(file));
-            assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
-            SSTableReader.readRowSize(file, sstable.descriptor);
-            read(sstable, file);
+            read(sstable, null, indexEntry);
         }
         catch (IOException e)
         {
@@ -75,19 +76,21 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         }
         finally
         {
-            FileUtils.closeQuietly(file);
+            if (fileToClose != null)
+                FileUtils.closeQuietly(fileToClose);
         }
     }
 
-    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<ByteBuffer> columns)
+    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<ByteBuffer> columns, RowIndexEntry indexEntry)
     {
         assert columns != null;
+        this.sstable = sstable;
         this.columns = columns;
         this.key = key;
 
         try
         {
-            read(sstable, file);
+            read(sstable, file, indexEntry);
         }
         catch (IOException ioe)
         {
@@ -96,26 +99,69 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         }
     }
 
-    private void read(SSTableReader sstable, FileDataInput file)
+    private FileDataInput createFileDataInput(long position)
+    {
+        fileToClose = sstable.getFileDataInput(position);
+        return fileToClose;
+    }
+
+    private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
     throws IOException
     {
-        Filter bf = IndexHelper.defreezeBloomFilter(file, sstable.descriptor.usesOldBloomFilter);
-        List<IndexHelper.IndexInfo> indexList = IndexHelper.deserializeIndex(file);
+        Filter bf;
+        List<IndexHelper.IndexInfo> indexList;
+
+        // If the entry is not indexed or the index is not promoted, read from the row start
+        if (!indexEntry.isIndexed())
+        {
+            if (file == null)
+                file = createFileDataInput(indexEntry.position);
+            else
+                file.seek(indexEntry.position);
+
+            DecoratedKey keyInDisk = SSTableReader.decodeKey(sstable.partitioner,
+                                                             sstable.descriptor,
+                                                             ByteBufferUtil.readWithShortLength(file));
+            assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
+            SSTableReader.readRowSize(file, sstable.descriptor);
+        }
+
+        if (sstable.descriptor.hasPromotedIndexes)
+        {
+            bf = indexEntry.isIndexed() ? indexEntry.bloomFilter() : null;
+            indexList = indexEntry.columnsIndex();
+        }
+        else
+        {
+            assert file != null;
+            bf = IndexHelper.defreezeBloomFilter(file, sstable.descriptor.usesOldBloomFilter);
+            indexList = IndexHelper.deserializeIndex(file);
+        }
 
-        // we can stop early if bloom filter says none of the columns actually exist -- but,
-        // we can't stop before initializing the cf above, in case there's a relevant tombstone
-        ColumnFamilySerializer serializer = ColumnFamily.serializer();
-        try {
-            cf = serializer.deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
-        } catch (Exception e) {
-            throw new IOException
-                (serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
+        if (!indexEntry.isIndexed())
+        {
+            // we can stop early if bloom filter says none of the columns actually exist -- but,
+            // we can't stop before initializing the cf above, in case there's a relevant tombstone
+            ColumnFamilySerializer serializer = ColumnFamily.serializer();
+            try
+            {
+                cf = serializer.deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
+            }
+            catch (Exception e)
+            {
+                throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
+            }
+        }
+        else
+        {
+            cf = ColumnFamily.create(sstable.metadata);
+            cf.delete(indexEntry.deletionInfo());
         }
 
         List<ByteBuffer> filteredColumnNames = new ArrayList<ByteBuffer>(columns.size());
         for (ByteBuffer name : columns)
         {
-            if (bf.isPresent(name))
+            if (bf == null || bf.isPresent(name))
             {
                 filteredColumnNames.add(name);
             }
@@ -123,10 +169,25 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         if (filteredColumnNames.isEmpty())
             return;
 
-        if (indexList == null)
+        if (indexList.isEmpty())
+        {
             readSimpleColumns(file, columns, filteredColumnNames);
+        }
         else
-            readIndexedColumns(sstable.metadata, file, columns, filteredColumnNames, indexList);
+        {
+            long basePosition;
+            if (sstable.descriptor.hasPromotedIndexes)
+            {
+                basePosition = indexEntry.position;
+            }
+            else
+            {
+                assert file != null;
+                file.readInt(); // column count
+                basePosition = file.getFilePointer();
+            }
+            readIndexedColumns(sstable.metadata, file, columns, filteredColumnNames, indexList, basePosition);
+        }
 
         // create an iterator view of the columns we read
         iter = cf.iterator();
@@ -148,11 +209,9 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
         }
     }
 
-    private void readIndexedColumns(CFMetaData metadata, FileDataInput file, SortedSet<ByteBuffer> columnNames, List<ByteBuffer> filteredColumnNames, List<IndexHelper.IndexInfo> indexList)
+    private void readIndexedColumns(CFMetaData metadata, FileDataInput file, SortedSet<ByteBuffer> columnNames, List<ByteBuffer> filteredColumnNames, List<IndexHelper.IndexInfo> indexList, long basePosition)
     throws IOException
     {
-        file.readInt(); // column count
-
         /* get the various column ranges we have to read */
         AbstractType<?> comparator = metadata.comparator;
         SortedSet<IndexHelper.IndexInfo> ranges = new TreeSet<IndexHelper.IndexInfo>(IndexHelper.getComparator(comparator, false));
@@ -167,13 +226,21 @@ public class SSTableNamesIterator extends SimpleAbstractColumnIterator implement
             ranges.add(indexInfo);
         }
 
-        FileMark mark = file.mark();
+        if (ranges.isEmpty())
+            return;
+
         for (IndexHelper.IndexInfo indexInfo : ranges)
         {
-            file.reset(mark);
-            FileUtils.skipBytesFully(file, indexInfo.offset);
+            long positionToSeek = basePosition + indexInfo.offset;
+
+            // With new promoted indexes, our first seek in the data file will happen at that point.
+            if (file == null)
+                file = createFileDataInput(positionToSeek);
+
+            file.seek(positionToSeek);
+            FileMark mark = file.mark();
             // TODO only completely deserialize columns we are interested in
-            while (file.bytesPastMark(mark) < indexInfo.offset + indexInfo.width)
+            while (file.bytesPastMark(mark) < indexInfo.width)
             {
                 IColumn column = cf.getColumnSerializer().deserialize(file);
                 // we check vs the original Set, not the filtered List, for efficiency

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
index 5e6aff0..feace3e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableSliceIterator.java
@@ -20,47 +20,32 @@ package org.apache.cassandra.db.columniterator;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 
 /**
  *  A Column Iterator over SSTable
  */
 public class SSTableSliceIterator implements IColumnIterator
 {
-    private final FileDataInput fileToClose;
-    private IColumnIterator reader;
+    private final IColumnIterator reader;
     private final DecoratedKey key;
 
     public SSTableSliceIterator(SSTableReader sstable, DecoratedKey key, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
     {
         this.key = key;
-        fileToClose = sstable.getFileDataInput(this.key, DatabaseDescriptor.getSlicedReadBufferSizeInKB() * 1024);
-        if (fileToClose == null)
-            return;
-
-        try
-        {
-            DecoratedKey keyInDisk = SSTableReader.decodeKey(sstable.partitioner,
-                                                             sstable.descriptor,
-                                                             ByteBufferUtil.readWithShortLength(fileToClose));
-            assert keyInDisk.equals(key)
-                   : String.format("%s != %s in %s", keyInDisk, key, fileToClose.getPath());
-            SSTableReader.readRowSize(fileToClose, sstable.descriptor);
-        }
-        catch (IOException e)
-        {
-            sstable.markSuspect();
-            throw new IOError(e);
-        }
-
-        reader = createReader(sstable, fileToClose, startColumn, finishColumn, reversed);
+        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
+        this.reader = indexEntry == null ? null : createReader(sstable, indexEntry, null, startColumn, finishColumn, reversed);
     }
 
     /**
@@ -75,18 +60,17 @@ public class SSTableSliceIterator implements IColumnIterator
      * @param finishColumn The end of the slice
      * @param reversed Results are returned in reverse order iff reversed is true.
      */
-    public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
+    public SSTableSliceIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed, RowIndexEntry indexEntry)
     {
         this.key = key;
-        fileToClose = null;
-        reader = createReader(sstable, file, startColumn, finishColumn, reversed);
+        reader = createReader(sstable, indexEntry, file, startColumn, finishColumn, reversed);
     }
 
-    private static IColumnIterator createReader(SSTableReader sstable, FileDataInput file, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
+    private static IColumnIterator createReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput file, ByteBuffer startColumn, ByteBuffer finishColumn, boolean reversed)
     {
         return startColumn.remaining() == 0 && !reversed
-                 ? new SimpleSliceReader(sstable, file, finishColumn)
-                 : new IndexedSliceReader(sstable, file, startColumn, finishColumn, reversed);
+                 ? new SimpleSliceReader(sstable, indexEntry, file, finishColumn)
+                 : new IndexedSliceReader(sstable, indexEntry, file, startColumn, finishColumn, reversed);
     }
 
     public DecoratedKey getKey()
@@ -116,8 +100,8 @@ public class SSTableSliceIterator implements IColumnIterator
 
     public void close() throws IOException
     {
-        if (fileToClose != null)
-            fileToClose.close();
+        if (reader != null)
+            reader.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
index e54e9bf..de54a12 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SimpleSliceReader.java
@@ -26,15 +26,18 @@ import com.google.common.collect.AbstractIterator;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 class SimpleSliceReader extends AbstractIterator<IColumn> implements IColumnIterator
 {
     private final FileDataInput file;
+    private final boolean needsClosing;
     private final ByteBuffer finishColumn;
     private final AbstractType<?> comparator;
     private final ColumnFamily emptyColumnFamily;
@@ -42,15 +45,33 @@ class SimpleSliceReader extends AbstractIterator<IColumn> implements IColumnIter
     private int i;
     private FileMark mark;
 
-    public SimpleSliceReader(SSTableReader sstable, FileDataInput input, ByteBuffer finishColumn)
+    public SimpleSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ByteBuffer finishColumn)
     {
-        this.file = input;
         this.finishColumn = finishColumn;
-        comparator = sstable.metadata.comparator;
+        this.comparator = sstable.metadata.comparator;
         try
         {
-            IndexHelper.skipBloomFilter(file);
-            IndexHelper.skipIndex(file);
+            if (input == null)
+            {
+                this.file = sstable.getFileDataInput(indexEntry.position);
+                this.needsClosing = true;
+            }
+            else
+            {
+                this.file = input;
+                input.seek(indexEntry.position);
+                this.needsClosing = false;
+            }
+
+            // Skip key and data size
+            ByteBufferUtil.skipShortLength(file);
+            SSTableReader.readRowSize(file, sstable.descriptor);
+
+            if (!sstable.descriptor.hasPromotedIndexes)
+            {
+                IndexHelper.skipBloomFilter(file);
+                IndexHelper.skipIndex(file);
+            }
 
             emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(sstable.metadata), file);
             columns = file.readInt();
@@ -92,6 +113,8 @@ class SimpleSliceReader extends AbstractIterator<IColumn> implements IColumnIter
 
     public void close() throws IOException
     {
+        if (needsClosing)
+            file.close();
     }
 
     public DecoratedKey getKey()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 5a892c1..55a440d 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -19,10 +19,13 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 import java.security.MessageDigest;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.ColumnIndex;
 
 /**
  * a CompactedRow is an object that takes a bunch of rows (keys + columnfamilies)
@@ -65,4 +68,14 @@ public abstract class AbstractCompactedRow
      * contain default values if computing them value would require extra effort we're not willing to make.
      */
     public abstract ColumnStats columnStats();
+
+    /**
+     * @return the deletion info of the compacted row.
+     */
+    public abstract DeletionInfo deletionInfo();
+
+    /**
+     * @return the column index for this row.
+     */
+    public abstract ColumnIndex index();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index decbdfa..afb2879 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.EchoedRow;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
@@ -43,7 +42,6 @@ public class CompactionController
     private final boolean forceDeserialize;
 
     public final int gcBefore;
-    public final boolean keyExistenceIsExpensive;
     public final int mergeShardBefore;
 
     public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
@@ -58,7 +56,6 @@ public class CompactionController
         // current 'stop all write during memtable switch' situation).
         this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
         this.forceDeserialize = forceDeserialize;
-        this.keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(this.sstables);
     }
 
     public String getKeyspace()
@@ -124,15 +121,6 @@ public class CompactionController
         for (SSTableIdentityIterator row : rows)
             rowSize += row.dataSize;
 
-        // in-memory echoedrow is only enabled if we think checking for the key's existence in the other sstables,
-        // is going to be less expensive than simply de/serializing the row again
-        if (rows.size() == 1 && !needDeserialize()
-            && (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit() || !keyExistenceIsExpensive)
-            && !shouldPurge(rows.get(0).getKey()))
-        {
-            return new EchoedRow(rows.get(0));
-        }
-
         if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
         {
             String keyString = cfs.metadata.getKeyValidator().getString(rows.get(0).getKey().key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 590c3d7..13dc86f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -480,7 +480,7 @@ public class CompactionManager implements CompactionManagerMBean
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
             {
                 // throw away variable so we don't have a side effect in the assert
-                long firstRowPositionFromIndex = indexFile.readLong();
+                long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor).position;
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
@@ -515,7 +515,9 @@ public class CompactionManager implements CompactionManagerMBean
                 try
                 {
                     nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                    nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+                    nextRowPositionFromIndex = indexFile.isEOF()
+                                             ? dataFile.length()
+                                             : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor).position;
                 }
                 catch (Throwable th)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 6961490..731c7f8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -125,11 +126,11 @@ public class CompactionTask extends AbstractCompactionTask
                                       : new CompactionIterable(compactionType, toCompact, controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
-        Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
+        Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 
         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
         // replace the old entries.  Track entries to preheat here until then.
-        Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap =  new HashMap<SSTableReader, Map<DecoratedKey, Long>>();
+        Map<SSTableReader, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap =  new HashMap<SSTableReader, Map<DecoratedKey, RowIndexEntry>>();
 
         Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
         Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
@@ -158,7 +159,7 @@ public class CompactionTask extends AbstractCompactionTask
                 if (row.isEmpty())
                     continue;
 
-                long position = writer.append(row);
+                RowIndexEntry indexEntry = writer.append(row);
                 totalkeysWritten++;
 
                 if (DatabaseDescriptor.getPreheatKeyCache())
@@ -167,12 +168,12 @@ public class CompactionTask extends AbstractCompactionTask
                     {
                         if (sstable.getCachedPosition(row.key, false) != null)
                         {
-                            cachedKeys.put(row.key, position);
+                            cachedKeys.put(row.key, indexEntry);
                             break;
                         }
                     }
                 }
-                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, position))
+                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer, indexEntry.position))
                 {
                     SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                     cachedKeyMap.put(toIndex, cachedKeys);
@@ -181,7 +182,7 @@ public class CompactionTask extends AbstractCompactionTask
                     {
                         writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
                         writers.add(writer);
-                        cachedKeys = new HashMap<DecoratedKey, Long>();
+                        cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
                     }
                 }
             }
@@ -201,10 +202,10 @@ public class CompactionTask extends AbstractCompactionTask
 
         cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
         // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
-        for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
+        for (Entry<SSTableReader, Map<DecoratedKey, RowIndexEntry>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
         {
             SSTableReader key = ssTableReaderMapEntry.getKey();
-            for (Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
+            for (Entry<DecoratedKey, RowIndexEntry> entry : ssTableReaderMapEntry.getValue().entrySet())
                key.cacheKey(entry.getKey(), entry.getValue());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index eaf401a..deeef24 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -58,12 +58,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
     private final List<? extends ICountableColumnIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
-    private final DataOutputBuffer headerBuffer;
     private ColumnFamily emptyColumnFamily;
     private Reducer reducer;
     private final ColumnStats columnStats;
     private long columnSerializedSize;
     private boolean closed;
+    private final ColumnIndex columnsIndex;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
     {
@@ -82,9 +82,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
                 emptyColumnFamily.delete(cf);
         }
 
-        // initialize row header so isEmpty can be called
-        headerBuffer = new DataOutputBuffer();
-        ColumnIndexer.serialize(this, headerBuffer);
+        this.columnsIndex = new ColumnIndex.Builder(emptyColumnFamily.getComparator(), key.key, getEstimatedColumnCount()).build(this);
         // reach into the reducer used during iteration to get column count, size, max column timestamp
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns, reducer == null ? Long.MIN_VALUE : reducer.maxTimestampSeen,
@@ -101,13 +99,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
         DataOutputBuffer clockOut = new DataOutputBuffer();
         ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
 
-        long dataSize = headerBuffer.getLength() + clockOut.getLength() + columnSerializedSize;
+        long dataSize = clockOut.getLength() + columnSerializedSize;
         if (logger.isDebugEnabled())
-            logger.debug(String.format("header / clock / column sizes are %s / %s / %s",
-                         headerBuffer.getLength(), clockOut.getLength(), columnSerializedSize));
+            logger.debug(String.format("clock / column sizes are %s / %s", clockOut.getLength(), columnSerializedSize));
         assert dataSize > 0;
         out.writeLong(dataSize);
-        out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
         out.write(clockOut.getData(), 0, clockOut.getLength());
         out.writeInt(columnStats.columnCount);
 
@@ -203,6 +199,19 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements IIterabl
         closed = true;
     }
 
+    public DeletionInfo deletionInfo()
+    {
+        return emptyColumnFamily.deletionInfo();
+    }
+
+    /**
+     * @return the column index for this row.
+     */
+    public ColumnIndex index()
+    {
+        return columnsIndex;
+    }
+
     private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
     {
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 35cd33d..e383a4e 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -29,9 +29,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ColumnIndexer;
+import org.apache.cassandra.db.ColumnIndex;
 import org.apache.cassandra.db.CounterColumn;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -131,12 +132,9 @@ public class PrecompactedRow extends AbstractCompactedRow
     {
         assert compactedCf != null;
         DataOutputBuffer buffer = new DataOutputBuffer();
-        DataOutputBuffer headerBuffer = new DataOutputBuffer();
-        ColumnIndexer.serialize(compactedCf, headerBuffer);
         ColumnFamily.serializer().serializeForSSTable(compactedCf, buffer);
-        int dataSize = headerBuffer.getLength() + buffer.getLength();
-        out.writeLong(dataSize);
-        out.write(headerBuffer.getData(), 0, headerBuffer.getLength());
+        int dataSize = buffer.getLength();
+        out.writeLong(buffer.getLength());
         out.write(buffer.getData(), 0, buffer.getLength());
         return dataSize;
     }
@@ -178,4 +176,17 @@ public class PrecompactedRow extends AbstractCompactedRow
     {
         return compactedCf;
     }
+
+    public DeletionInfo deletionInfo()
+    {
+        return compactedCf.deletionInfo();
+    }
+
+    /**
+     * @return the column index for this row.
+     */
+    public ColumnIndex index()
+    {
+        return new ColumnIndex.Builder(compactedCf.getComparator(), key.key, compactedCf.getColumnCount()).build(compactedCf);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/filter/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IFilter.java b/src/java/org/apache/cassandra/db/filter/IFilter.java
index 95c46e8..7c1ffb1 100644
--- a/src/java/org/apache/cassandra/db/filter/IFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IFilter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.filter;
 
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
@@ -48,7 +49,7 @@ public interface IFilter
      * @param file Already opened file data input, saves us opening another one
      * @param key The key of the row we are about to iterate over
      */
-    public abstract IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key);
+    public abstract IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);
 
     /**
      * returns an iterator that returns columns from the given SSTable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 0332d9a..a2c995f 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.filter;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.SortedSet;
 
 import org.apache.commons.lang.StringUtils;
@@ -56,9 +57,9 @@ public class NamesQueryFilter implements IFilter
         return new SSTableNamesIterator(sstable, key, columns);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key)
+    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
-        return new SSTableNamesIterator(sstable, file, key, columns);
+        return new SSTableNamesIterator(sstable, file, key, columns, indexEntry);
     }
 
     public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index 05bef37..1c923a4 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -75,11 +75,11 @@ public class QueryFilter
         return superFilter.getSSTableColumnIterator(sstable, key);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key)
+    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
         if (path.superColumnName == null)
-            return filter.getSSTableColumnIterator(sstable, file, key);
-        return superFilter.getSSTableColumnIterator(sstable, file, key);
+            return filter.getSSTableColumnIterator(sstable, file, key, indexEntry);
+        return superFilter.getSSTableColumnIterator(sstable, file, key, indexEntry);
     }
 
     // TODO move gcBefore into a field

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 9d5d84f..d688d14 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -61,9 +61,9 @@ public class SliceQueryFilter implements IFilter
         return new SSTableSliceIterator(sstable, key, start, finish, reversed);
     }
 
-    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key)
+    public IColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
     {
-        return new SSTableSliceIterator(sstable, file, key, start, finish, reversed);
+        return new SSTableSliceIterator(sstable, file, key, start, finish, reversed, indexEntry);
     }
 
     public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 734d742..4c9ab2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -53,8 +53,9 @@ public class Descriptor
     // h (1.0): tracks max client timestamp in metadata component
     // hb (1.0.3): records compression ration in metadata component
     // hc (1.0.4): records partitioner in metadata component
-    // hd (1.2): records estimated histogram of deletion times in tombstones
-    public static final String CURRENT_VERSION = "hd";
+    // ia (1.2.0): column indexes are promoted to the index file
+    //             records estimated histogram of deletion times in tombstones
+    public static final String CURRENT_VERSION = "ia";
 
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
@@ -75,6 +76,7 @@ public class Descriptor
     public final boolean hasCompressionRatio;
     public final boolean hasPartitioner;
     public final boolean tracksTombstones;
+    public final boolean hasPromotedIndexes;
 
     /**
      * A descriptor that assumes CURRENT_VERSION.
@@ -103,7 +105,8 @@ public class Descriptor
         tracksMaxTimestamp = version.compareTo("h") >= 0;
         hasCompressionRatio = version.compareTo("hb") >= 0;
         hasPartitioner = version.compareTo("hc") >= 0;
-        tracksTombstones = version.compareTo("hd") >= 0;
+        tracksTombstones = version.compareTo("ia") >= 0;
+        hasPromotedIndexes = version.compareTo("ia") >= 0;
         isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 2262196..798dc19 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -88,11 +88,11 @@ public class IndexHelper
      * @return ArrayList<IndexInfo> - list of de-serialized indexes
      * @throws IOException if an I/O error occurs.
      */
-    public static ArrayList<IndexInfo> deserializeIndex(FileDataInput in) throws IOException
+    public static List<IndexInfo> deserializeIndex(FileDataInput in) throws IOException
     {
         int columnIndexSize = in.readInt();
         if (columnIndexSize == 0)
-            return null;
+            return Collections.<IndexInfo>emptyList();
         ArrayList<IndexInfo> indexList = new ArrayList<IndexInfo>();
         FileMark mark = in.mark();
         while (in.bytesPastMark(mark) < columnIndexSize)
@@ -206,7 +206,7 @@ public class IndexHelper
             return 2 + firstName.remaining() + 2 + lastName.remaining() + 8 + 8;
         }
 
-        public static IndexInfo deserialize(FileDataInput dis) throws IOException
+        public static IndexInfo deserialize(DataInput dis) throws IOException
         {
             return new IndexInfo(ByteBufferUtil.readWithShortLength(dis), ByteBufferUtil.readWithShortLength(dis), dis.readLong(), dis.readLong());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 7771d4b..46a454f 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -54,7 +55,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
             if (in.isEOF())
                 return endOfData();
             DecoratedKey key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, ByteBufferUtil.readWithShortLength(in));
-            in.readLong(); // skip data position
+            RowIndexEntry.serializer.skip(in, desc); // skip remainder of the entry
             return key;
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e17ac46/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 074c7fd..a18a973 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -162,6 +163,11 @@ public abstract class SSTable
         return descriptor.filenameFor(COMPONENT_DATA);
     }
 
+    public String getIndexFilename()
+    {
+        return descriptor.filenameFor(COMPONENT_INDEX);
+    }
+
     public String getColumnFamilyName()
     {
         return descriptor.cfname;
@@ -207,7 +213,7 @@ public abstract class SSTable
     }
 
     /** @return An estimate of the number of keys contained in the given index file. */
-    static long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
+    long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
     {
         // collect sizes for the first 10000 keys, or first 10 megabytes of data
         final int SAMPLES_CAP = 10000, BYTES_CAP = (int)Math.min(10000000, ifile.length());
@@ -215,7 +221,7 @@ public abstract class SSTable
         while (ifile.getFilePointer() < BYTES_CAP && keys < SAMPLES_CAP)
         {
             ByteBufferUtil.skipShortLength(ifile);
-            FileUtils.skipBytesFully(ifile, 8);
+            RowIndexEntry.serializer.skip(ifile, descriptor);
             keys++;
         }
         assert keys > 0 && ifile.getFilePointer() > 0 && ifile.length() > 0 : "Unexpected empty index file: " + ifile;


Mime
View raw message