cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject [1/4] cassandra git commit: On-wire backward compatibility for 3.0
Date Fri, 07 Aug 2015 22:44:40 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk ed9343edf -> 288f2cf4f


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 4baf6a3..bb2fbf1 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -103,38 +103,6 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         return l.toArray(new ByteBuffer[l.size()]);
     }
 
-    public static class CompositeComponent
-    {
-        public AbstractType<?> comparator;
-        public ByteBuffer   value;
-
-        public CompositeComponent( AbstractType<?> comparator, ByteBuffer value )
-        {
-            this.comparator = comparator;
-            this.value      = value;
-        }
-    }
-
-    public List<CompositeComponent> deconstruct( ByteBuffer bytes )
-    {
-        List<CompositeComponent> list = new ArrayList<CompositeComponent>();
-
-        ByteBuffer bb = bytes.duplicate();
-        readIsStatic(bb);
-        int i = 0;
-
-        while (bb.remaining() > 0)
-        {
-            AbstractType comparator = getComparator(i, bb);
-            ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(bb);
-
-            list.add( new CompositeComponent(comparator,value) );
-
-            byte b = bb.get(); // Ignore; not relevant here
-            ++i;
-        }
-        return list;
-    }
 
     /*
      * Escapes all occurences of the ':' character from the input, replacing them by "\:".

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 01eb58f..633a994 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -218,6 +218,32 @@ public class CompositeType extends AbstractCompositeType
         return null;
     }
 
+    public static class CompositeComponent
+    {
+        public ByteBuffer value;
+        public byte eoc;
+
+        public CompositeComponent(ByteBuffer value, byte eoc)
+        {
+            this.value = value;
+            this.eoc = eoc;
+        }
+    }
+
+    public static List<CompositeComponent> deconstruct(ByteBuffer bytes)
+    {
+        List<CompositeComponent> list = new ArrayList<>();
+        ByteBuffer bb = bytes.duplicate();
+        readStatic(bb);
+        while (bb.remaining() > 0)
+        {
+            ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(bb);
+            byte eoc = bb.get();
+            list.add(new CompositeComponent(value, eoc));
+        }
+        return list;
+    }
+
     // Extract CQL3 column name from the full column name.
     public ByteBuffer extractLastComponent(ByteBuffer bb)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
index acdd0e2..0b218f5 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
@@ -189,7 +189,7 @@ public abstract class AbstractThreadUnsafePartition implements Partition, Iterab
         return sliceableUnfilteredIterator(ColumnFilter.all(metadata()), false);
     }
 
-    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed)
+    public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed)
     {
         return new SliceableIterator(this, selection, reversed);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f2e0617..bb73929 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -23,12 +23,15 @@ import java.util.*;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -36,9 +39,10 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Stores updates made on a partition.
@@ -494,7 +498,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     }
 
     @Override
-    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
+    public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
     {
         maybeBuild();
         return super.sliceableUnfilteredIterator(columns, reversed);
@@ -503,7 +507,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     /**
      * Validates the data contained in this update.
      *
-     * @throws MarshalException if some of the data contained in this update is corrupted.
+     * @throws org.apache.cassandra.serializers.MarshalException if some of the data contained in this update is corrupted.
      */
     public void validate()
     {
@@ -701,37 +705,19 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     {
         public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-            {
-                // TODO
-                throw new UnsupportedOperationException();
-
-                // if (cf == null)
-                // {
-                //     out.writeBoolean(false);
-                //     return;
-                // }
-
-                // out.writeBoolean(true);
-                // serializeCfId(cf.id(), out, version);
-                // cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
-                // ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
-                // int count = cf.getColumnCount();
-                // out.writeInt(count);
-                // int written = 0;
-                // for (Cell cell : cf)
-                // {
-                //     columnSerializer.serialize(cell, out);
-                //     written++;
-                // }
-                // assert count == written: "Table had " + count + " columns, but " + written + " written";
-            }
-
-            CFMetaData.serializer.serialize(update.metadata(), out, version);
             try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
             {
                 assert !iter.isReverseOrder();
-                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size());
+
+                if (version < MessagingService.VERSION_30)
+                {
+                    LegacyLayout.serializeAsLegacyPartition(iter, out, version);
+                }
+                else
+                {
+                    CFMetaData.serializer.serialize(update.metadata(), out, version);
+                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size());
+                }
             }
         }
 
@@ -745,9 +731,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
             else
             {
                 assert key != null;
-                CFMetaData metadata = deserializeMetadata(in, version);
-                DecoratedKey dk = metadata.decorateKey(key);
-                return deserializePre30(in, version, flag, metadata, dk);
+                return deserializePre30(in, version, flag, key);
             }
         }
 
@@ -761,8 +745,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
             else
             {
                 assert key != null;
-                CFMetaData metadata = deserializeMetadata(in, version);
-                return deserializePre30(in, version, flag, metadata, key);
+                return deserializePre30(in, version, flag, key.getKey());
             }
         }
 
@@ -802,48 +785,22 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                                        false);
         }
 
-        private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
+        private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
         {
-            // This is only used in mutation, and mutation have never allowed "null" column families
-            boolean present = in.readBoolean();
-            assert present;
-
-            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-            return metadata;
-        }
-
-        private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
-        {
-            LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
-            int size = in.readInt();
-            Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
-            SerializationHelper helper = new SerializationHelper(metadata, version, flag);
-            try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper))
+            try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
             {
+                assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families
                 return PartitionUpdate.fromIterator(iterator);
             }
         }
 
         public long serializedSize(PartitionUpdate update, int version)
         {
-            if (version < MessagingService.VERSION_30)
-            {
-                // TODO
-                throw new UnsupportedOperationException("Version is " + version);
-                //if (cf == null)
-                //{
-                //    return TypeSizes.sizeof(false);
-                //}
-                //else
-                //{
-                //    return TypeSizes.sizeof(true)  /* nullness bool */
-                //        + cfIdSerializedSize(cf.id(), typeSizes, version)  /* id */
-                //        + contentSerializedSize(cf, typeSizes, version);
-                //}
-            }
-
             try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
             {
+                if (version < MessagingService.VERSION_30)
+                    return LegacyLayout.serializedSizeAsLegacyPartition(iter, version);
+
                 return CFMetaData.serializer.serializedSize(update.metadata(), version)
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index dd625c4..0418e7f 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -22,13 +22,19 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.*;
 
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.*;
 
 /**
  * Static methods to work with partition iterators.
@@ -357,8 +363,7 @@ public abstract class UnfilteredPartitionIterators
     {
         public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException();
+            assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
 
             out.writeBoolean(iter.isForThrift());
             while (iter.hasNext())
@@ -374,9 +379,7 @@ public abstract class UnfilteredPartitionIterators
 
         public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException();
-
+            assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
             final boolean isForThrift = in.readBoolean();
 
             return new AbstractUnfilteredPartitionIterator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
index 7e9ceb8..8f9e921 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
@@ -23,11 +23,13 @@ import java.util.function.Predicate;
 
 import com.google.common.base.Function;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -363,6 +365,11 @@ public class BTreeBackedRow extends AbstractRow
             ((ComplexColumnData) current).setValue(path, value);
     }
 
+    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata)
+    {
+        return () -> new CellInLegacyOrderIterator(metadata);
+    }
+
     private class CellIterator extends AbstractIterator<Cell>
     {
         private Iterator<ColumnData> columnData = iterator();
@@ -392,6 +399,61 @@ public class BTreeBackedRow extends AbstractRow
         }
     }
 
+    private class CellInLegacyOrderIterator extends AbstractIterator<Cell>
+    {
+        private final AbstractType<?> comparator;
+        private final int firstComplexIdx;
+        private int simpleIdx;
+        private int complexIdx;
+        private Iterator<Cell> complexCells;
+        private final Object[] data;
+
+        private CellInLegacyOrderIterator(CFMetaData metadata)
+        {
+            this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
+
+            // copy btree into array for simple separate iteration of simple and complex columns
+            this.data = new Object[BTree.size(btree)];
+            BTree.toArray(btree, data, 0);
+
+            int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData);
+            this.firstComplexIdx = idx < 0 ? data.length : idx;
+            this.complexIdx = firstComplexIdx;
+        }
+
+        protected Cell computeNext()
+        {
+            while (true)
+            {
+                if (complexCells != null)
+                {
+                    if (complexCells.hasNext())
+                        return complexCells.next();
+
+                    complexCells = null;
+                }
+
+                if (simpleIdx >= firstComplexIdx)
+                {
+                    if (complexIdx >= data.length)
+                        return endOfData();
+
+                    complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                }
+                else
+                {
+                    if (complexIdx >= data.length)
+                        return (Cell)data[simpleIdx++];
+
+                    if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0)
+                        return (Cell)data[simpleIdx++];
+                    else
+                        complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                }
+            }
+        }
+    }
+
     public static class Builder implements Row.Builder
     {
         // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 0c3dc2d..33ad447 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -146,6 +146,18 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
     public Iterable<Cell> cells();
 
     /**
+     * An iterable over the cells of this row that returns cells in "legacy order".
+     * <p>
+     * In 3.0+, columns are sorted so that all simple columns are before all complex columns. Previously
+     * however, the cells were just sorted by the column name. This iterator returns cells in that
+     * legacy order. It's only ever meaningful for backward/thrift compatibility code.
+     *
+     * @param metadata the table this is a row of.
+     * @return an iterable over the cells of this row in "legacy order".
+     */
+    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata);
+
+    /**
      * Whether the row stores any (non-live) complex deletion for any complex column.
      */
     public boolean hasComplexDeletion();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 2c7932b..25eb0d0 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -210,8 +210,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
-        //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
-        //put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
+        put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
+        put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
         put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 487a14c..16a3e6e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -99,7 +99,7 @@ public abstract class AbstractReadExecutor
                 traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
             logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
             if (message == null)
-                message = readCommand.createMessage();
+                message = readCommand.createMessage(MessagingService.instance().getVersion(endpoint));
             MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
         }
 
@@ -277,7 +277,8 @@ public abstract class AbstractReadExecutor
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
-                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
+                int version = MessagingService.instance().getVersion(extraReplica);
+                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(version), extraReplica, handler);
                 speculated = true;
 
                 cfs.metric.speculativeRetries.inc();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index a1b5c96..6bfe94a 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         ReadResponse response = responses.iterator().next().payload;
-        return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata()), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata(), command), command.nowInSec());
     }
 
     public PartitionIterator resolve()
@@ -62,7 +62,7 @@ public class DataResolver extends ResponseResolver
         for (int i = 0; i < count; i++)
         {
             MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command.metadata()));
+            iters.add(msg.payload.makeIterator(command.metadata(), command));
             sources[i] = msg.from;
         }
 
@@ -406,12 +406,12 @@ public class DataResolver extends ResponseResolver
                 if (StorageProxy.canDoLocalRequest(source))
                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
-                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
+                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 
                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();
                 assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata()), retryCommand);
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata(), command), retryCommand);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 42aee04..db8adf3 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         assert isDataPresent();
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
     }
 
     /*
@@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver
         {
             ReadResponse response = message.payload;
 
-            ByteBuffer newDigest = response.digest(command.metadata());
+            ByteBuffer newDigest = response.digest(command.metadata(), command);
             if (digest == null)
                 digest = newDigest;
             else if (!digest.equals(newDigest))
@@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver
         if (logger.isDebugEnabled())
             logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
     }
 
     public boolean isDataPresent()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index d548019..8b1ef32 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -238,9 +238,11 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                 final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
-                MessageOut<ReadCommand> message = command.createMessage();
                 for (InetAddress endpoint : endpoints)
+                {
+                    MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
                     MessagingService.instance().sendRR(message, endpoint, repairHandler);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2c3c018..1e1f847 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1621,9 +1621,9 @@ public class StorageProxy implements StorageProxyMBean
                                                  keyspace,
                                                  executor.handler.endpoints);
 
-                MessageOut<ReadCommand> message = command.createMessage();
                 for (InetAddress endpoint : executor.getContactedReplicas())
                 {
+                    MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
                     Tracing.trace("Enqueuing full data read to {}", endpoint);
                     MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
                 }
@@ -1974,9 +1974,9 @@ public class StorageProxy implements StorageProxyMBean
             }
             else
             {
-                MessageOut<ReadCommand> message = rangeCommand.createMessage();
                 for (InetAddress endpoint : toQuery.filteredEndpoints)
                 {
+                    MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
                     Tracing.trace("Enqueuing request to {}", endpoint);
                     MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a8ac8bf..959f7e3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -296,8 +296,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new ReadCommandVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 169944b..2e57a8b 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -47,7 +47,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager
         if (state != null)
         {
             lastReturnedKey = command.metadata().decorateKey(state.partitionKey);
-            lastReturnedClustering = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+            lastReturnedClustering = state.cellName.hasRemaining()
+                                   ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
+                                   : null;
             restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index bb223b8..28c5206 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -24,11 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.ClientState;
 
 /**
  * Common interface to single partition queries (by slice and by name).
@@ -50,7 +46,9 @@ public class SinglePartitionPager extends AbstractQueryPager
 
         if (state != null)
         {
-            lastReturned = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+            lastReturned = state.cellName.hasRemaining()
+                         ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
+                         : null;
             restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 9353d16..733067e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -237,7 +237,7 @@ public class CassandraServer implements Cassandra.Iface
         if (partition.isEmpty())
             return EMPTY_COLUMNS;
 
-        Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.fromRowIterator(partition);
+        Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.fromRowIterator(partition).right;
         List<ColumnOrSuperColumn> result;
         if (partition.metadata().isSuper())
         {
@@ -932,7 +932,7 @@ public class CassandraServer implements Cassandra.Iface
             {
                 return result == null
                      ? new CASResult(true)
-                     : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(metadata, LegacyLayout.fromRowIterator(result)));
+                     : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(metadata, LegacyLayout.fromRowIterator(result).right));
             }
         }
         catch (UnknownColumnException e)


Mime
View raw message