cassandra-commits mailing list archives

From slebre...@apache.org
Subject [33/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:57 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
deleted file mode 100644
index 65e153f..0000000
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOError;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class SuperColumns
-{
-    public static Iterator<OnDiskAtom> onDiskIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore, CellNameType type)
-    {
-        return new SCIterator(in, superColumnCount, flag, expireBefore, type);
-    }
-
-    public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int version) throws IOException
-    {
-        // Note that there was no way to insert a range tombstone in a SCF in 1.2
-        cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version));
-        assert !cf.deletionInfo().rangeIterator().hasNext();
-
-        Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE, cf.getComparator());
-        while (iter.hasNext())
-            cf.addAtom(iter.next());
-    }
-
-    private static class SCIterator implements Iterator<OnDiskAtom>
-    {
-        private final DataInput in;
-        private final int scCount;
-
-        private final ColumnSerializer.Flag flag;
-        private final int expireBefore;
-
-        private final CellNameType type;
-
-        private int read;
-        private ByteBuffer scName;
-        private Iterator<Cell> subColumnsIterator;
-
-        private SCIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore, CellNameType type)
-        {
-            this.in = in;
-            this.scCount = superColumnCount;
-            this.flag = flag;
-            this.expireBefore = expireBefore;
-            this.type = type;
-        }
-
-        public boolean hasNext()
-        {
-            return (subColumnsIterator != null && subColumnsIterator.hasNext()) || read < scCount;
-        }
-
-        public OnDiskAtom next()
-        {
-            try
-            {
-                if (subColumnsIterator != null && subColumnsIterator.hasNext())
-                {
-                    Cell c = subColumnsIterator.next();
-                    return c.withUpdatedName(type.makeCellName(scName, c.name().toByteBuffer()));
-                }
-
-                // Read one more super column
-                ++read;
-
-                scName = ByteBufferUtil.readWithShortLength(in);
-                DeletionInfo delInfo = new DeletionInfo(DeletionTime.serializer.deserialize(in));
-
-                /* read the number of columns */
-                int size = in.readInt();
-                List<Cell> subCells = new ArrayList<>(size);
-
-                ColumnSerializer colSer = subType(type).columnSerializer();
-                for (int i = 0; i < size; ++i)
-                    subCells.add(colSer.deserialize(in, flag, expireBefore));
-
-                subColumnsIterator = subCells.iterator();
-
-                // If the SC was deleted, return that first, otherwise return the first subcolumn
-                DeletionTime dtime = delInfo.getTopLevelDeletion();
-                if (!dtime.equals(DeletionTime.LIVE))
-                    return new RangeTombstone(startOf(scName), endOf(scName), dtime);
-
-                return next();
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private static CellNameType subType(CellNameType type)
-    {
-        return new SimpleDenseCellNameType(type.subtype(1));
-    }
-
-    public static CellNameType scNameType(CellNameType type)
-    {
-        return new SimpleDenseCellNameType(type.subtype(0));
-    }
-
-    public static AbstractType<?> getComparatorFor(CFMetaData metadata, ByteBuffer superColumn)
-    {
-        return getComparatorFor(metadata, superColumn != null);
-    }
-
-    public static AbstractType<?> getComparatorFor(CFMetaData metadata, boolean subColumn)
-    {
-        return metadata.isSuper()
-             ? metadata.comparator.subtype(subColumn ? 1 : 0)
-             : metadata.comparator.asAbstractType();
-    }
-
-    // Extract the first component of a columnName, i.e. the super column name
-    public static ByteBuffer scName(Composite columnName)
-    {
-        return columnName.get(0);
-    }
-
-    // Extract the 2nd component of a columnName, i.e. the sub-column name
-    public static ByteBuffer subName(Composite columnName)
-    {
-        return columnName.get(1);
-    }
-
-    public static Composite startOf(ByteBuffer scName)
-    {
-        return CellNames.compositeDense(scName).start();
-    }
-
-    public static Composite endOf(ByteBuffer scName)
-    {
-        return CellNames.compositeDense(scName).end();
-    }
-
-    public static IDiskAtomFilter fromSCFilter(CellNameType type, ByteBuffer scName, IDiskAtomFilter filter)
-    {
-        if (filter instanceof NamesQueryFilter)
-            return fromSCNamesFilter(type, scName, (NamesQueryFilter)filter);
-        else
-            return fromSCSliceFilter(type, scName, (SliceQueryFilter)filter);
-    }
-
-    public static IDiskAtomFilter fromSCNamesFilter(CellNameType type, ByteBuffer scName, NamesQueryFilter filter)
-    {
-        if (scName == null)
-        {
-            ColumnSlice[] slices = new ColumnSlice[filter.columns.size()];
-            int i = 0;
-            for (CellName name : filter.columns)
-            {
-                // Note that, because the filter passed in is the one from thrift, the 'name's are SimpleDenseCellNames.
-                // So calling name.slice() would be incorrect, as simple cell names don't handle the EOC properly.
-                // This is why we call toByteBuffer() and rebuild a Composite of the right type before calling slice().
-                slices[i++] = type.make(name.toByteBuffer()).slice();
-            }
-            return new SliceQueryFilter(slices, false, slices.length, 1);
-        }
-        else
-        {
-            SortedSet<CellName> newColumns = new TreeSet<>(type);
-            for (CellName c : filter.columns)
-                newColumns.add(type.makeCellName(scName, c.toByteBuffer()));
-            return filter.withUpdatedColumns(newColumns);
-        }
-    }
-
-    public static SliceQueryFilter fromSCSliceFilter(CellNameType type, ByteBuffer scName, SliceQueryFilter filter)
-    {
-        assert filter.slices.length == 1;
-        if (scName == null)
-        {
-            // The filter is on the super column name
-            CBuilder builder = type.builder();
-            Composite start = filter.start().isEmpty()
-                            ? Composites.EMPTY
-                            : builder.buildWith(filter.start().toByteBuffer()).withEOC(filter.reversed ? Composite.EOC.END : Composite.EOC.START);
-            Composite finish = filter.finish().isEmpty()
-                             ? Composites.EMPTY
-                             : builder.buildWith(filter.finish().toByteBuffer()).withEOC(filter.reversed ? Composite.EOC.START : Composite.EOC.END);
-            return new SliceQueryFilter(start, finish, filter.reversed, filter.count, 1);
-        }
-        else
-        {
-            CBuilder builder = type.builder().add(scName);
-            Composite start = filter.start().isEmpty()
-                            ? builder.build().withEOC(filter.reversed ? Composite.EOC.END : Composite.EOC.START)
-                            : builder.buildWith(filter.start().toByteBuffer());
-            Composite end = filter.finish().isEmpty()
-                          ? builder.build().withEOC(filter.reversed ? Composite.EOC.START : Composite.EOC.END)
-                          : builder.buildWith(filter.finish().toByteBuffer());
-            return new SliceQueryFilter(start, end, filter.reversed, filter.count);
-        }
-    }
-}
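
A note on what the deleted class above was doing: SuperColumns exposed Thrift-era super column data through the composite-cell model by renaming each sub-column under a two-component composite of (super column name, sub column name), see makeCellName(scName, ...) in SCIterator.next(). A minimal, self-contained sketch of that name flattening, with plain ByteBuffers standing in for the removed CellName machinery (class and method names here are illustrative, not Cassandra API):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Illustrative only: serializes (scName, subName) the way a two-component
    // CompositeType does -- per component: a 2-byte length, the bytes, then one
    // end-of-component (EOC) byte.
    public class SuperColumnNameSketch
    {
        static ByteBuffer flatten(ByteBuffer scName, ByteBuffer subName)
        {
            ByteBuffer out = ByteBuffer.allocate(3 + scName.remaining() + 3 + subName.remaining());
            for (ByteBuffer component : new ByteBuffer[]{ scName, subName })
            {
                out.putShort((short) component.remaining());
                out.put(component.duplicate());
                out.put((byte) 0); // EOC: 0 for a full name; startOf/endOf above use -1/+1 bounds
            }
            out.flip();
            return out;
        }

        public static void main(String[] args)
        {
            ByteBuffer sc = ByteBuffer.wrap("sc1".getBytes(StandardCharsets.UTF_8));
            ByteBuffer sub = ByteBuffer.wrap("col1".getBytes(StandardCharsets.UTF_8));
            System.out.println("flattened name is " + flatten(sc, sub).remaining() + " bytes");
        }
    }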

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 4bc1522..34c617f 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -36,11 +36,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -126,8 +125,10 @@ public final class SystemKeyspace
                 + "in_progress_ballot timeuuid,"
                 + "most_recent_commit blob,"
                 + "most_recent_commit_at timeuuid,"
+                + "most_recent_commit_version int,"
                 + "proposal blob,"
                 + "proposal_ballot timeuuid,"
+                + "proposal_version int,"
                 + "PRIMARY KEY ((row_key), cf_id))")
                 .compactionStrategyClass(LeveledCompactionStrategy.class);
 
@@ -831,27 +832,22 @@ public final class SystemKeyspace
 
     public static boolean isIndexBuilt(String keyspaceName, String indexName)
     {
-        ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES);
-        QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)),
-                                                        BUILT_INDEXES,
-                                                        FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()),
-                                                        System.currentTimeMillis());
-        return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null;
+        String req = "SELECT index_name FROM %s.\"%s\" WHERE table_name=? AND index_name=?";
+        UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, indexName);
+        return !result.isEmpty();
     }
 
     public static void setIndexBuilt(String keyspaceName, String indexName)
     {
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES);
-        cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
-        new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply();
+        String req = "INSERT INTO %s.\"%s\" (table_name, index_name) VALUES (?, ?)";
+        executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, indexName);
         forceBlockingFlush(BUILT_INDEXES);
     }
 
     public static void setIndexRemoved(String keyspaceName, String indexName)
     {
-        Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName));
-        mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
-        mutation.apply();
+        String req = "DELETE FROM %s.\"%s\" WHERE table_name = ? AND index_name = ?";
+        executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, indexName);
         forceBlockingFlush(BUILT_INDEXES);
     }
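
The three rewrites above share one shape: hand-rolled cells and Mutations against the built-indexes table become internal CQL via executeInternal. A hypothetical standalone mirror of that pattern (assumes QueryProcessor.executeInternal and the system."IndexInfo" table this file uses; the helper class itself is not part of this commit):

    import org.apache.cassandra.cql3.QueryProcessor;
    import org.apache.cassandra.cql3.UntypedResultSet;

    // Hypothetical helper mirroring isIndexBuilt/setIndexBuilt above.
    final class BuiltIndexesSketch
    {
        // Quoted because the table name is mixed-case.
        private static final String TABLE = "system.\"IndexInfo\"";

        static boolean isBuilt(String keyspace, String index)
        {
            UntypedResultSet rs = QueryProcessor.executeInternal(
                    "SELECT index_name FROM " + TABLE + " WHERE table_name=? AND index_name=?",
                    keyspace, index);
            return !rs.isEmpty();
        }

        static void markBuilt(String keyspace, String index)
        {
            QueryProcessor.executeInternal(
                    "INSERT INTO " + TABLE + " (table_name, index_name) VALUES (?, ?)",
                    keyspace, index);
        }
    }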
 
@@ -884,23 +880,26 @@ public final class SystemKeyspace
         return hostId;
     }
 
-    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+
+    public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId);
+        UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
         Commit promised = row.has("in_progress_ballot")
-                        ? new Commit(key, row.getUUID("in_progress_ballot"), ArrayBackedSortedColumns.factory.create(metadata))
+                        ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.partitionColumns(), 1))
                         : Commit.emptyCommit(key, metadata);
         // either we have both a recently accepted ballot and update or we have neither
+        int proposalVersion = row.has("proposal_version") ? row.getInt("proposal_version") : MessagingService.VERSION_21;
         Commit accepted = row.has("proposal")
-                        ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal")))
+                        ? new Commit(row.getUUID("proposal_ballot"), PartitionUpdate.fromBytes(row.getBytes("proposal"), proposalVersion, key))
                         : Commit.emptyCommit(key, metadata);
         // either most_recent_commit and most_recent_commit_at will both be set, or neither
+        int mostRecentVersion = row.has("most_recent_commit_version") ? row.getInt("most_recent_commit_version") : MessagingService.VERSION_21;
         Commit mostRecent = row.has("most_recent_commit")
-                          ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
+                          ? new Commit(row.getUUID("most_recent_commit_at"), PartitionUpdate.fromBytes(row.getBytes("most_recent_commit"), mostRecentVersion, key))
                           : Commit.emptyCommit(key, metadata);
         return new PaxosState(promised, accepted, mostRecent);
     }
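
loadPaxosState above also shows the upgrade story used for every serialized blob this commit touches: the new *_version columns record which messaging version wrote the blob, and rows written before the upgrade, which lack the column, are read as 2.1-format. The fallback reduces to one helper; a sketch using the UntypedResultSet.Row accessors already seen here (the wrapper class is hypothetical):

    import org.apache.cassandra.cql3.UntypedResultSet;
    import org.apache.cassandra.net.MessagingService;

    final class BlobVersions
    {
        // Resolve the serialization version stored alongside a blob column,
        // defaulting to the pre-upgrade (2.1) format when the column is absent.
        static int of(UntypedResultSet.Row row, String versionColumn)
        {
            return row.has(versionColumn) ? row.getInt(versionColumn) : MessagingService.VERSION_21;
        }
    }

so that, for example, the proposal is rebuilt with PartitionUpdate.fromBytes(row.getBytes("proposal"), BlobVersions.of(row, "proposal_version"), key).
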
@@ -910,21 +909,22 @@ public final class SystemKeyspace
         String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
         executeInternal(String.format(req, PAXOS),
                         UUIDGen.microsTimestamp(promise.ballot),
-                        paxosTtl(promise.update.metadata),
+                        paxosTtl(promise.update.metadata()),
                         promise.ballot,
-                        promise.key,
-                        promise.update.id());
+                        promise.update.partitionKey().getKey(),
+                        promise.update.metadata().cfId);
     }
 
     public static void savePaxosProposal(Commit proposal)
     {
-        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
+        executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
                         UUIDGen.microsTimestamp(proposal.ballot),
-                        paxosTtl(proposal.update.metadata),
+                        paxosTtl(proposal.update.metadata()),
                         proposal.ballot,
-                        proposal.update.toBytes(),
-                        proposal.key,
-                        proposal.update.id());
+                        PartitionUpdate.toBytes(proposal.update, MessagingService.current_version),
+                        MessagingService.current_version,
+                        proposal.update.partitionKey().getKey(),
+                        proposal.update.metadata().cfId);
     }
 
     private static int paxosTtl(CFMetaData metadata)
@@ -937,14 +937,15 @@ public final class SystemKeyspace
     {
         // We always erase the last proposal (using the commit timestamp so as not to erase a more recent proposal in case the commit is old),
         // even though that's really just an optimization, since SP.beginAndRepairPaxos will exclude accepted proposals older than the mrc.
-        String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?";
+        String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?";
         executeInternal(String.format(cql, PAXOS),
                         UUIDGen.microsTimestamp(commit.ballot),
-                        paxosTtl(commit.update.metadata),
+                        paxosTtl(commit.update.metadata()),
                         commit.ballot,
-                        commit.update.toBytes(),
-                        commit.key,
-                        commit.update.id());
+                        PartitionUpdate.toBytes(commit.update, MessagingService.current_version),
+                        MessagingService.current_version,
+                        commit.update.partitionKey().getKey(),
+                        commit.update.metadata().cfId);
     }
 
     /**
@@ -998,24 +999,24 @@ public final class SystemKeyspace
     public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates)
     {
         long timestamp = FBUtilities.timestampMicros();
-        Mutation mutation = new Mutation(NAME, UTF8Type.instance.decompose(keyspace));
+        DecoratedKey key = decorate(UTF8Type.instance.decompose(keyspace));
+        PartitionUpdate update = new PartitionUpdate(SizeEstimates, key, SizeEstimates.partitionColumns(), estimates.size());
+        Mutation mutation = new Mutation(update);
 
         // delete all previous values with a single range tombstone.
-        mutation.deleteRange(SIZE_ESTIMATES,
-                             SizeEstimates.comparator.make(table).start(),
-                             SizeEstimates.comparator.make(table).end(),
-                             timestamp - 1);
+        int nowInSec = FBUtilities.nowInSeconds();
+        update.addRangeTombstone(Slice.make(SizeEstimates.comparator, table), new SimpleDeletionTime(timestamp - 1, nowInSec));
 
         // add a CQL row for each primary token range.
-        ColumnFamily cells = mutation.addOrGet(SizeEstimates);
         for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
         {
             Range<Token> range = entry.getKey();
             Pair<Long, Long> values = entry.getValue();
-            Composite prefix = SizeEstimates.comparator.make(table, range.left.toString(), range.right.toString());
-            CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
-            adder.add("partitions_count", values.left)
-                 .add("mean_partition_size", values.right);
+            new RowUpdateBuilder(SizeEstimates, timestamp, mutation)
+                .clustering(table, range.left.toString(), range.right.toString())
+                .add("partitions_count", values.left)
+                .add("mean_partition_size", values.right)
+                .build();
         }
 
         mutation.apply();
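
updateSizeEstimates is a compact tour of the new internal write API: a PartitionUpdate created up front, a range tombstone to clear old values, and one RowUpdateBuilder per CQL row in place of the old Composite/CFRowAdder pairing. A sketch of the per-row step in isolation (assumes a CFMetaData for size_estimates and a Mutation already targeting the keyspace's partition, exactly as in the hunk above):

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.db.RowUpdateBuilder;
    import org.apache.cassandra.utils.FBUtilities;

    final class SizeEstimateRowSketch
    {
        // Appends one size_estimates row to an existing mutation; clustering
        // order matches the table: table_name, range_start, range_end.
        static void addRow(CFMetaData sizeEstimates, Mutation mutation, String table,
                           String rangeStart, String rangeEnd, long partitions, long meanSize)
        {
            new RowUpdateBuilder(sizeEstimates, FBUtilities.timestampMicros(), mutation)
                .clustering(table, rangeStart, rangeEnd)
                .add("partitions_count", partitions)
                .add("mean_partition_size", meanSize)
                .build();
        }
    }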

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
new file mode 100644
index 0000000..a15fb61
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Helper class to deserialize Unfiltered object from disk efficiently.
+ *
+ * More precisely, this class is used by the low-level reader to ensure
+ * we don't do more work than necessary (i.e. we don't allocate/deserialize
+ * objects for things we don't care about).
+ */
+public abstract class UnfilteredDeserializer
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class);
+
+    protected final CFMetaData metadata;
+    protected final DataInput in;
+    protected final SerializationHelper helper;
+
+    protected UnfilteredDeserializer(CFMetaData metadata,
+                                     DataInput in,
+                                     SerializationHelper helper)
+    {
+        this.metadata = metadata;
+        this.in = in;
+        this.helper = helper;
+    }
+
+    public static UnfilteredDeserializer create(CFMetaData metadata,
+                                                DataInput in,
+                                                SerializationHeader header,
+                                                SerializationHelper helper,
+                                                DeletionTime partitionDeletion,
+                                                boolean readAllAsDynamic)
+    {
+        if (helper.version >= MessagingService.VERSION_30)
+            return new CurrentDeserializer(metadata, in, header, helper);
+        else
+            return new OldFormatDeserializer(metadata, in, helper, partitionDeletion, readAllAsDynamic);
+    }
+
+    /**
+     * Whether or not there are more atoms to read.
+     */
+    public abstract boolean hasNext() throws IOException;
+
+    /**
+     * Compare the provided bound to the next atom to read on disk.
+     *
+     * This will not read/deserialize the whole atom but only what is necessary for the
+     * comparison. Whenever we know what to do with this atom (read it or skip it),
+     * readNext or skipNext should be called.
+     */
+    public abstract int compareNextTo(Slice.Bound bound) throws IOException;
+
+    /**
+     * Returns whether the next atom is a row or not.
+     */
+    public abstract boolean nextIsRow() throws IOException;
+
+    /**
+     * Returns whether the next atom is the static row or not.
+     */
+    public abstract boolean nextIsStatic() throws IOException;
+
+    /**
+     * Returns the next atom.
+     */
+    public abstract Unfiltered readNext() throws IOException;
+
+    /**
+     * Clears any state in this deserializer.
+     */
+    public abstract void clearState() throws IOException;
+
+    /**
+     * Skips the next atom.
+     */
+    public abstract void skipNext() throws IOException;
+
+    private static class CurrentDeserializer extends UnfilteredDeserializer
+    {
+        private final ClusteringPrefix.Deserializer clusteringDeserializer;
+        private final SerializationHeader header;
+
+        private int nextFlags;
+        private boolean isReady;
+        private boolean isDone;
+
+        private final ReusableRow row;
+        private final RangeTombstoneMarker.Builder markerBuilder;
+
+        private CurrentDeserializer(CFMetaData metadata,
+                                    DataInput in,
+                                    SerializationHeader header,
+                                    SerializationHelper helper)
+        {
+            super(metadata, in, helper);
+            this.header = header;
+            this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
+            this.row = new ReusableRow(metadata.clusteringColumns().size(), header.columns().regulars, true, metadata.isCounter());
+            this.markerBuilder = new RangeTombstoneMarker.Builder(metadata.clusteringColumns().size());
+        }
+
+        public boolean hasNext() throws IOException
+        {
+            if (isReady)
+                return true;
+
+            prepareNext();
+            return !isDone;
+        }
+
+        private void prepareNext() throws IOException
+        {
+            if (isDone)
+                return;
+
+            nextFlags = in.readUnsignedByte();
+            if (UnfilteredSerializer.isEndOfPartition(nextFlags))
+            {
+                isDone = true;
+                isReady = false;
+                return;
+            }
+
+            clusteringDeserializer.prepare(nextFlags);
+            isReady = true;
+        }
+
+        public int compareNextTo(Slice.Bound bound) throws IOException
+        {
+            if (!isReady)
+                prepareNext();
+
+            assert !isDone;
+
+            return clusteringDeserializer.compareNextTo(bound);
+        }
+
+        public boolean nextIsRow() throws IOException
+        {
+            if (!isReady)
+                prepareNext();
+
+            return UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.ROW;
+        }
+
+        public boolean nextIsStatic() throws IOException
+        {
+            // This exists only for the sake of the OldFormatDeserializer
+            throw new UnsupportedOperationException();
+        }
+
+        public Unfiltered readNext() throws IOException
+        {
+            isReady = false;
+            if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+            {
+                markerBuilder.reset();
+                RangeTombstone.Bound.Kind kind = clusteringDeserializer.deserializeNextBound(markerBuilder);
+                UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, kind.isBoundary(), markerBuilder);
+                return markerBuilder.build();
+            }
+            else
+            {
+                Row.Writer writer = row.writer();
+                clusteringDeserializer.deserializeNextClustering(writer);
+                UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, writer);
+                return row;
+            }
+        }
+
+        public void skipNext() throws IOException
+        {
+            isReady = false;
+            ClusteringPrefix.Kind kind = clusteringDeserializer.skipNext();
+            if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+            {
+                UnfilteredSerializer.serializer.skipMarkerBody(in, header, kind.isBoundary());
+            }
+            else
+            {
+                UnfilteredSerializer.serializer.skipRowBody(in, header, helper, nextFlags);
+            }
+        }
+
+        public void clearState()
+        {
+            isReady = false;
+            isDone = false;
+        }
+    }
+
+    public static class OldFormatDeserializer extends UnfilteredDeserializer
+    {
+        private final boolean readAllAsDynamic;
+        private boolean skipStatic;
+
+        private int nextFlags;
+        private boolean isDone;
+        private boolean isStart = true;
+
+        private final LegacyLayout.CellGrouper grouper;
+        private LegacyLayout.LegacyAtom nextAtom;
+
+        private boolean staticFinished;
+        private LegacyLayout.LegacyAtom savedAtom;
+
+        private final LegacyLayout.TombstoneTracker tombstoneTracker;
+
+        private RangeTombstoneMarker closingMarker;
+
+        private OldFormatDeserializer(CFMetaData metadata,
+                                      DataInput in,
+                                      SerializationHelper helper,
+                                      DeletionTime partitionDeletion,
+                                      boolean readAllAsDynamic)
+        {
+            super(metadata, in, helper);
+            this.readAllAsDynamic = readAllAsDynamic;
+            this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
+            this.tombstoneTracker = new LegacyLayout.TombstoneTracker(metadata, partitionDeletion);
+        }
+
+        public void setSkipStatic()
+        {
+            this.skipStatic = true;
+        }
+
+        public boolean hasNext() throws IOException
+        {
+            if (nextAtom != null)
+                return true;
+
+            if (isDone)
+                return false;
+
+            return deserializeNextAtom();
+        }
+
+        private boolean deserializeNextAtom() throws IOException
+        {
+            if (staticFinished && savedAtom != null)
+            {
+                nextAtom = savedAtom;
+                savedAtom = null;
+                return true;
+            }
+
+            while (true)
+            {
+                nextAtom = LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+                if (nextAtom == null)
+                {
+                    isDone = true;
+                    return false;
+                }
+                else if (tombstoneTracker.isShadowed(nextAtom))
+                {
+                    // We don't want to return shadowed data because that would fail the contract
+                    // of UnfilteredRowIterator. However the old format could have shadowed data, so filter it here.
+                    nextAtom = null;
+                    continue;
+                }
+
+                tombstoneTracker.update(nextAtom);
+
+                // For static compact tables, the "column_metadata" columns are supposed to be static, but in the old
+                // format they are intermingled with other columns. We deal with that using 2 different strategies:
+                //  1) for thrift queries, we basically consider everything as a "dynamic" cell. This is ok because
+                //     that's basically what we end up with once ThriftResultsMerger has done its thing.
+                //  2) otherwise, we make sure to extract the "static" columns first (see AbstractSSTableIterator.readStaticRow
+                //     and SSTableSimpleIterator.readStaticRow) as a first pass. So, when we do a 2nd pass for dynamic columns
+                //     (which in practice we only do for compactions), we want to ignore those extracted static columns.
+                if (skipStatic && metadata.isStaticCompactTable() && nextAtom.isCell())
+                {
+                    LegacyLayout.LegacyCell cell = nextAtom.asCell();
+                    if (cell.name.column.isStatic())
+                    {
+                        nextAtom = null;
+                        continue;
+                    }
+                }
+
+                // We want to fetch the static row as the first thing this deserializer returns.
+                // However, in practice, it's possible to have a range tombstone before the static row cells
+                // if that tombstone has an empty start. So if we encounter one, we save it initially so we can get
+                // to the static parts first (if there are any).
+                if (isStart)
+                {
+                    isStart = false;
+                    if (!nextAtom.isCell())
+                    {
+                        LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone();
+                        if (tombstone.start.bound.size() == 0)
+                        {
+                            savedAtom = tombstone;
+                            nextAtom = LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+                            if (nextAtom == null)
+                            {
+                                // That was actually the only atom so use it after all
+                                nextAtom = savedAtom;
+                                savedAtom = null;
+                            }
+                            else if (!nextAtom.isStatic())
+                            {
+                                // We don't have anything static, so we want to return the
+                                // saved atom first; swap the two.
+                                LegacyLayout.LegacyAtom atom = nextAtom;
+                                nextAtom = savedAtom;
+                                savedAtom = atom;
+                            }
+                        }
+                    }
+                }
+
+                return true;
+            }
+        }
+
+        private void checkReady() throws IOException
+        {
+            if (nextAtom == null)
+                hasNext();
+            assert !isDone;
+        }
+
+        public int compareNextTo(Slice.Bound bound) throws IOException
+        {
+            checkReady();
+            return metadata.comparator.compare(nextAtom, bound);
+        }
+
+        public boolean nextIsRow() throws IOException
+        {
+            checkReady();
+            if (nextAtom.isCell())
+                return true;
+
+            LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone();
+            return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
+        }
+
+        public boolean nextIsStatic() throws IOException
+        {
+            checkReady();
+            return nextAtom.isStatic();
+        }
+
+        public Unfiltered readNext() throws IOException
+        {
+            if (!nextIsRow())
+            {
+                LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone();
+                // TODO: this is actually more complex, we can have repeated markers etc....
+                if (closingMarker != null)
+                    throw new UnsupportedOperationException();
+                closingMarker = new RangeTombstoneBoundMarker(tombstone.stop.bound, tombstone.deletionTime);
+                return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
+            }
+
+            LegacyLayout.CellGrouper grouper = nextAtom.isStatic()
+                                             ? LegacyLayout.CellGrouper.staticGrouper(metadata, helper)
+                                             : this.grouper;
+
+            grouper.reset();
+            grouper.addAtom(nextAtom);
+            while (deserializeNextAtom() && grouper.addAtom(nextAtom))
+            {
+            }
+
+            // Whether the row we just read was the static one or not, we're now done with the static part.
+            staticFinished = true;
+            return grouper.getRow();
+        }
+
+        public void skipNext() throws IOException
+        {
+            readNext();
+        }
+
+        public void clearState()
+        {
+            isDone = false;
+            nextAtom = null;
+        }
+    }
+}
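
The compareNextTo/skipNext split in the API above is the point of this class: a reader can position itself against a slice bound while deserializing only clustering prefixes, never whole atoms. A hedged sketch of a consumer built from just the abstract methods (construction via create() elided; the helper class is not part of this commit):

    import java.io.IOException;
    import java.util.function.Consumer;

    import org.apache.cassandra.db.Slice;
    import org.apache.cassandra.db.UnfilteredDeserializer;
    import org.apache.cassandra.db.rows.Unfiltered;

    final class DeserializerSketch
    {
        // Skip every atom sorting before 'start' (cheap: only the clustering
        // prefix is examined), then materialize and emit the remainder.
        static void readFrom(UnfilteredDeserializer in, Slice.Bound start, Consumer<Unfiltered> sink)
                throws IOException
        {
            while (in.hasNext() && in.compareNextTo(start) < 0)
                in.skipNext();
            while (in.hasNext())
                sink.accept(in.readNext());
        }
    }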

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/UnknownColumnException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java
new file mode 100644
index 0000000..55dc453
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/UnknownColumnException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Exception thrown when we read a column internally that is unknown. Note that
+ * this is an internal exception and is not meant to be user facing.
+ */
+public class UnknownColumnException extends Exception
+{
+    public final ByteBuffer columnName;
+
+    public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
+    {
+        super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName));
+        this.columnName = columnName;
+    }
+
+    private static String stringify(ByteBuffer name)
+    {
+        try
+        {
+            return UTF8Type.instance.getString(name);
+        }
+        catch (Exception e)
+        {
+            return ByteBufferUtil.bytesToHex(name);
+        }
+    }
+}
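
stringify() above is the usual defensive rendering for byte-buffer names in error messages: decode as UTF-8 when the bytes allow it, fall back to hex, and never throw while building the message. The same idea without Cassandra's type machinery, as a standalone illustration (all names hypothetical):

    import java.nio.ByteBuffer;
    import java.nio.charset.CharacterCodingException;
    import java.nio.charset.StandardCharsets;

    final class ByteBufferNames
    {
        // Render bytes as UTF-8 when they decode cleanly, otherwise as hex.
        static String describe(ByteBuffer name)
        {
            try
            {
                return StandardCharsets.UTF_8.newDecoder().decode(name.duplicate()).toString();
            }
            catch (CharacterCodingException e)
            {
                StringBuilder sb = new StringBuilder("0x");
                for (ByteBuffer b = name.duplicate(); b.hasRemaining();)
                    sb.append(String.format("%02x", b.get() & 0xff));
                return sb.toString();
            }
        }

        public static void main(String[] args)
        {
            System.out.println(describe(ByteBuffer.wrap("col".getBytes(StandardCharsets.UTF_8))));
            System.out.println(describe(ByteBuffer.wrap(new byte[]{ (byte) 0xff, (byte) 0xfe })));
        }
    }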

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
new file mode 100644
index 0000000..b406251
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.columniterator;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractSSTableIterator.class);
+
+    protected final SSTableReader sstable;
+    protected final DecoratedKey key;
+    protected final DeletionTime partitionLevelDeletion;
+    protected final ColumnFilter columns;
+    protected final SerializationHelper helper;
+
+    protected final Row staticRow;
+    protected final Reader reader;
+
+    private final boolean isForThrift;
+
+    private boolean isClosed;
+
+    @SuppressWarnings("resource") // We need this because the analysis is not able to determine that we do close
+                                  // the file on every path where we created it.
+    protected AbstractSSTableIterator(SSTableReader sstable,
+                                      FileDataInput file,
+                                      DecoratedKey key,
+                                      RowIndexEntry indexEntry,
+                                      ColumnFilter columnFilter,
+                                      boolean isForThrift)
+    {
+        this.sstable = sstable;
+        this.key = key;
+        this.columns = columnFilter;
+        this.helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
+        this.isForThrift = isForThrift;
+
+        if (indexEntry == null)
+        {
+            this.partitionLevelDeletion = DeletionTime.LIVE;
+            this.reader = null;
+            this.staticRow = Rows.EMPTY_STATIC_ROW;
+        }
+        else
+        {
+            boolean shouldCloseFile = file == null;
+            try
+            {
+                // We seek to the beginning of the partition if either:
+                //   - the partition is not indexed; we then have a single block to read anyway
+                //     and we need to read the partition deletion time.
+                //   - we're querying static columns.
+                boolean needSeekAtPartitionStart = !indexEntry.isIndexed() || !columns.fetchedColumns().statics.isEmpty();
+
+                // For CQL queries on static compact tables, we only want to consider static values (only those are exposed),
+                // but readStaticRow will have already read them and might in fact have consumed the whole partition (when reading
+                // the legacy file format), so set the reader to null so we don't try to read anything more. We can remove this
+                // once we drop support for the legacy file format
+                boolean needsReader = sstable.descriptor.version.storeRows() || isForThrift || !sstable.metadata.isStaticCompactTable();
+
+                if (needSeekAtPartitionStart)
+                {
+                    // Not indexed (or we're reading the statics), so seek to the beginning of the partition and read the partition level deletion there
+                    if (file == null)
+                        file = sstable.getFileDataInput(indexEntry.position);
+                    else
+                        file.seek(indexEntry.position);
+
+                    ByteBufferUtil.skipShortLength(file); // Skip partition key
+                    this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
+
+                    // Note that this needs to be called after 'file' has been set and the partition deletion read, but before readStaticRow
+                    // (since readStaticRow uses the reader), so we can't move it up (we'll be able to simplify once we drop support for the old file format).
+                    this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+                    this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader == null ? null : reader.deserializer);
+                }
+                else
+                {
+                    this.partitionLevelDeletion = indexEntry.deletionTime();
+                    this.staticRow = Rows.EMPTY_STATIC_ROW;
+                    this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+                }
+
+                if (reader == null && shouldCloseFile)
+                    file.close();
+            }
+            catch (IOException e)
+            {
+                sstable.markSuspect();
+                String filePath = file == null ? sstable.getFilename() : file.getPath(); // avoid NPE if opening the file itself failed
+                if (shouldCloseFile && file != null)
+                {
+                    try
+                    {
+                        file.close();
+                    }
+                    catch (IOException suppressed)
+                    {
+                        e.addSuppressed(suppressed);
+                    }
+                }
+                throw new CorruptSSTableException(e, filePath);
+            }
+        }
+    }
+
+    private static Row readStaticRow(SSTableReader sstable,
+                                     FileDataInput file,
+                                     SerializationHelper helper,
+                                     Columns statics,
+                                     boolean isForThrift,
+                                     UnfilteredDeserializer deserializer) throws IOException
+    {
+        if (!sstable.descriptor.version.storeRows())
+        {
+            if (!sstable.metadata.isCompactTable())
+            {
+                assert deserializer != null;
+                return deserializer.hasNext() && deserializer.nextIsStatic()
+                     ? (Row)deserializer.readNext()
+                     : Rows.EMPTY_STATIC_ROW;
+            }
+
+            // For compact tables, we use statics for the "column_metadata" definition. However, in the old format, those
+            // "column_metadata" are intermingled as any other "cell". In theory, this means that we'd have to do a first
+            // pass to extract the static values. However, for thrift, we'll use the ThriftResultsMerger right away which
+            // will re-merge static values with dynamic ones, so we can just ignore static and read every cell as a
+            // "dynamic" one. For CQL, if the table is a "static compact", then is has only static columns exposed and no
+            // dynamic ones. So we do a pass to extract static columns here, but will have no more work to do. Otherwise,
+            // the table won't have static columns.
+            if (statics.isEmpty() || isForThrift)
+                return Rows.EMPTY_STATIC_ROW;
+
+            assert sstable.metadata.isStaticCompactTable() && !isForThrift;
+
+            // As said above, if it's a CQL query and the table is a "static compact", the only exposed columns are the
+            // static ones. So we don't have to mark the position to seek back later.
+            return LegacyLayout.extractStaticColumns(sstable.metadata, file, statics);
+        }
+
+        if (!sstable.header.hasStatic())
+            return Rows.EMPTY_STATIC_ROW;
+
+        if (statics.isEmpty())
+        {
+            UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
+            return Rows.EMPTY_STATIC_ROW;
+        }
+        else
+        {
+            return UnfilteredSerializer.serializer.deserializeStaticRow(file, sstable.header, helper);
+        }
+    }
+
+    protected abstract Reader createReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile);
+
+    public CFMetaData metadata()
+    {
+        return sstable.metadata;
+    }
+
+    public PartitionColumns columns()
+    {
+        return columns.fetchedColumns();
+    }
+
+    public DecoratedKey partitionKey()
+    {
+        return key;
+    }
+
+    public DeletionTime partitionLevelDeletion()
+    {
+        return partitionLevelDeletion;
+    }
+
+    public Row staticRow()
+    {
+        return staticRow;
+    }
+
+    public RowStats stats()
+    {
+        // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
+        // SerializationHeader.make() for details) so we use the latter instead.
+        return new RowStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+    }
+
+    public boolean hasNext()
+    {
+        try
+        {
+            return reader != null && reader.hasNext();
+        }
+        catch (IOException e)
+        {
+            try
+            {
+                closeInternal();
+            }
+            catch (IOException suppressed)
+            {
+                e.addSuppressed(suppressed);
+            }
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, reader.file.getPath());
+        }
+    }
+
+    public Unfiltered next()
+    {
+        try
+        {
+            assert reader != null;
+            return reader.next();
+        }
+        catch (IOException e)
+        {
+            try
+            {
+                closeInternal();
+            }
+            catch (IOException suppressed)
+            {
+                e.addSuppressed(suppressed);
+            }
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, reader.file.getPath());
+        }
+    }
+
+    public Iterator<Unfiltered> slice(Slice slice)
+    {
+        try
+        {
+            if (reader == null)
+                return Collections.emptyIterator();
+
+            return reader.slice(slice);
+        }
+        catch (IOException e)
+        {
+            try
+            {
+                closeInternal();
+            }
+            catch (IOException suppressed)
+            {
+                e.addSuppressed(suppressed);
+            }
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, reader.file.getPath());
+        }
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    private void closeInternal() throws IOException
+    {
+        // It's important to make closing idempotent since it would be bad to double-close 'file', as it's a RandomAccessReader
+        // and its close is not idempotent in the case where we recycle it.
+        if (isClosed)
+            return;
+
+        if (reader != null)
+            reader.close();
+
+        isClosed = true;
+    }
+
+    public void close()
+    {
+        try
+        {
+            closeInternal();
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, reader.file.getPath());
+        }
+    }
+
+    protected abstract class Reader
+    {
+        private final boolean shouldCloseFile;
+        public FileDataInput file;
+
+        protected UnfilteredDeserializer deserializer;
+
+        // Records the currently open range tombstone (if any)
+        protected DeletionTime openMarker = null;
+
+        protected Reader(FileDataInput file, boolean shouldCloseFile)
+        {
+            this.file = file;
+            this.shouldCloseFile = shouldCloseFile;
+            if (file != null)
+                createDeserializer();
+        }
+
+        private void createDeserializer()
+        {
+            assert file != null && deserializer == null;
+            deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion, isForThrift);
+        }
+
+        protected void seekToPosition(long position) throws IOException
+        {
+            // This may be the first time we're actually looking into the file
+            if (file == null)
+            {
+                file = sstable.getFileDataInput(position);
+                createDeserializer();
+            }
+            else
+            {
+                file.seek(position);
+                deserializer.clearState();
+            }
+        }
+
+        protected void updateOpenMarker(RangeTombstoneMarker marker)
+        {
+            // Note that we always read index blocks in forward order so this method is always called in forward order
+            openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null;
+        }
+
+        protected DeletionTime getAndClearOpenMarker()
+        {
+            DeletionTime toReturn = openMarker;
+            openMarker = null;
+            return toReturn;
+        }
+
+        public abstract boolean hasNext() throws IOException;
+        public abstract Unfiltered next() throws IOException;
+        public abstract Iterator<Unfiltered> slice(Slice slice) throws IOException;
+
+        public void close() throws IOException
+        {
+            if (shouldCloseFile && file != null)
+                file.close();
+        }
+    }
+
+    protected abstract class IndexedReader extends Reader
+    {
+        protected final RowIndexEntry indexEntry;
+        protected final List<IndexHelper.IndexInfo> indexes;
+
+        protected int currentIndexIdx = -1;
+
+        // Marks the beginning of the block corresponding to currentIndexIdx.
+        protected FileMark mark;
+
+        // !isInit means we have never seeked in the file and thus shouldn't read, as we could be anywhere
+        protected boolean isInit;
+
+        protected IndexedReader(FileDataInput file, boolean shouldCloseFile, RowIndexEntry indexEntry, boolean isInit)
+        {
+            super(file, shouldCloseFile);
+            this.indexEntry = indexEntry;
+            this.indexes = indexEntry.columnsIndex();
+            this.isInit = isInit;
+        }
+
+        // Should be called when we're at the beginning of blockIdx.
+        protected void updateBlock(int blockIdx) throws IOException
+        {
+            seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
+
+            currentIndexIdx = blockIdx;
+            openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
+            mark = file.mark();
+        }
+
+        public IndexHelper.IndexInfo currentIndex()
+        {
+            return indexes.get(currentIndexIdx);
+        }
+
+        public IndexHelper.IndexInfo previousIndex()
+        {
+            return currentIndexIdx <= 1 ? null : indexes.get(currentIndexIdx - 1);
+        }
+    }
+}
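
Every public method of the iterator above follows the same failure discipline: on IOException, close the underlying file, mark the sstable suspect, and rethrow unchecked as CorruptSSTableException. Callers can therefore consume it with a plain try-with-resources and no checked I/O handling; a hedged sketch (iterator construction elided; the helper class is not part of this commit):

    import org.apache.cassandra.db.rows.Unfiltered;
    import org.apache.cassandra.db.rows.UnfilteredRowIterator;

    final class PartitionCount
    {
        // Counts the rows (as opposed to range tombstone markers) in one partition.
        // I/O failures surface as unchecked CorruptSSTableException.
        static int countRows(UnfilteredRowIterator partition)
        {
            int rows = 0;
            try (UnfilteredRowIterator it = partition)
            {
                while (it.hasNext())
                    if (it.next().kind() == Unfiltered.Kind.ROW)
                        rows++;
            }
            return rows;
        }
    }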

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java b/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java
deleted file mode 100644
index 46983e9..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-public interface IColumnIteratorFactory
-{
-    OnDiskAtomIterator create();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
deleted file mode 100644
index 7185eef..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-
-public class IdentityQueryFilter extends SliceQueryFilter
-{
-    /**
-     * Will read entire CF into memory.  Use with caution.
-     */
-    public IdentityQueryFilter()
-    {
-        super(Composites.EMPTY, Composites.EMPTY, false, Integer.MAX_VALUE);
-    }
-
-    @Override
-    protected boolean respectTombstoneThresholds()
-    {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java b/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java
deleted file mode 100644
index 9d1cecb..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.OnDiskAtom;
-
-import java.io.IOException;
-
-
-/*
- * The goal of this encapsulating OnDiskAtomIterator is to delay the use of
- * the filter until columns are actually queried.
- * The reason for that is get_paged_slice, because it changes the start of
- * the filter after having seen the first row, and so we must not use the
- * filter before the row data is actually queried. However, mergeIterator
- * needs to "fetch" a row in advance. But all it needs is the key, and so
- * this IColumnIterator makes sure getKey() can be called without triggering
- * the use of the filter itself.
- */
-public class LazyColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-{
-    private final DecoratedKey key;
-    private final IColumnIteratorFactory subIteratorFactory;
-
-    private OnDiskAtomIterator subIterator;
-
-    public LazyColumnIterator(DecoratedKey key, IColumnIteratorFactory subIteratorFactory)
-    {
-        this.key = key;
-        this.subIteratorFactory = subIteratorFactory;
-    }
-
-    private OnDiskAtomIterator getSubIterator()
-    {
-        if (subIterator == null)
-            subIterator = subIteratorFactory.create();
-        return subIterator;
-    }
-
-    protected OnDiskAtom computeNext()
-    {
-        getSubIterator();
-        return subIterator.hasNext() ? subIterator.next() : endOfData();
-    }
-
-    public ColumnFamily getColumnFamily()
-    {
-        return getSubIterator().getColumnFamily();
-    }
-
-    public DecoratedKey getKey()
-    {
-        return key;
-    }
-
-    public void close() throws IOException
-    {
-        if (subIterator != null)
-            subIterator.close();
-    }
-}
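
The pattern the deleted LazyColumnIterator implemented (defer building an
expensive iterator until data is actually requested, while keeping the key
available up front for merge ordering) can be sketched generically. The types
below are hypothetical simplifications, not the removed Cassandra interfaces.

    import java.util.Arrays;
    import java.util.Iterator;

    public class LazyIteratorSketch
    {
        // Stand-in for IColumnIteratorFactory: defers the costly creation.
        interface CostlyIteratorFactory { Iterator<String> create(); }

        static final class LazyIterator implements Iterator<String>
        {
            private final String key; // available immediately, no I/O required
            private final CostlyIteratorFactory factory;
            private Iterator<String> delegate;

            LazyIterator(String key, CostlyIteratorFactory factory)
            {
                this.key = key;
                this.factory = factory;
            }

            // Never triggers delegate creation, so merge iterators can peek
            // at the key without touching the underlying data.
            String getKey() { return key; }

            private Iterator<String> delegate()
            {
                if (delegate == null)
                    delegate = factory.create(); // deferred until data is needed
                return delegate;
            }

            public boolean hasNext() { return delegate().hasNext(); }
            public String next() { return delegate().next(); }
        }

        public static void main(String[] args)
        {
            LazyIterator it = new LazyIterator("row1", () -> {
                System.out.println("expensive creation happens now");
                return Arrays.asList("a", "b").iterator();
            });
            System.out.println(it.getKey()); // prints "row1"; nothing created yet
            System.out.println(it.next());   // triggers creation, then prints "a"
        }
    }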

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java b/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java
deleted file mode 100644
index 21c38f7..0000000
--- a/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.columniterator;
-
-import java.io.IOException;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.utils.CloseableIterator;
-
-public interface OnDiskAtomIterator extends CloseableIterator<OnDiskAtom>
-{
-    /**
-     * @return A ColumnFamily holding metadata for the row being iterated.
-     * Do not modify this CF. Whether it is empty or not is implementation-dependent.
-     */
-    public abstract ColumnFamily getColumnFamily();
-
-    /**
-     * @return the current row key
-     */
-    public DecoratedKey getKey();
-
-    /** clean up any open resources */
-    public void close() throws IOException;
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
new file mode 100644
index 0000000..4fd5205
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.columniterator;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ *  A Cell Iterator over an SSTable
+ */
+public class SSTableIterator extends AbstractSSTableIterator
+{
+    private static final Logger logger = LoggerFactory.getLogger(SSTableIterator.class);
+
+    public SSTableIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
+    {
+        this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
+    }
+
+    public SSTableIterator(SSTableReader sstable,
+                           FileDataInput file,
+                           DecoratedKey key,
+                           RowIndexEntry indexEntry,
+                           ColumnFilter columns,
+                           boolean isForThrift)
+    {
+        super(sstable, file, key, indexEntry, columns, isForThrift);
+    }
+
+    protected Reader createReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+    {
+        return indexEntry.isIndexed()
+             ? new ForwardIndexedReader(indexEntry, file, isAtPartitionStart, shouldCloseFile)
+             : new ForwardReader(file, isAtPartitionStart, shouldCloseFile);
+    }
+
+    public boolean isReverseOrder()
+    {
+        return false;
+    }
+
+    private class ForwardReader extends Reader
+    {
+        private ForwardReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+        {
+            super(file, shouldCloseFile);
+            assert isAtPartitionStart;
+        }
+
+        public boolean hasNext() throws IOException
+        {
+            assert deserializer != null;
+            return deserializer.hasNext();
+        }
+
+        public Unfiltered next() throws IOException
+        {
+            return deserializer.readNext();
+        }
+
+        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        {
+            return new AbstractIterator<Unfiltered>()
+            {
+                private boolean beforeStart = true;
+
+                protected Unfiltered computeNext()
+                {
+                    try
+                    {
+                        // While we're before the start of the slice, we can skip rows, but we should keep
+                        // track of open range tombstones.
+                        if (beforeStart)
+                        {
+                            // Note that the following comparison is not strict. The reason is that the only case
+                            // where it can be == is if the "next" is a RT start marker (either a '[' or a ')[' boundary),
+                            // and if we had a strict inequality and an open RT marker before this, we would issue
+                            // the open marker first, and then return the next marker later, which would yield in the
+                            // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong.
+                            // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
+                            // clustering value as the slice start, we'll simply record it in 'openMarker').
+                            while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
+                            {
+                                if (deserializer.nextIsRow())
+                                    deserializer.skipNext();
+                                else
+                                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+                            }
+
+                            beforeStart = false;
+
+                            // We've reached the beginning of our queried slice. If we have an open marker
+                            // we should return that first.
+                            if (openMarker != null)
+                                return new RangeTombstoneBoundMarker(slice.start(), openMarker);
+                        }
+
+                        if (deserializer.hasNext() && deserializer.compareNextTo(slice.end()) <= 0)
+                        {
+                            Unfiltered next = deserializer.readNext();
+                            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                                updateOpenMarker((RangeTombstoneMarker)next);
+                            return next;
+                        }
+
+                        // If we have an open marker, we should close it before finishing
+                        if (openMarker != null)
+                            return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
+
+                        return endOfData();
+                    }
+                    catch (IOException e)
+                    {
+                        try
+                        {
+                            close();
+                        }
+                        catch (IOException suppressed)
+                        {
+                            e.addSuppressed(suppressed);
+                        }
+                        sstable.markSuspect();
+                        throw new CorruptSSTableException(e, file.getPath());
+                    }
+                }
+            };
+        }
+    }
+
+    private class ForwardIndexedReader extends IndexedReader
+    {
+        private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+        {
+            super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
+        }
+
+        public boolean hasNext() throws IOException
+        {
+            // If this is called before we've created the file, create it. This then means
+            // we're reading from the beginning of the partition.
+            if (!isInit)
+            {
+                seekToPosition(indexEntry.position);
+                ByteBufferUtil.skipShortLength(file); // partition key
+                DeletionTime.serializer.skip(file);   // partition deletion
+                if (sstable.header.hasStatic())
+                    UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
+                isInit = true;
+            }
+            return deserializer.hasNext();
+        }
+
+        public Unfiltered next() throws IOException
+        {
+            return deserializer.readNext();
+        }
+
+        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        {
+            final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+
+            // if our previous slicing already got us past the last index block of the partition, we're done
+            if (currentIndexIdx >= indexes.size())
+                return Collections.emptyIterator();
+
+            // Find the first index block we'll need to read for the slice.
+            final int startIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, false, currentIndexIdx);
+            if (startIdx >= indexes.size())
+                return Collections.emptyIterator();
+
+            // If that's the last block we were reading, we're already where we want to be. Otherwise,
+            // seek to that first block
+            if (startIdx != currentIndexIdx)
+                updateBlock(startIdx);
+
+            // Find the last index block we'll need to read for the slice.
+            final int endIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, false, startIdx);
+
+            final IndexHelper.IndexInfo startIndex = currentIndex();
+
+            // The index search is based on the last name of the index blocks, so at that point we have that:
+            //   1) indexes[startIdx - 1].lastName < slice.start <= indexes[startIdx].lastName
+            //   2) indexes[endIdx - 1].lastName < slice.end <= indexes[endIdx].lastName
+            // so if startIdx == endIdx and slice.end < indexes[startIdx].firstName, we're guaranteed that the
+            // whole slice is between the previous block's end and this block's start, and thus has no corresponding
+            // data. One exception is if the previous block ends with an openMarker, as it will cover our slice
+            // and we need to return it.
+            if (startIdx == endIdx && metadata().comparator.compare(slice.end(), startIndex.firstName) < 0 && openMarker == null && sstable.descriptor.version.storeRows())
+                return Collections.emptyIterator();
+
+            return new AbstractIterator<Unfiltered>()
+            {
+                private boolean beforeStart = true;
+                private int currentIndexIdx = startIdx;
+
+                protected Unfiltered computeNext()
+                {
+                    try
+                    {
+                        // While we're before the start of the slice, we can skip rows, but we should keep
+                        // track of open range tombstones.
+                        if (beforeStart)
+                        {
+                            // See ForwardReader equivalent method to see why this inequality is not strict.
+                            while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
+                            {
+                                if (deserializer.nextIsRow())
+                                    deserializer.skipNext();
+                                else
+                                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+                            }
+
+                            beforeStart = false;
+
+                            // We've reached the beginning of our queried slice. If we have an open marker
+                            // we should return that first.
+                            if (openMarker != null)
+                                return new RangeTombstoneBoundMarker(slice.start(), openMarker);
+                        }
+
+                        // If we've crossed an index block boundary, update our information
+                        if (currentIndexIdx < indexes.size() && file.bytesPastMark(mark) >= currentIndex().width)
+                            updateBlock(++currentIndexIdx);
+
+                        // Return the next atom unless we've reached the end, or we're beyond our slice
+                        // end (note that unless we're on the last block for the slice, there is no point
+                        // in checking the slice end).
+                        if (currentIndexIdx < indexes.size()
+                            && currentIndexIdx <= endIdx
+                            && deserializer.hasNext()
+                            && (currentIndexIdx != endIdx || deserializer.compareNextTo(slice.end()) <= 0))
+                        {
+                            Unfiltered next = deserializer.readNext();
+                            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                                updateOpenMarker((RangeTombstoneMarker)next);
+                            return next;
+                        }
+
+                        // If we have an open marker, we should close it before finishing
+                        if (openMarker != null)
+                            return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
+
+                        return endOfData();
+                    }
+                    catch (IOException e)
+                    {
+                        try
+                        {
+                            close();
+                        }
+                        catch (IOException suppressed)
+                        {
+                            e.addSuppressed(suppressed);
+                        }
+                        sstable.markSuspect();
+                        throw new CorruptSSTableException(e, file.getPath());
+                    }
+                }
+            };
+        }
+    }
+}
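
The slice() scan above (in both ForwardReader and ForwardIndexedReader) can be
stripped down to sorted integer positions instead of clusterings: skip
everything at or before the slice start while remembering the last open range
tombstone, surface that marker at the slice start, stream until the slice end,
and close any marker still open. Atom and its fields are hypothetical
simplifications, not the Cassandra API.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ForwardSliceSketch
    {
        // Hypothetical atom: a row, or a tombstone open/close marker, at a position.
        static final class Atom
        {
            final int position;
            final boolean opens;  // opens a range tombstone
            final boolean closes; // closes one
            Atom(int position, boolean opens, boolean closes)
            {
                this.position = position; this.opens = opens; this.closes = closes;
            }
            public String toString()
            {
                return (opens ? "open@" : closes ? "close@" : "row@") + position;
            }
        }

        static List<Atom> slice(Iterator<Atom> stream, int start, int end)
        {
            List<Atom> out = new ArrayList<>();
            Atom openMarker = null;
            Atom next = stream.hasNext() ? stream.next() : null;

            // Non-strict (<=), as in the readers above: a marker at exactly
            // 'start' is folded into openMarker instead of being emitted next
            // to the synthetic marker we create at the slice start.
            while (next != null && next.position <= start)
            {
                if (next.opens)  openMarker = next;
                if (next.closes) openMarker = null;
                next = stream.hasNext() ? stream.next() : null;
            }

            if (openMarker != null)
                out.add(new Atom(start, true, false)); // re-open at the slice start

            while (next != null && next.position <= end)
            {
                out.add(next);
                if (next.opens)  openMarker = next;
                if (next.closes) openMarker = null;
                next = stream.hasNext() ? stream.next() : null;
            }

            if (openMarker != null)
                out.add(new Atom(end, false, true)); // close at the slice end

            return out;
        }

        public static void main(String[] args)
        {
            List<Atom> data = Arrays.asList(
                new Atom(1, true, false),  // tombstone opens at 1
                new Atom(2, false, false), // row at 2
                new Atom(5, false, false), // row at 5
                new Atom(9, false, true)); // tombstone closes at 9
            System.out.println(slice(data.iterator(), 3, 7)); // [open@3, row@5, close@7]
        }
    }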

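The block-advance test in the indexed slice() above, file.bytesPastMark(mark)
>= currentIndex().width, reduces to simple byte arithmetic; here it is
replayed with invented numbers (a partition at byte 10,000 split into two
contiguous index blocks).

    public class BlockAdvanceSketch
    {
        public static void main(String[] args)
        {
            long indexEntryPosition = 10_000;  // partition start in the data file
            long[] offsets = { 0, 4_500 };     // block offsets within the partition
            long[] widths  = { 4_500, 2_000 }; // block widths in bytes

            int currentBlock = 0;
            long mark = indexEntryPosition + offsets[currentBlock]; // file.mark()
            long filePosition = mark;

            // Simulate deserializing four atoms; the first three fill block 0.
            for (long atomSize : new long[] { 1_500, 1_500, 1_500, 500 })
            {
                filePosition += atomSize;
                long bytesPastMark = filePosition - mark;
                // Same test as: file.bytesPastMark(mark) >= currentIndex().width
                if (currentBlock + 1 < offsets.length && bytesPastMark >= widths[currentBlock])
                {
                    // updateBlock(++currentIndexIdx): blocks are contiguous, so
                    // seeking to the next block's start lands where we already are.
                    currentBlock++;
                    mark = indexEntryPosition + offsets[currentBlock];
                    System.out.println("advanced to block " + currentBlock + " at byte " + mark);
                }
            }
        }
    }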
