cassandra-commits mailing list archives

From slebre...@apache.org
Subject [36/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:48:00 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index dedff6f..bad096f 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -19,158 +19,500 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IReadCommand;
-import org.apache.cassandra.service.RowDataResolver;
-import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tracing.Tracing;
 
-public abstract class ReadCommand implements IReadCommand, Pageable
+/**
+ * General interface for storage-engine read commands (common to both range and
+ * single partition commands).
+ * <p>
+ * This contains all the information needed to do a local read.
+ */
+public abstract class ReadCommand implements ReadQuery
 {
-    public enum Type
+    protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
+
+    public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+
+    private final Kind kind;
+    private final CFMetaData metadata;
+    private final int nowInSec;
+
+    private final ColumnFilter columnFilter;
+    private final RowFilter rowFilter;
+    private final DataLimits limits;
+
+    private boolean isDigestQuery;
+    private final boolean isForThrift;
+
+    protected static abstract class SelectionDeserializer
     {
-        GET_BY_NAMES((byte)1),
-        GET_SLICES((byte)2);
+        public abstract ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+    }
 
-        public final byte serializedValue;
+    protected enum Kind
+    {
+        SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
+        PARTITION_RANGE  (PartitionRangeReadCommand.selectionDeserializer);
 
-        private Type(byte b)
-        {
-            this.serializedValue = b;
-        }
+        private SelectionDeserializer selectionDeserializer;
 
-        public static Type fromSerializedValue(byte b)
+        private Kind(SelectionDeserializer selectionDeserializer)
         {
-            return b == 1 ? GET_BY_NAMES : GET_SLICES;
+            this.selectionDeserializer = selectionDeserializer;
         }
     }
 
-    public static final ReadCommandSerializer serializer = new ReadCommandSerializer();
+    protected ReadCommand(Kind kind,
+                          boolean isDigestQuery,
+                          boolean isForThrift,
+                          CFMetaData metadata,
+                          int nowInSec,
+                          ColumnFilter columnFilter,
+                          RowFilter rowFilter,
+                          DataLimits limits)
+    {
+        this.kind = kind;
+        this.isDigestQuery = isDigestQuery;
+        this.isForThrift = isForThrift;
+        this.metadata = metadata;
+        this.nowInSec = nowInSec;
+        this.columnFilter = columnFilter;
+        this.rowFilter = rowFilter;
+        this.limits = limits;
+    }
 
-    public MessageOut<ReadCommand> createMessage()
+    protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
+    protected abstract long selectionSerializedSize(int version);
+
+    /**
+     * The metadata for the table queried.
+     *
+     * @return the metadata for the table queried.
+     */
+    public CFMetaData metadata()
     {
-        return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
+        return metadata;
     }
 
-    public final String ksName;
-    public final String cfName;
-    public final ByteBuffer key;
-    public final long timestamp;
-    private boolean isDigestQuery = false;
-    protected final Type commandType;
+    /**
+     * The time in seconds to use as "now" for this query.
+     * <p>
+     * We use the same time as "now" for the whole query so that a given value is not
+     * considered expired partway through the query, which would be buggy (it would throw
+     * off counting, among other things).
+     *
+     * @return the time (in seconds) to use as "now".
+     */
+    public int nowInSec()
+    {
+        return nowInSec;
+    }
+
+    /**
+     * The configured timeout for this command.
+     *
+     * @return the configured timeout for this command.
+     */
+    public abstract long getTimeout();
 
-    protected ReadCommand(String ksName, ByteBuffer key, String cfName, long timestamp, Type cmdType)
+    /**
+     * A filter on which (non-PK) columns must be returned by the query.
+     *
+     * @return which columns must be fetched by this query.
+     */
+    public ColumnFilter columnFilter()
     {
-        this.ksName = ksName;
-        this.key = key;
-        this.cfName = cfName;
-        this.timestamp = timestamp;
-        this.commandType = cmdType;
+        return columnFilter;
     }
 
-    public static ReadCommand create(String ksName, ByteBuffer key, String cfName, long timestamp, IDiskAtomFilter filter)
+    /**
+     * Filters/Restrictions on CQL rows.
+     * <p>
+     * This contains the restrictions that are not directly handled by the
+     * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
+     * restrictions, and can include some PK column restrictions when those can't be
+     * satisfied entirely by the clustering index filter (for instance, because not all
+     * clustering columns have been restricted). If there are secondary indexes on the
+     * table, one of these restrictions might be handled by such an index.
+     *
+     * @return the filter holding the expression that rows must satisfy.
+     */
+    public RowFilter rowFilter()
     {
-        if (filter instanceof SliceQueryFilter)
-            return new SliceFromReadCommand(ksName, key, cfName, timestamp, (SliceQueryFilter)filter);
-        else
-            return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, (NamesQueryFilter)filter);
+        return rowFilter;
     }
 
+    /**
+     * The limits set on this query.
+     *
+     * @return the limits set on this query.
+     */
+    public DataLimits limits()
+    {
+        return limits;
+    }
+
+    /**
+     * Whether this query is a digest one or not.
+     *
+     * @return Whether this query is a digest query.
+     */
     public boolean isDigestQuery()
     {
         return isDigestQuery;
     }
 
+    /**
+     * Sets whether this command should be a digest one or not.
+     *
+     * @param isDigestQuery whether the command should be set as a digest one or not.
+     * @return this read command.
+     */
     public ReadCommand setIsDigestQuery(boolean isDigestQuery)
     {
         this.isDigestQuery = isDigestQuery;
         return this;
     }
 
-    public String getColumnFamilyName()
+    /**
+     * Whether this query is for thrift or not.
+     *
+     * @return whether this query is for thrift.
+     */
+    public boolean isForThrift()
     {
-        return cfName;
+        return isForThrift;
     }
 
+    /**
+     * The clustering index filter this command uses for the provided key.
+     * <p>
+     * Note that this method should only be called on a key actually queried by this command.
+     * In practice, this will almost always return the same filter, but for the sake of
+     * paging, the filter on the first key of a range command might be slightly different.
+     *
+     * @param key a partition key queried by this command.
+     *
+     * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
+     */
+    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
+
+    /**
+     * Returns a copy of this command.
+     *
+     * @return a copy of this command.
+     */
     public abstract ReadCommand copy();
 
-    public abstract Row getRow(Keyspace keyspace);
+    /**
+     * Whether the provided row, identified by its primary key components, is selected by
+     * this read command.
+     *
+     * @param partitionKey the partition key for the row to test.
+     * @param clustering the clustering for the row to test.
+     *
+     * @return whether the row of partition key {@code partitionKey} and clustering
+     * {@code clustering} is selected by this command.
+     */
+    public abstract boolean selects(DecoratedKey partitionKey, Clustering clustering);
 
-    public abstract IDiskAtomFilter filter();
+    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 
-    public String getKeyspace()
+    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
     {
-        return ksName;
+        return isDigestQuery()
+             ? ReadResponse.createDigestResponse(iterator)
+             : ReadResponse.createDataResponse(iterator);
     }
 
-    // maybeGenerateRetryCommand is used to generate a retry for short reads
-    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
+    protected SecondaryIndexSearcher getIndexSearcher(ColumnFamilyStore cfs)
     {
-        return null;
+        return cfs.indexManager.getBestIndexSearcherFor(this);
     }
 
-    // maybeTrim removes columns from a response that is too long
-    public void maybeTrim(Row row)
+    /**
+     * Executes this command on the local host.
+     *
+     * @param cfs the store for the table queried by this command.
+     *
+     * @return an iterator over the result of executing this command locally.
+     */
+    @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
+                                  // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
+    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
     {
-        // noop
+        long startTimeNanos = System.nanoTime();
+
+        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
+        SecondaryIndexSearcher searcher = getIndexSearcher(cfs);
+        UnfilteredPartitionIterator resultIterator = searcher == null
+                                         ? queryStorage(cfs, orderGroup)
+                                         : searcher.search(this, orderGroup);
+
+        try
+        {
+            resultIterator = UnfilteredPartitionIterators.convertExpiredCellsToTombstones(resultIterator, nowInSec);
+            resultIterator = withMetricsRecording(withoutExpiredTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
+
+            // TODO: we should push the dropping of columns down the layers because
+            // 1) it'll be more efficient
+            // 2) it could help us solve #6276
+            // But there is no reason not to do this as a follow-up, so keeping it here for now (we'll have
+            // to be wary of cached rows if we move this down the layers)
+            if (!metadata().getDroppedColumns().isEmpty())
+                resultIterator = UnfilteredPartitionIterators.removeDroppedColumns(resultIterator, metadata().getDroppedColumns());
+
+            // If we've used a secondary index, we know the result already satisfies the primary expression used, so
+            // no point in checking it again.
+            RowFilter updatedFilter = searcher == null
+                                       ? rowFilter()
+                                       : rowFilter().without(searcher.primaryClause(this));
+
+            // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+            // we'll probably want to optimize by pushing it down the layers (like for dropped columns), as it
+            // would be more efficient (the sooner we discard stuff we know we don't care about, the less useless
+            // processing we do on it).
+            return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec());
+        }
+        catch (RuntimeException | Error e)
+        {
+            resultIterator.close();
+            throw e;
+        }
     }
 
-    public long getTimeout()
+    protected abstract void recordLatency(ColumnFamilyMetrics metric, long latencyNanos);
+
+    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
     {
-        return DatabaseDescriptor.getReadRpcTimeout();
+        return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
     }
-}
 
-class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
-{
-    public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+    public ReadOrderGroup startOrderGroup()
+    {
+        return ReadOrderGroup.forCommand(this);
+    }
+
+    /**
+     * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
+     * This also logs a warning or throws a TombstoneOverwhelmingException when appropriate.
+     */
+    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final ColumnFamilyMetrics metric, final long startTimeNanos)
     {
-        out.writeByte(command.commandType.serializedValue);
-        switch (command.commandType)
+        return new WrappingUnfilteredPartitionIterator(iter)
         {
-            case GET_BY_NAMES:
-                SliceByNamesReadCommand.serializer.serialize(command, out, version);
-                break;
-            case GET_SLICES:
-                SliceFromReadCommand.serializer.serialize(command, out, version);
-                break;
-            default:
-                throw new AssertionError();
-        }
+            private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
+            private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
+
+            private final boolean respectTombstoneThresholds = !ReadCommand.this.metadata().ksName.equals(SystemKeyspace.NAME);
+
+            private int liveRows = 0;
+            private int tombstones = 0;
+
+            private DecoratedKey currentKey;
+
+            @Override
+            public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+            {
+                currentKey = iter.partitionKey();
+
+                return new WrappingUnfilteredRowIterator(iter)
+                {
+                    public Unfiltered next()
+                    {
+                        Unfiltered unfiltered = super.next();
+                        if (unfiltered.kind() == Unfiltered.Kind.ROW)
+                        {
+                            Row row = (Row) unfiltered;
+                            if (row.hasLiveData(ReadCommand.this.nowInSec()))
+                                ++liveRows;
+                            for (Cell cell : row)
+                                if (!cell.isLive(ReadCommand.this.nowInSec()))
+                                    countTombstone(row.clustering());
+                        }
+                        else
+                        {
+                            countTombstone(unfiltered.clustering());
+                        }
+
+                        return unfiltered;
+                    }
+
+                    private void countTombstone(ClusteringPrefix clustering)
+                    {
+                        ++tombstones;
+                        if (tombstones > failureThreshold && respectTombstoneThresholds)
+                        {
+                            String query = ReadCommand.this.toCQLString();
+                            Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
+                            throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
+                        }
+                    }
+                };
+            }
+
+            @Override
+            public void close()
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    recordLatency(metric, System.nanoTime() - startTimeNanos);
+
+                    metric.tombstoneScannedHistogram.update(tombstones);
+                    metric.liveScannedHistogram.update(liveRows);
+
+                    boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
+                    if (warnTombstones)
+                    {
+                        String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
+                        ClientWarn.warn(msg);
+                        logger.warn(msg);
+                    }
+
+                    Tracing.trace("Read {} live and {} tombstone cells{}", new Object[]{ liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "") });
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates a message for this command.
+     */
+    public MessageOut<ReadCommand> createMessage()
+    {
+        // TODO: we should use different verbs for old messages (RANGE_SLICE, PAGED_RANGE)
+        return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
     }
 
-    public ReadCommand deserialize(DataInput in, int version) throws IOException
+    protected abstract void appendCQLWhereClause(StringBuilder sb);
+
+    // Skip expired tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
+    // can save us some bandwidth, and avoids making us throw a TombstoneOverwhelmingException for expired tombstones (which
+    // are to some extent an artifact of compaction lagging behind, and hence counting them is somewhat unintuitive).
+    protected UnfilteredPartitionIterator withoutExpiredTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
     {
-        ReadCommand.Type msgType = ReadCommand.Type.fromSerializedValue(in.readByte());
-        switch (msgType)
+        return new TombstonePurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()))
         {
-            case GET_BY_NAMES:
-                return SliceByNamesReadCommand.serializer.deserialize(in, version);
-            case GET_SLICES:
-                return SliceFromReadCommand.serializer.deserialize(in, version);
-            default:
-                throw new AssertionError();
-        }
+            protected long getMaxPurgeableTimestamp()
+            {
+                return Long.MAX_VALUE;
+            }
+        };
     }
 
-    public long serializedSize(ReadCommand command, int version)
+    /**
+     * Recreate the CQL string corresponding to this query.
+     * <p>
+     * Note that in general the returned string will not be exactly the original user string, first
+     * because there isn't always a single syntax for a given query, but also because we don't have
+     * all the information needed (we know the non-PK columns queried but not the PK ones, as internally
+     * we query them all). So this shouldn't be relied upon too strongly, but it should be good enough
+     * for debugging purposes, which is what this is for.
+     */
+    public String toCQLString()
     {
-        switch (command.commandType)
+        StringBuilder sb = new StringBuilder();
+        sb.append("SELECT ").append(columnFilter());
+        sb.append(" FROM ").append(metadata().ksName).append(".").append(metadata.cfName);
+        appendCQLWhereClause(sb);
+
+        if (limits() != DataLimits.NONE)
+            sb.append(" ").append(limits());
+        return sb.toString();
+    }
+
+    private static class Serializer implements IVersionedSerializer<ReadCommand>
+    {
+        private static int digestFlag(boolean isDigest)
+        {
+            return isDigest ? 0x01 : 0;
+        }
+
+        private static boolean isDigest(int flags)
         {
-            case GET_BY_NAMES:
-                return 1 + SliceByNamesReadCommand.serializer.serializedSize(command, version);
-            case GET_SLICES:
-                return 1 + SliceFromReadCommand.serializer.serializedSize(command, version);
-            default:
-                throw new AssertionError();
+            return (flags & 0x01) != 0;
+        }
+
+        private static int thriftFlag(boolean isForThrift)
+        {
+            return isForThrift ? 0x02 : 0;
+        }
+
+        private static boolean isForThrift(int flags)
+        {
+            return (flags & 0x02) != 0;
+        }
+
+        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+                throw new UnsupportedOperationException();
+
+            out.writeByte(command.kind.ordinal());
+            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
+            CFMetaData.serializer.serialize(command.metadata(), out, version);
+            out.writeInt(command.nowInSec());
+            ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
+            RowFilter.serializer.serialize(command.rowFilter(), out, version);
+            DataLimits.serializer.serialize(command.limits(), out, version);
+
+            command.serializeSelection(out, version);
+        }
+
+        public ReadCommand deserialize(DataInput in, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+                throw new UnsupportedOperationException();
+
+            Kind kind = Kind.values()[in.readByte()];
+            int flags = in.readByte();
+            boolean isDigest = isDigest(flags);
+            boolean isForThrift = isForThrift(flags);
+            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+            int nowInSec = in.readInt();
+            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
+            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
+            DataLimits limits = DataLimits.serializer.deserialize(in, version);
+
+            return kind.selectionDeserializer.deserialize(in, version, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            if (version < MessagingService.VERSION_30)
+                throw new UnsupportedOperationException();
+
+            TypeSizes sizes = TypeSizes.NATIVE;
+
+            return 2 // kind + flags
+                 + CFMetaData.serializer.serializedSize(command.metadata(), version, sizes)
+                 + sizes.sizeof(command.nowInSec())
+                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version, sizes)
+                 + RowFilter.serializer.serializedSize(command.rowFilter(), version)
+                 + DataLimits.serializer.serializedSize(command.limits(), version)
+                 + command.selectionSerializedSize(version);
         }
     }
 }
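
The new Serializer above packs the digest and Thrift markers into a single flags byte with bitwise OR and recovers them with bitwise AND. A minimal stand-alone sketch of that round-trip (the class and method names here are illustrative, not part of the patch):

    public class FlagsRoundTrip
    {
        private static final int DIGEST_FLAG = 0x01;
        private static final int THRIFT_FLAG = 0x02;

        // Mirrors digestFlag(...) | thriftFlag(...) in ReadCommand.Serializer.serialize().
        static int encode(boolean isDigest, boolean isForThrift)
        {
            return (isDigest ? DIGEST_FLAG : 0) | (isForThrift ? THRIFT_FLAG : 0);
        }

        public static void main(String[] args)
        {
            int flags = encode(true, false);
            System.out.println((flags & DIGEST_FLAG) != 0);  // true: digest bit is set
            System.out.println((flags & THRIFT_FLAG) != 0);  // false: thrift bit is clear
        }
    }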

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
new file mode 100644
index 0000000..f85d406
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.Tracing;
+
+public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
+{
+    public void doVerb(MessageIn<ReadCommand> message, int id)
+    {
+        if (StorageService.instance.isBootstrapMode())
+        {
+            throw new RuntimeException("Cannot service reads while bootstrapping!");
+        }
+
+        ReadCommand command = message.payload;
+        ReadResponse response;
+        try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup))
+        {
+            response = command.createResponse(iterator);
+        }
+
+        MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, ReadResponse.serializer);
+
+        Tracing.trace("Enqueuing response to {}", message.from);
+        MessagingService.instance().sendReply(reply, id, message.from);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
new file mode 100644
index 0000000..0a5bee8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class ReadOrderGroup implements AutoCloseable
+{
+    // For all reads
+    private final OpOrder.Group baseOp;
+
+    // For index reads
+    private final OpOrder.Group indexOp;
+    private final OpOrder.Group writeOp;
+
+    private ReadOrderGroup(OpOrder.Group baseOp, OpOrder.Group indexOp, OpOrder.Group writeOp)
+    {
+        this.baseOp = baseOp;
+        this.indexOp = indexOp;
+        this.writeOp = writeOp;
+    }
+
+    public OpOrder.Group baseReadOpOrderGroup()
+    {
+        return baseOp;
+    }
+
+    public OpOrder.Group indexReadOpOrderGroup()
+    {
+        return indexOp;
+    }
+
+    public OpOrder.Group writeOpOrderGroup()
+    {
+        return writeOp;
+    }
+
+    public static ReadOrderGroup emptyGroup()
+    {
+        return new ReadOrderGroup(null, null, null);
+    }
+
+    public static ReadOrderGroup forCommand(ReadCommand command)
+    {
+        ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(command.metadata());
+        ColumnFamilyStore indexCfs = maybeGetIndexCfs(baseCfs, command);
+
+        if (indexCfs == null)
+        {
+            return new ReadOrderGroup(baseCfs.readOrdering.start(), null, null);
+        }
+        else
+        {
+            OpOrder.Group baseOp = null, indexOp = null, writeOp;
+            // OpOrder.start() shouldn't fail, but better safe than sorry.
+            try
+            {
+                baseOp = baseCfs.readOrdering.start();
+                indexOp = indexCfs.readOrdering.start();
+                // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
+                // as it stands, we open a writeOp and keep it open for the duration to ensure that, should this CF get flushed to make room, we don't block the reclamation of any room being made
+                writeOp = baseCfs.keyspace.writeOrder.start();
+                return new ReadOrderGroup(baseOp, indexOp, writeOp);
+            }
+            catch (RuntimeException e)
+            {
+                // Note that we must have writeOp == null here, since the ReadOrderGroup ctor can't fail
+                try
+                {
+                    if (baseOp != null)
+                        baseOp.close();
+                }
+                finally
+                {
+                    if (indexOp != null)
+                        indexOp.close();
+                }
+                throw e;
+            }
+        }
+    }
+
+    private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
+    {
+        SecondaryIndexSearcher searcher = command.getIndexSearcher(baseCfs);
+        if (searcher == null)
+            return null;
+
+        SecondaryIndex index = searcher.highestSelectivityIndex(command.rowFilter());
+        return index == null || !(index instanceof AbstractSimplePerColumnSecondaryIndex)
+             ? null
+             : ((AbstractSimplePerColumnSecondaryIndex)index).getIndexCfs();
+    }
+
+    public void close()
+    {
+        try
+        {
+            if (baseOp != null)
+                baseOp.close();
+        }
+        finally
+        {
+            if (indexOp != null)
+            {
+                try
+                {
+                    indexOp.close();
+                }
+                finally
+                {
+                    writeOp.close();
+                }
+            }
+        }
+    }
+}
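
The nested try/finally in close() above is the usual idiom for releasing several resources when any individual close() may throw: each finally block guarantees the remaining groups are still closed. The same idiom in isolation (hypothetical resource names, not from the patch):

    public class NestedCloseSketch
    {
        // Closes a, then b, then c; an exception from an earlier close() does not skip the later ones.
        static void closeAll(AutoCloseable a, AutoCloseable b, AutoCloseable c) throws Exception
        {
            try
            {
                if (a != null)
                    a.close();
            }
            finally
            {
                try
                {
                    if (b != null)
                        b.close();
                }
                finally
                {
                    if (c != null)
                        c.close();
                }
            }
        }
    }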

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
new file mode 100644
index 0000000..3ad0f82
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.PagingState;
+
+/**
+ * Generic abstraction for read queries.
+ * <p>
+ * The main implementation of this is {@link ReadCommand}, but we have this interface because
+ * {@link SinglePartitionReadCommand.Group} is also considered a "read query" but is not a
+ * {@code ReadCommand}.
+ */
+public interface ReadQuery
+{
+    public static final ReadQuery EMPTY = new ReadQuery()
+    {
+        public ReadOrderGroup startOrderGroup()
+        {
+            return ReadOrderGroup.emptyGroup();
+        }
+
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+        {
+            return PartitionIterators.EMPTY;
+        }
+
+        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+        {
+            return PartitionIterators.EMPTY;
+        }
+
+        public DataLimits limits()
+        {
+            // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
+            // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
+            // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
+            return DataLimits.cqlLimits(0);
+        }
+
+        public QueryPager getPager(PagingState state)
+        {
+            return QueryPager.EMPTY;
+        }
+
+        public QueryPager getLocalPager()
+        {
+            return QueryPager.EMPTY;
+        }
+    };
+
+    /**
+     * Starts a new read operation.
+     * <p>
+     * This must be called before {@link executeInternal} and passed to it to protect the read.
+     * The returned object <b>must</b> be closed on all paths, and it is thus strongly advised to
+     * use it in a try-with-resources construction.
+     *
+     * @return a newly started order group for this {@code ReadQuery}.
+     */
+    public ReadOrderGroup startOrderGroup();
+
+    /**
+     * Executes the query at the provided consistency level.
+     *
+     * @param consistency the consistency level to achieve for the query.
+     * @param clientState the {@code ClientState} for the query. In practice, this can be null unless
+     * {@code consistency} is a serial consistency.
+     *
+     * @return the result of the query.
+     */
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException;
+
+    /**
+     * Executes the query internally (that is, it basically executes the query locally).
+     *
+     * @param orderGroup the {@code ReadOrderGroup} protecting the read.
+     * @return the result of the query.
+     */
+    public PartitionIterator executeInternal(ReadOrderGroup orderGroup);
+
+    /**
+     * Returns a pager for the query.
+     *
+     * @param pagingState the {@code PagingState} to start from if this is a paging continuation. This can be
+     * {@code null} if this is the start of paging.
+     *
+     * @return a pager for the query.
+     */
+    public QueryPager getPager(PagingState pagingState);
+
+    /**
+     * The limits for the query.
+     *
+     * @return The limits for the query.
+     */
+    public DataLimits limits();
+}
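
Tying startOrderGroup() and executeInternal() together, the usage the javadoc above advises looks roughly like this (a sketch of a hypothetical caller; process(Row) is a placeholder, and the iterator types are those introduced by this patch):

    try (ReadOrderGroup orderGroup = query.startOrderGroup();
         PartitionIterator result = query.executeInternal(orderGroup))
    {
        // Both resources are closed on every path, including exceptions.
        while (result.hasNext())
        {
            try (RowIterator partition = result.next())
            {
                while (partition.hasNext())
                    process(partition.next());
            }
        }
    }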

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 39022a4..6453077 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -19,96 +19,219 @@ package org.apache.cassandra.db;
 
 import java.io.*;
 import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
-/*
- * The read response message is sent by the server when reading data
- * this encapsulates the keyspacename and the row that has been read.
- * The keyspace name is needed so that we can use it to create repairs.
- */
-public class ReadResponse
+public abstract class ReadResponse
 {
-    public static final IVersionedSerializer<ReadResponse> serializer = new ReadResponseSerializer();
+    public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
+    public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
 
-    private final Row row;
-    private final ByteBuffer digest;
-
-    public ReadResponse(ByteBuffer digest)
+    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data)
     {
-        assert digest != null;
-        this.digest= digest;
-        this.row = null;
+        return new DataResponse(data);
     }
 
-    public ReadResponse(Row row)
+    public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data)
     {
-        assert row != null;
-        this.row = row;
-        this.digest = null;
+        return new DigestResponse(makeDigest(data));
     }
 
-    public Row row()
-    {
-        return row;
-    }
+    public abstract UnfilteredPartitionIterator makeIterator();
+    public abstract ByteBuffer digest();
+    public abstract boolean isDigestQuery();
 
-    public ByteBuffer digest()
+    protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator)
     {
-        return digest;
+        MessageDigest digest = FBUtilities.threadLocalMD5Digest();
+        UnfilteredPartitionIterators.digest(iterator, digest);
+        return ByteBuffer.wrap(digest.digest());
     }
 
-    public boolean isDigestQuery()
+    private static class DigestResponse extends ReadResponse
     {
-        return digest != null;
+        private final ByteBuffer digest;
+
+        private DigestResponse(ByteBuffer digest)
+        {
+            assert digest.hasRemaining();
+            this.digest = digest;
+        }
+
+        public UnfilteredPartitionIterator makeIterator()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer digest()
+        {
+            return digest;
+        }
+
+        public boolean isDigestQuery()
+        {
+            return true;
+        }
     }
-}
 
-class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
-{
-    public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
+    private static class DataResponse extends ReadResponse
     {
-        out.writeInt(response.isDigestQuery() ? response.digest().remaining() : 0);
-        ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        out.write(buffer);
-        out.writeBoolean(response.isDigestQuery());
-        if (!response.isDigestQuery())
-            Row.serializer.serialize(response.row(), out, version);
+        // The response, serialized in the current messaging version
+        private final ByteBuffer data;
+        private final SerializationHelper.Flag flag;
+
+        private DataResponse(ByteBuffer data)
+        {
+            this.data = data;
+            this.flag = SerializationHelper.Flag.FROM_REMOTE;
+        }
+
+        private DataResponse(UnfilteredPartitionIterator iter)
+        {
+            try (DataOutputBuffer buffer = new DataOutputBuffer())
+            {
+                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, buffer, MessagingService.current_version);
+                this.data = buffer.buffer();
+                this.flag = SerializationHelper.Flag.LOCAL;
+            }
+            catch (IOException e)
+            {
+                // We're serializing in memory so this shouldn't happen
+                throw new RuntimeException(e);
+            }
+        }
+
+        public UnfilteredPartitionIterator makeIterator()
+        {
+            try
+            {
+                DataInput in = new DataInputStream(ByteBufferUtil.inputStream(data));
+                return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, flag);
+            }
+            catch (IOException e)
+            {
+                // We're deserializing in memory so this shouldn't happen
+                throw new RuntimeException(e);
+            }
+        }
+
+        public ByteBuffer digest()
+        {
+            try (UnfilteredPartitionIterator iterator = makeIterator())
+            {
+                return makeDigest(iterator);
+            }
+        }
+
+        public boolean isDigestQuery()
+        {
+            return false;
+        }
     }
 
-    public ReadResponse deserialize(DataInput in, int version) throws IOException
+    private static class Serializer implements IVersionedSerializer<ReadResponse>
     {
-        byte[] digest = null;
-        int digestSize = in.readInt();
-        if (digestSize > 0)
+        public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
         {
-            digest = new byte[digestSize];
-            in.readFully(digest, 0, digestSize);
+            if (version < MessagingService.VERSION_30)
+            {
+                // TODO
+                throw new UnsupportedOperationException();
+            }
+
+            boolean isDigest = response.isDigestQuery();
+            ByteBufferUtil.writeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
+            if (!isDigest)
+            {
+                // Note that we can only get here if version == 3.0, which is the current_version. When we change
+                // the version, we'll have to deserialize/re-serialize the data into the proper version.
+                assert version == MessagingService.VERSION_30;
+                ByteBuffer data = ((DataResponse)response).data;
+                ByteBufferUtil.writeWithLength(data, out);
+            }
         }
-        boolean isDigest = in.readBoolean();
-        assert isDigest == digestSize > 0;
 
-        Row row = null;
-        if (!isDigest)
+        public ReadResponse deserialize(DataInput in, int version) throws IOException
         {
-            // This is coming from a remote host
-            row = Row.serializer.deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
+            if (version < MessagingService.VERSION_30)
+            {
+                // TODO
+                throw new UnsupportedOperationException();
+            }
+
+            ByteBuffer digest = ByteBufferUtil.readWithShortLength(in);
+            if (digest.hasRemaining())
+                return new DigestResponse(digest);
+
+            assert version == MessagingService.VERSION_30;
+            ByteBuffer data = ByteBufferUtil.readWithLength(in);
+            return new DataResponse(data);
         }
 
-        return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
+        public long serializedSize(ReadResponse response, int version)
+        {
+            if (version < MessagingService.VERSION_30)
+            {
+                // TODO
+                throw new UnsupportedOperationException();
+            }
+
+            TypeSizes sizes = TypeSizes.NATIVE;
+            boolean isDigest = response.isDigestQuery();
+            long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER, sizes);
+
+            if (!isDigest)
+            {
+                // Note that we can only get here if version == 3.0, which is the current_version. When we change
+                // the version, we'll have to deserialize/re-serialize the data into the proper version.
+                assert version == MessagingService.VERSION_30;
+                ByteBuffer data = ((DataResponse)response).data;
+                size += ByteBufferUtil.serializedSizeWithLength(data, sizes);
+            }
+            return size;
+        }
     }
 
-    public long serializedSize(ReadResponse response, int version)
+    private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse>
     {
-        TypeSizes typeSizes = TypeSizes.NATIVE;
-        ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        int size = typeSizes.sizeof(buffer.remaining());
-        size += buffer.remaining();
-        size += typeSizes.sizeof(response.isDigestQuery());
-        if (!response.isDigestQuery())
-            size += Row.serializer.serializedSize(response.row(), version);
-        return size;
+        public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
+        {
+            // TODO
+            throw new UnsupportedOperationException();
+            //        out.writeInt(rsr.rows.size());
+            //        for (Row row : rsr.rows)
+            //            Row.serializer.serialize(row, out, version);
+        }
+
+        public ReadResponse deserialize(DataInput in, int version) throws IOException
+        {
+            // TODO
+            throw new UnsupportedOperationException();
+            //        int rowCount = in.readInt();
+            //        List<Row> rows = new ArrayList<Row>(rowCount);
+            //        for (int i = 0; i < rowCount; i++)
+            //            rows.add(Row.serializer.deserialize(in, version));
+            //        return new RangeSliceReply(rows);
+        }
+
+        public long serializedSize(ReadResponse response, int version)
+        {
+            // TODO
+            throw new UnsupportedOperationException();
+            //        int size = TypeSizes.NATIVE.sizeof(rsr.rows.size());
+            //        for (Row row : rsr.rows)
+            //            size += Row.serializer.serializedSize(row, version);
+            //        return size;
+        }
     }
 }
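
Digest responses carry only an MD5 hash of the partition stream: makeDigest() folds the iterator into a MessageDigest and wraps the result. The JDK half of that, in isolation (a sketch; in the patch the bytes hashed come from UnfilteredPartitionIterators.digest, not a raw array):

    import java.nio.ByteBuffer;
    import java.security.MessageDigest;

    public class DigestSketch
    {
        static ByteBuffer md5Of(byte[] contents) throws Exception
        {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            digest.update(contents);                  // the patch hashes serialized partition data here
            return ByteBuffer.wrap(digest.digest());  // same wrapping as makeDigest()
        }

        public static void main(String[] args) throws Exception
        {
            ByteBuffer d = md5Of("partition bytes".getBytes("UTF-8"));
            System.out.println("digest length = " + d.remaining());  // 16 bytes for MD5
        }
    }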

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
deleted file mode 100644
index 8c167ed..0000000
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tracing.Tracing;
-
-public class ReadVerbHandler implements IVerbHandler<ReadCommand>
-{
-    public void doVerb(MessageIn<ReadCommand> message, int id)
-    {
-        if (StorageService.instance.isBootstrapMode())
-        {
-            throw new RuntimeException("Cannot service reads while bootstrapping!");
-        }
-
-        ReadCommand command = message.payload;
-        Keyspace keyspace = Keyspace.open(command.ksName);
-        Row row = command.getRow(keyspace);
-
-        MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE,
-                                                                      getResponse(command, row),
-                                                                      ReadResponse.serializer);
-        Tracing.trace("Enqueuing response to {}", message.from);
-        MessagingService.instance().sendReply(reply, id, message.from);
-    }
-
-    public static ReadResponse getResponse(ReadCommand command, Row row)
-    {
-        if (command.isDigestQuery())
-        {
-            return new ReadResponse(ColumnFamily.digest(row.cf));
-        }
-        else
-        {
-            return new ReadResponse(row);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
deleted file mode 100644
index 41f5a50..0000000
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-
-public class RetriedSliceFromReadCommand extends SliceFromReadCommand
-{
-    static final Logger logger = LoggerFactory.getLogger(RetriedSliceFromReadCommand.class);
-    public final int originalCount;
-
-    public RetriedSliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter, int originalCount)
-    {
-        super(keyspaceName, key, cfName, timestamp, filter);
-        this.originalCount = originalCount;
-    }
-
-    @Override
-    public ReadCommand copy()
-    {
-        return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount).setIsDigestQuery(isDigestQuery());
-    }
-
-    @Override
-    public int getOriginalRequestedCount()
-    {
-        return originalCount;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "RetriedSliceFromReadCommand(" + "cmd=" + super.toString() + ", originalCount=" + originalCount + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReusableClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableClustering.java b/src/java/org/apache/cassandra/db/ReusableClustering.java
new file mode 100644
index 0000000..e2760aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ReusableClustering.java
@@ -0,0 +1,82 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class ReusableClustering extends Clustering
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new ReusableClustering(0));
+
+    protected final ByteBuffer[] values;
+
+    protected ReusableWriter writer;
+
+    public ReusableClustering(int size)
+    {
+        this.values = new ByteBuffer[size];
+    }
+
+    public int size()
+    {
+        return values.length;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        return values[i];
+    }
+
+    public ByteBuffer[] getRawValues()
+    {
+        return values;
+    }
+
+    public Writer writer()
+    {
+        if (writer == null)
+            writer = new ReusableWriter();
+        return writer;
+    }
+
+    public void reset()
+    {
+        Arrays.fill(values, null);
+        if (writer != null)
+            writer.reset();
+    }
+
+    protected class ReusableWriter implements Writer
+    {
+        int idx;
+
+        public void writeClusteringValue(ByteBuffer value)
+        {
+            values[idx++] = value;
+        }
+
+        private void reset()
+        {
+            idx = 0;
+        }
+    }
+}
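
ReusableClustering is mutable scratch space: callers fill the values through the writer, use the clustering, then reset() before the next row so no new arrays are allocated. The reuse pattern in self-contained form (an illustrative analogue, not the patch's class):

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class ReuseSketch
    {
        private final ByteBuffer[] values;
        private int idx;

        ReuseSketch(int size) { values = new ByteBuffer[size]; }

        void write(ByteBuffer v) { values[idx++] = v; }        // like ReusableWriter.writeClusteringValue
        void reset() { Arrays.fill(values, null); idx = 0; }   // like ReusableClustering.reset

        public static void main(String[] args)
        {
            ReuseSketch c = new ReuseSketch(2);
            c.write(ByteBuffer.wrap(new byte[]{ 1 }));
            c.write(ByteBuffer.wrap(new byte[]{ 2 }));
            System.out.println(Arrays.toString(c.values));
            c.reset();  // ready for the next row, no reallocation
        }
    }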

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java b/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
new file mode 100644
index 0000000..d2f19f7
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+// Note that we abuse ReusableClustering a bit here to store Slice.Bound info, but it's convenient, so we live with it.
+public class ReusableClusteringPrefix extends ReusableClustering
+{
+    private Kind kind;
+    private int size;
+
+    public ReusableClusteringPrefix(int size)
+    {
+        super(size);
+    }
+
+    public ClusteringPrefix get()
+    {
+        // We use ReusableClusteringPrefix when writing sstables (in ColumnIndex), and we
+        // never write the static clustering there.
+        assert kind != Kind.STATIC_CLUSTERING;
+        if (kind == Kind.CLUSTERING)
+        {
+            assert values.length == size;
+            return this;
+        }
+
+        return Slice.Bound.create(kind, Arrays.copyOfRange(values, 0, size));
+    }
+
+    public void copy(ClusteringPrefix clustering)
+    {
+        kind = clustering.kind();
+        for (int i = 0; i < clustering.size(); i++)
+            values[i] = clustering.get(i);
+        size = clustering.size();
+    }
+}
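
The intended call pattern, sketched with a hypothetical prefix source and consumer: copy() captures the kind, values and size of the current prefix, while get() either hands back the shared instance itself (for a full clustering) or materializes a fresh Slice.Bound, so only the bound case is safe to retain across iterations:

    ReusableClusteringPrefix reusable = new ReusableClusteringPrefix(maxClusteringSize); // size is hypothetical
    for (ClusteringPrefix incoming : prefixes)      // hypothetical source of prefixes
    {
        reusable.copy(incoming);                    // capture kind, values and size
        ClusteringPrefix current = reusable.get();  // shared instance, or a fresh Slice.Bound
        index(current);                             // hypothetical consumer; in the CLUSTERING case
                                                    // it must not keep 'current' past the next copy()
    }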

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java b/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
new file mode 100644
index 0000000..43530b0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public class ReusableLivenessInfo extends AbstractLivenessInfo
+{
+    private long timestamp;
+    private int ttl;
+    private int localDeletionTime;
+
+    public ReusableLivenessInfo()
+    {
+        reset();
+    }
+
+    public LivenessInfo setTo(LivenessInfo info)
+    {
+        return setTo(info.timestamp(), info.ttl(), info.localDeletionTime());
+    }
+
+    public LivenessInfo setTo(long timestamp, int ttl, int localDeletionTime)
+    {
+        this.timestamp = timestamp;
+        this.ttl = ttl;
+        this.localDeletionTime = localDeletionTime;
+        return this;
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+
+    public int ttl()
+    {
+        return ttl;
+    }
+
+    public int localDeletionTime()
+    {
+        return localDeletionTime;
+    }
+
+    public void reset()
+    {
+        this.timestamp = LivenessInfo.NO_TIMESTAMP;
+        this.ttl = LivenessInfo.NO_TTL;
+        this.localDeletionTime = LivenessInfo.NO_DELETION_TIME;
+    }
+}
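
Usage follows the same recycle pattern as the other reusable holders; a short sketch, where the literal field values and otherInfo are hypothetical:

    ReusableLivenessInfo liveness = new ReusableLivenessInfo();
    liveness.setTo(1_000_000L, 3600, 1435650480);  // explicit timestamp, ttl, localDeletionTime
    liveness.setTo(otherInfo);                     // or copy the fields of an existing LivenessInfo
    liveness.reset();                              // back to NO_TIMESTAMP / NO_TTL / NO_DELETION_TIME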

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java
deleted file mode 100644
index a826894..0000000
--- a/src/java/org/apache/cassandra/db/Row.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class Row
-{
-    public static final RowSerializer serializer = new RowSerializer();
-
-    public final DecoratedKey key;
-    public final ColumnFamily cf;
-
-    public Row(DecoratedKey key, ColumnFamily cf)
-    {
-        assert key != null;
-        // cf may be null, indicating no data
-        this.key = key;
-        this.cf = cf;
-    }
-
-    public Row(ByteBuffer key, ColumnFamily updates)
-    {
-        this(StorageService.getPartitioner().decorateKey(key), updates);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "Row(" +
-               "key=" + key +
-               ", cf=" + cf +
-               ')';
-    }
-
-    public int getLiveCount(IDiskAtomFilter filter, long now)
-    {
-        return cf == null ? 0 : filter.getLiveCount(cf, now);
-    }
-
-    public static class RowSerializer implements IVersionedSerializer<Row>
-    {
-        public void serialize(Row row, DataOutputPlus out, int version) throws IOException
-        {
-            ByteBufferUtil.writeWithShortLength(row.key.getKey(), out);
-            ColumnFamily.serializer.serialize(row.cf, out, version);
-        }
-
-        public Row deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
-        {
-            return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)),
-                           ColumnFamily.serializer.deserialize(in, flag, version));
-        }
-
-        public Row deserialize(DataInput in, int version) throws IOException
-        {
-            return deserialize(in, version, ColumnSerializer.Flag.LOCAL);
-        }
-
-        public long serializedSize(Row row, int version)
-        {
-            int keySize = row.key.getKey().remaining();
-            return TypeSizes.NATIVE.sizeof((short) keySize) + keySize + ColumnFamily.serializer.serializedSize(row.cf, TypeSizes.NATIVE, version);
-        }
-    }
-}
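
The removed RowSerializer frames the partition key with a 16-bit length prefix via ByteBufferUtil.writeWithShortLength before serializing the column family. For reference, a standalone approximation of that framing in plain java.io (a sketch, not the Cassandra utility itself):

    import java.io.*;
    import java.nio.ByteBuffer;

    public class ShortLengthFramingSketch
    {
        // Unsigned 16-bit length prefix, then the key bytes.
        static void writeWithShortLength(ByteBuffer key, DataOutput out) throws IOException
        {
            int length = key.remaining();
            assert length <= 0xFFFF : "key must fit in an unsigned short length";
            out.writeShort(length);
            byte[] bytes = new byte[length];
            key.duplicate().get(bytes);  // duplicate() leaves the caller's position untouched
            out.write(bytes);
        }

        static ByteBuffer readWithShortLength(DataInput in) throws IOException
        {
            byte[] bytes = new byte[in.readUnsignedShort()];
            in.readFully(bytes);
            return ByteBuffer.wrap(bytes);
        }
    }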

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 4ff61ce..016e26e 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import com.google.common.primitives.Ints;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.IndexHelper;
@@ -45,7 +46,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         this.position = position;
     }
 
-    public int promotedSize(ISerializer<T> idxSerializer)
+    public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header)
     {
         return 0;
     }
@@ -100,34 +101,39 @@ public class RowIndexEntry<T> implements IMeasurableMemory
     public static interface IndexSerializer<T>
     {
         void serialize(RowIndexEntry<T> rie, DataOutputPlus out) throws IOException;
-        RowIndexEntry<T> deserialize(DataInput in, Version version) throws IOException;
+        RowIndexEntry<T> deserialize(DataInput in) throws IOException;
         public int serializedSize(RowIndexEntry<T> rie);
     }
 
     public static class Serializer implements IndexSerializer<IndexHelper.IndexInfo>
     {
-        private final ISerializer<IndexHelper.IndexInfo> idxSerializer;
+        private final CFMetaData metadata;
+        private final Version version;
+        private final SerializationHeader header;
 
-        public Serializer(ISerializer<IndexHelper.IndexInfo> idxSerializer)
+        public Serializer(CFMetaData metadata, Version version, SerializationHeader header)
         {
-            this.idxSerializer = idxSerializer;
+            this.metadata = metadata;
+            this.version = version;
+            this.header = header;
         }
 
         public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, DataOutputPlus out) throws IOException
         {
             out.writeLong(rie.position);
-            out.writeInt(rie.promotedSize(idxSerializer));
+            out.writeInt(rie.promotedSize(metadata, version, header));
 
             if (rie.isIndexed())
             {
                 DeletionTime.serializer.serialize(rie.deletionTime(), out);
                 out.writeInt(rie.columnsIndex().size());
+                IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
                 for (IndexHelper.IndexInfo info : rie.columnsIndex())
-                    idxSerializer.serialize(info, out);
+                    idxSerializer.serialize(info, out, header);
             }
         }
 
-        public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in, Version version) throws IOException
+        public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInput in) throws IOException
         {
             long position = in.readLong();
 
@@ -137,9 +143,10 @@ public class RowIndexEntry<T> implements IMeasurableMemory
                 DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
 
                 int entries = in.readInt();
+                IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
                 List<IndexHelper.IndexInfo> columnsIndex = new ArrayList<>(entries);
                 for (int i = 0; i < entries; i++)
-                    columnsIndex.add(idxSerializer.deserialize(in));
+                    columnsIndex.add(idxSerializer.deserialize(in, header));
 
                 return new IndexedEntry(position, deletionTime, columnsIndex);
             }
@@ -166,7 +173,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie)
         {
-            int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(idxSerializer));
+            int size = TypeSizes.NATIVE.sizeof(rie.position) + TypeSizes.NATIVE.sizeof(rie.promotedSize(metadata, version, header));
 
             if (rie.isIndexed())
             {
@@ -175,8 +182,9 @@ public class RowIndexEntry<T> implements IMeasurableMemory
                 size += DeletionTime.serializer.serializedSize(rie.deletionTime(), TypeSizes.NATIVE);
                 size += TypeSizes.NATIVE.sizeof(index.size());
 
+                IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
                 for (IndexHelper.IndexInfo info : index)
-                    size += idxSerializer.serializedSize(info, TypeSizes.NATIVE);
+                    size += idxSerializer.serializedSize(info, header, TypeSizes.NATIVE);
             }
 
 
@@ -217,13 +225,14 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         }
 
         @Override
-        public int promotedSize(ISerializer<IndexHelper.IndexInfo> idxSerializer)
+        public int promotedSize(CFMetaData metadata, Version version, SerializationHeader header)
         {
             TypeSizes typeSizes = TypeSizes.NATIVE;
             long size = DeletionTime.serializer.serializedSize(deletionTime, typeSizes);
             size += typeSizes.sizeof(columnsIndex.size()); // number of entries
+            IndexHelper.IndexInfo.Serializer idxSerializer = metadata.serializers().indexSerializer(version);
             for (IndexHelper.IndexInfo info : columnsIndex)
-                size += idxSerializer.serializedSize(info, typeSizes);
+                size += idxSerializer.serializedSize(info, header, typeSizes);
 
             return Ints.checkedCast(size);
         }
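
The shape of the change above: instead of capturing a fixed ISerializer at construction, the Serializer now keeps (metadata, version, header) and looks up the IndexInfo serializer per call, so the encoding can follow the sstable format version. A minimal sketch of that pattern, with hypothetical stand-in types rather than the real Cassandra classes:

    import java.io.DataOutput;
    import java.io.IOException;

    // Hypothetical stand-ins; only the lookup-per-call shape is illustrated.
    interface InfoSerializer
    {
        void serialize(String info, DataOutput out) throws IOException;
    }

    class VersionedIndexSerializer
    {
        private final String version;  // stands in for the sstable format Version

        VersionedIndexSerializer(String version)
        {
            this.version = version;
        }

        // Derived from the captured version on each call rather than fixed
        // once at construction time.
        private InfoSerializer infoSerializer()
        {
            return (info, out) -> out.writeUTF(version + ":" + info);
        }

        void serialize(Iterable<String> columnsIndex, DataOutput out) throws IOException
        {
            InfoSerializer serializer = infoSerializer();
            for (String info : columnsIndex)
                serializer.serialize(info, out);
        }
    }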

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
deleted file mode 100644
index 3473e96..0000000
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.*;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
-import org.apache.cassandra.db.columniterator.LazyColumnIterator;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.MergeIterator;
-
-public class RowIteratorFactory
-{
-
-    private static final Comparator<OnDiskAtomIterator> COMPARE_BY_KEY = new Comparator<OnDiskAtomIterator>()
-    {
-        public int compare(OnDiskAtomIterator o1, OnDiskAtomIterator o2)
-        {
-            return DecoratedKey.comparator.compare(o1.getKey(), o2.getKey());
-        }
-    };
-
-
-    /**
-     * Get a row iterator over the provided memtables and sstables, between the provided keys
-     * and filtered by the query filter.
-     * @param memtables Memtables pending flush.
-     * @param sstables SSTables to scan through.
-     * @param range The data range to fetch.
-     * @param cfs The column family store to read from.
-     * @return A row iterator following all the given restrictions
-     */
-    public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
-                                                     final Collection<SSTableReader> sstables,
-                                                     final DataRange range,
-                                                     final ColumnFamilyStore cfs,
-                                                     final long now)
-    {
-        // fetch data from current memtable, historical memtables, and SSTables in the correct order.
-        final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<>(Iterables.size(memtables) + sstables.size());
-
-        for (Memtable memtable : memtables)
-            iterators.add(new ConvertToColumnIterator(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
-
-        for (SSTableReader sstable : sstables)
-            iterators.add(sstable.getScanner(range));
-
-        // reduce rows from all sources into a single row
-        return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
-        {
-            private final int gcBefore = cfs.gcBefore(now);
-            private final List<OnDiskAtomIterator> colIters = new ArrayList<>();
-            private DecoratedKey key;
-            private ColumnFamily returnCF;
-
-            @Override
-            protected void onKeyChange()
-            {
-                this.returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, range.columnFilter.isReversed());
-            }
-
-            public void reduce(OnDiskAtomIterator current)
-            {
-                this.colIters.add(current);
-                this.key = current.getKey();
-                this.returnCF.delete(current.getColumnFamily());
-            }
-
-            protected Row getReduced()
-            {
-                // First check if this row is in the rowCache. If it is and it covers our filter, we can skip the rest
-                ColumnFamily cached = cfs.getRawCachedRow(key);
-                IDiskAtomFilter filter = range.columnFilter(key.getKey());
-
-                try
-                {
-                    if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now))
-                    {
-                        // not cached: collate
-                        QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, key, gcBefore, now);
-                    }
-                    else
-                    {
-                        QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now);
-                        returnCF = cfs.filterColumnFamily(cached, keyFilter);
-                    }
-                }
-                catch(TombstoneOverwhelmingException e)
-                {
-                    e.setKey(key);
-                    throw e;
-                }
-
-                Row rv = new Row(key, returnCF);
-                colIters.clear();
-                key = null;
-                return rv;
-            }
-        });
-    }
-
-    /**
-     * Get a ColumnIterator for a specific key in the memtable.
-     */
-    private static class ConvertToColumnIterator implements CloseableIterator<OnDiskAtomIterator>
-    {
-        private final DataRange range;
-        private final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter;
-
-        public ConvertToColumnIterator(DataRange range, Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter)
-        {
-            this.range = range;
-            this.iter = iter;
-        }
-
-        public boolean hasNext()
-        {
-            return iter.hasNext();
-        }
-
-        /*
-         * Note that when doing get_paged_slice, we reset the start of the queryFilter after we've fetched the
-         * first row. This means that this iterator must not use the filter in any way to fetch a row before
-         * next() is called, which prevents us from using Guava's AbstractIterator.
-         * This is obviously rather fragile and we should consider refactoring that code, but such a refactor
-         * would go deep into the storage engine code, so this will have to do until then.
-         */
-        public OnDiskAtomIterator next()
-        {
-            final Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next();
-            return new LazyColumnIterator(entry.getKey(), new IColumnIteratorFactory()
-            {
-                public OnDiskAtomIterator create()
-                {
-                    return range.columnFilter(entry.getKey().getKey()).getColumnIterator(entry.getKey(), entry.getValue());
-                }
-            });
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void close()
-        {
-            // pass
-        }
-    }
-}
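
The heart of the deleted factory is a k-way merge: per-source iterators sorted by key are merged, and every entry sharing a key is reduced into a single row. A simplified, non-streaming stand-in using only java.util (the real code streams through MergeIterator's heap instead of materializing results):

    import java.util.*;

    public class MergeByKeySketch
    {
        // Group entries from all key-sorted sources by key, in key order;
        // each value list plays the role of one reduced row.
        static SortedMap<String, List<Integer>> merge(List<List<Map.Entry<String, Integer>>> sources)
        {
            SortedMap<String, List<Integer>> merged = new TreeMap<>();
            for (List<Map.Entry<String, Integer>> source : sources)
                for (Map.Entry<String, Integer> entry : source)
                    merged.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
            return merged;
        }
    }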

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java
deleted file mode 100644
index 3fa0465..0000000
--- a/src/java/org/apache/cassandra/db/RowPosition.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public interface RowPosition extends RingPosition<RowPosition>
-{
-    public static enum Kind
-    {
-        // Only add new values to the end of the enum; the ordinal is used
-        // during serialization.
-        ROW_KEY, MIN_BOUND, MAX_BOUND;
-
-        private static final Kind[] allKinds = Kind.values();
-
-        static Kind fromOrdinal(int ordinal)
-        {
-            return allKinds[ordinal];
-        }
-    }
-
-    public static final class ForKey
-    {
-        public static RowPosition get(ByteBuffer key, IPartitioner p)
-        {
-            return key == null || key.remaining() == 0 ? p.getMinimumToken().minKeyBound() : p.decorateKey(key);
-        }
-    }
-
-    public static final RowPositionSerializer serializer = new RowPositionSerializer();
-
-    public Kind kind();
-    public boolean isMinimum();
-
-    public static class RowPositionSerializer implements IPartitionerDependentSerializer<RowPosition>
-    {
-        /*
-         * We need to be able to serialize both Token.KeyBound and
-         * DecoratedKey. To make this compact, we first write a byte whose
-         * meaning is:
-         *   - 0: DecoratedKey
-         *   - 1: a 'minimum' Token.KeyBound
-         *   - 2: a 'maximum' Token.KeyBound
-         * In the case of the DecoratedKey, we then serialize the key (the
-         * token is recreated on the other side). In the other cases, we then
-         * serialize the token.
-         */
-        public void serialize(RowPosition pos, DataOutputPlus out, int version) throws IOException
-        {
-            Kind kind = pos.kind();
-            out.writeByte(kind.ordinal());
-            if (kind == Kind.ROW_KEY)
-                ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out);
-            else
-                Token.serializer.serialize(pos.getToken(), out, version);
-        }
-
-        public RowPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException
-        {
-            Kind kind = Kind.fromOrdinal(in.readByte());
-            if (kind == Kind.ROW_KEY)
-            {
-                ByteBuffer k = ByteBufferUtil.readWithShortLength(in);
-                return StorageService.getPartitioner().decorateKey(k);
-            }
-            else
-            {
-                Token t = Token.serializer.deserialize(in, p, version);
-                return kind == Kind.MIN_BOUND ? t.minKeyBound() : t.maxKeyBound();
-            }
-        }
-
-        public long serializedSize(RowPosition pos, int version)
-        {
-            Kind kind = pos.kind();
-            int size = 1; // 1 byte for enum
-            if (kind == Kind.ROW_KEY)
-            {
-                int keySize = ((DecoratedKey)pos).getKey().remaining();
-                size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
-            }
-            else
-            {
-                size += Token.serializer.serializedSize(pos.getToken(), version);
-            }
-            return size;
-        }
-    }
-}

