cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject cassandra git commit: Token serialization should accept partitioner explicitly
Date Thu, 29 Jan 2015 21:48:53 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 6145d50f5 -> 806facc8c


Token serialization should accept partitioner explicitly

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-8268


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/806facc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/806facc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/806facc8

Branch: refs/heads/trunk
Commit: 806facc8ca87a8d1f6fa14056c68ac43dc5bde5c
Parents: 6145d50
Author: Branimir Lambov <branimir.lambov@datastax.com>
Authored: Fri Jan 30 00:46:44 2015 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Fri Jan 30 00:48:12 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +-
 .../apache/cassandra/db/PagedRangeCommand.java  |  3 +-
 .../apache/cassandra/db/RangeSliceCommand.java  |  3 +-
 .../org/apache/cassandra/db/RowPosition.java    | 16 ++---
 .../apache/cassandra/dht/AbstractBounds.java    | 33 +++++------
 .../dht/IPartitionerDependentSerializer.java    | 61 ++++++++++++++++++++
 src/java/org/apache/cassandra/dht/Token.java    | 19 +++---
 .../apache/cassandra/net/MessagingService.java  | 14 +++++
 .../apache/cassandra/repair/RepairJobDesc.java  |  3 +-
 .../repair/messages/AnticompactionRequest.java  | 10 +++-
 .../repair/messages/PrepareMessage.java         |  8 ++-
 .../repair/messages/RepairMessage.java          |  2 +-
 .../cassandra/repair/messages/SyncRequest.java  |  6 +-
 .../cassandra/streaming/StreamRequest.java      | 14 +++--
 .../org/apache/cassandra/utils/MerkleTree.java  | 43 +++++++-------
 15 files changed, 163 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0cd0b4d..a85a6e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
  * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657)
  * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
  * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
@@ -137,8 +138,6 @@
  * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
  * Fix sstableupgrade throws exception (CASSANDRA-8688)
 Merged from 2.0:
-=======
-2.0.13:
  * Fix SSTableSimpleUnsortedWriter ConcurrentModificationException (CASSANDRA-8619)
  * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
  * Add batch remove iterator to ABSC (CASSANDRA-8414, 8666)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
index 614f0f7..ebedecf 100644
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -129,6 +129,7 @@ public class PagedRangeCommand extends AbstractRangeCommand
             out.writeUTF(cmd.columnFamily);
             out.writeLong(cmd.timestamp);
 
+            MessagingService.validatePartitioner(cmd.keyRange);
             AbstractBounds.serializer.serialize(cmd.keyRange, out, version);
 
             CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
@@ -158,7 +159,7 @@ public class PagedRangeCommand extends AbstractRangeCommand
             String columnFamily = in.readUTF();
             long timestamp = in.readLong();
 
-            AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in,
version).toRowBounds();
+            AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in,
MessagingService.globalPartitioner(), version).toRowBounds();
 
             CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 4d2955b..6009524 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -172,6 +172,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
                 expr.writeTo(out);
             }
         }
+        MessagingService.validatePartitioner(sliceCommand.keyRange);
         AbstractBounds.serializer.serialize(sliceCommand.keyRange, out, version);
         out.writeInt(sliceCommand.maxResults);
         out.writeBoolean(sliceCommand.countCQL3Rows);
@@ -195,7 +196,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         {
             rowFilter.add(IndexExpression.readFrom(in));
         }
-        AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in,
version).toRowBounds();
+        AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in,
MessagingService.globalPartitioner(), version).toRowBounds();
 
         int maxResults = in.readInt();
         boolean countCQL3Rows = in.readBoolean();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/db/RowPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowPosition.java b/src/java/org/apache/cassandra/db/RowPosition.java
index 3bcd627..3fa0465 100644
--- a/src/java/org/apache/cassandra/db/RowPosition.java
+++ b/src/java/org/apache/cassandra/db/RowPosition.java
@@ -56,7 +56,7 @@ public interface RowPosition extends RingPosition<RowPosition>
     public Kind kind();
     public boolean isMinimum();
 
-    public static class RowPositionSerializer implements ISerializer<RowPosition>
+    public static class RowPositionSerializer implements IPartitionerDependentSerializer<RowPosition>
     {
         /*
          * We need to be able to serialize both Token.KeyBound and
@@ -69,17 +69,17 @@ public interface RowPosition extends RingPosition<RowPosition>
          * token is recreated on the other side). In the other cases, we then
          * serialize the token.
          */
-        public void serialize(RowPosition pos, DataOutputPlus out) throws IOException
+        public void serialize(RowPosition pos, DataOutputPlus out, int version) throws IOException
         {
             Kind kind = pos.kind();
             out.writeByte(kind.ordinal());
             if (kind == Kind.ROW_KEY)
                 ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out);
             else
-                Token.serializer.serialize(pos.getToken(), out);
+                Token.serializer.serialize(pos.getToken(), out, version);
         }
 
-        public RowPosition deserialize(DataInput in) throws IOException
+        public RowPosition deserialize(DataInput in, IPartitioner p, int version) throws
IOException
         {
             Kind kind = Kind.fromOrdinal(in.readByte());
             if (kind == Kind.ROW_KEY)
@@ -89,23 +89,23 @@ public interface RowPosition extends RingPosition<RowPosition>
             }
             else
             {
-                Token t = Token.serializer.deserialize(in);
+                Token t = Token.serializer.deserialize(in, p, version);
                 return kind == Kind.MIN_BOUND ? t.minKeyBound() : t.maxKeyBound();
             }
         }
 
-        public long serializedSize(RowPosition pos, TypeSizes typeSizes)
+        public long serializedSize(RowPosition pos, int version)
         {
             Kind kind = pos.kind();
             int size = 1; // 1 byte for enum
             if (kind == Kind.ROW_KEY)
             {
                 int keySize = ((DecoratedKey)pos).getKey().remaining();
-                size += typeSizes.sizeof((short) keySize) + keySize;
+                size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
             }
             else
             {
-                size += Token.serializer.serializedSize(pos.getToken(), typeSizes);
+                size += Token.serializer.serializedSize(pos.getToken(), version);
             }
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index c7a3505..f045acf 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -20,13 +20,12 @@ package org.apache.cassandra.dht;
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.List;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.Pair;
 
@@ -122,7 +121,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>>
implements Seria
 
     public abstract AbstractBounds<T> withNewRight(T newRight);
 
-    public static class AbstractBoundsSerializer implements IVersionedSerializer<AbstractBounds<?>>
+    public static class AbstractBoundsSerializer implements IPartitionerDependentSerializer<AbstractBounds<?>>
     {
         public void serialize(AbstractBounds<?> range, DataOutputPlus out, int version)
throws IOException
         {
@@ -133,13 +132,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>>
implements Seria
             out.writeInt(kindInt(range));
             if (range.left instanceof Token)
             {
-                Token.serializer.serialize((Token) range.left, out);
-                Token.serializer.serialize((Token) range.right, out);
+                Token.serializer.serialize((Token) range.left, out, version);
+                Token.serializer.serialize((Token) range.right, out, version);
             }
             else
             {
-                RowPosition.serializer.serialize((RowPosition) range.left, out);
-                RowPosition.serializer.serialize((RowPosition) range.right, out);
+                RowPosition.serializer.serialize((RowPosition) range.left, out, version);
+                RowPosition.serializer.serialize((RowPosition) range.right, out, version);
             }
         }
 
@@ -151,7 +150,7 @@ public abstract class AbstractBounds<T extends RingPosition<T>>
implements Seria
             return kind;
         }
 
-        public AbstractBounds<?> deserialize(DataInput in, int version) throws IOException
+        public AbstractBounds<?> deserialize(DataInput in, IPartitioner p, int version)
throws IOException
         {
             int kind = in.readInt();
             boolean isToken = kind >= 0;
@@ -161,13 +160,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>>
implements Seria
             RingPosition<?> left, right;
             if (isToken)
             {
-                left = Token.serializer.deserialize(in);
-                right = Token.serializer.deserialize(in);
+                left = Token.serializer.deserialize(in, p, version);
+                right = Token.serializer.deserialize(in, p, version);
             }
             else
             {
-                left = RowPosition.serializer.deserialize(in);
-                right = RowPosition.serializer.deserialize(in);
+                left = RowPosition.serializer.deserialize(in, p, version);
+                right = RowPosition.serializer.deserialize(in, p, version);
             }
 
             if (kind == Type.RANGE.ordinal())
@@ -180,13 +179,13 @@ public abstract class AbstractBounds<T extends RingPosition<T>>
implements Seria
             int size = TypeSizes.NATIVE.sizeof(kindInt(ab));
             if (ab.left instanceof Token)
             {
-                size += Token.serializer.serializedSize((Token) ab.left, TypeSizes.NATIVE);
-                size += Token.serializer.serializedSize((Token) ab.right, TypeSizes.NATIVE);
+                size += Token.serializer.serializedSize((Token) ab.left, version);
+                size += Token.serializer.serializedSize((Token) ab.right, version);
             }
             else
             {
-                size += RowPosition.serializer.serializedSize((RowPosition) ab.left, TypeSizes.NATIVE);
-                size += RowPosition.serializer.serializedSize((RowPosition) ab.right, TypeSizes.NATIVE);
+                size += RowPosition.serializer.serializedSize((RowPosition) ab.left, version);
+                size += RowPosition.serializer.serializedSize((RowPosition) ab.right, version);
             }
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
new file mode 100644
index 0000000..3a9a768
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/IPartitionerDependentSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Versioned serializer where the serialization depends on partitioner.
+ *
+ * On serialization the partitioner is given by the entity being serialized. To deserialize
the partitioner used must
+ * be known to the calling method.
+ */
+public interface IPartitionerDependentSerializer<T>
+{
+    /**
+     * Serialize the specified type into the specified DataOutputStream instance.
+     *
+     * @param t type that needs to be serialized
+     * @param out DataOutput into which serialization needs to happen.
+     * @param version protocol version
+     * @throws java.io.IOException if serialization fails
+     */
+    public void serialize(T t, DataOutputPlus out, int version) throws IOException;
+
+    /**
+     * Deserialize into the specified DataInputStream instance.
+     * @param in DataInput from which deserialization needs to happen.
+     * @param p Partitioner that will be used to construct tokens. Needs to match the partitioner
that was used to
+     *     serialize the token.
+     * @param version protocol version
+     * @return the type that was deserialized
+     * @throws IOException if deserialization fails
+     */
+    public T deserialize(DataInput in, IPartitioner p, int version) throws IOException;
+
+    /**
+     * Calculate serialized size of object without actually serializing.
+     * @param t object to calculate serialized size
+     * @param version protocol version
+     * @return serialized size of object t
+     */
+    public long serializedSize(T t, int version);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 719fd46..76918a7 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -22,12 +22,10 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class Token implements RingPosition<Token>, Serializable
@@ -46,27 +44,26 @@ public abstract class Token implements RingPosition<Token>, Serializable
         public abstract void validate(String token) throws ConfigurationException;
     }
 
-    public static class TokenSerializer implements ISerializer<Token>
+    public static class TokenSerializer implements IPartitionerDependentSerializer<Token>
     {
-        public void serialize(Token token, DataOutputPlus out) throws IOException
+        public void serialize(Token token, DataOutputPlus out, int version) throws IOException
         {
-            IPartitioner p = StorageService.getPartitioner();
+            IPartitioner p = token.getPartitioner();
             ByteBuffer b = p.getTokenFactory().toByteArray(token);
             ByteBufferUtil.writeWithLength(b, out);
         }
 
-        public Token deserialize(DataInput in) throws IOException
+        public Token deserialize(DataInput in, IPartitioner p, int version) throws IOException
         {
-            IPartitioner p = StorageService.getPartitioner();
             int size = in.readInt();
             byte[] bytes = new byte[size];
             in.readFully(bytes);
             return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
         }
 
-        public long serializedSize(Token object, TypeSizes typeSizes)
+        public long serializedSize(Token object, int version)
         {
-            IPartitioner p = StorageService.getPartitioner();
+            IPartitioner p = object.getPartitioner();
             ByteBuffer b = p.getTokenFactory().toByteArray(object);
             return TypeSizes.NATIVE.sizeof(b.remaining()) + b.remaining();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b33cf81..c333b04 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -35,6 +35,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,9 @@ import org.apache.cassandra.concurrent.TracingAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.EchoMessage;
 import org.apache.cassandra.gms.GossipDigestAck;
@@ -1039,4 +1042,15 @@ public final class MessagingService implements MessagingServiceMBean
         }
         return result;
     }
+
+    public static IPartitioner globalPartitioner()
+    {
+        return DatabaseDescriptor.getPartitioner();
+    }
+
+    public static void validatePartitioner(AbstractBounds<?> bounds)
+    {
+        if (globalPartitioner() != bounds.left.getPartitioner())
+            throw new AssertionError();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 5ce5969..c4a713d 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -100,6 +100,7 @@ public class RepairJobDesc
             UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
             out.writeUTF(desc.keyspace);
             out.writeUTF(desc.columnFamily);
+            MessagingService.validatePartitioner(desc.range);
             AbstractBounds.serializer.serialize(desc.range, out, version);
         }
 
@@ -114,7 +115,7 @@ public class RepairJobDesc
             UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
             String keyspace = in.readUTF();
             String columnFamily = in.readUTF();
-            Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in,
version);
+            Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in,
MessagingService.globalPartitioner(), version).toTokenBounds();
             return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily,
range);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index 239ab0e..455e5fb 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class AnticompactionRequest extends RepairMessage
@@ -51,8 +52,11 @@ public class AnticompactionRequest extends RepairMessage
         {
             UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
             out.writeInt(message.successfulRanges.size());
-            for (Range r : message.successfulRanges)
+            for (Range<Token> r : message.successfulRanges)
+            {
+                MessagingService.validatePartitioner(r);
                 Range.serializer.serialize(r, out, version);
+            }
         }
 
         public AnticompactionRequest deserialize(DataInput in, int version) throws IOException
@@ -61,14 +65,14 @@ public class AnticompactionRequest extends RepairMessage
             int rangeCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangeCount);
             for (int i = 0; i < rangeCount; i++)
-                ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds());
+                ranges.add((Range<Token>) Range.serializer.deserialize(in, MessagingService.globalPartitioner(),
version).toTokenBounds());
             return new AnticompactionRequest(parentRepairSession, ranges);
         }
 
         public long serializedSize(AnticompactionRequest message, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession,
version);
-            for (Range r : message.successfulRanges)
+            for (Range<Token> r : message.successfulRanges)
                 size += Range.serializer.serializedSize(r, version);
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 035ccc4..d63bf70 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 
@@ -58,8 +59,11 @@ public class PrepareMessage extends RepairMessage
                 UUIDSerializer.serializer.serialize(cfId, out, version);
             UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
             out.writeInt(message.ranges.size());
-            for (Range r : message.ranges)
+            for (Range<Token> r : message.ranges)
+            {
+                MessagingService.validatePartitioner(r);
                 Range.serializer.serialize(r, out, version);
+            }
             out.writeBoolean(message.isIncremental);
         }
 
@@ -73,7 +77,7 @@ public class PrepareMessage extends RepairMessage
             int rangeCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangeCount);
             for (int i = 0; i < rangeCount; i++)
-                ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds());
+                ranges.add((Range<Token>) Range.serializer.deserialize(in, MessagingService.globalPartitioner(),
version).toTokenBounds());
             boolean isIncremental = in.readBoolean();
             return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index d500928..6af3bb3 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -81,7 +81,7 @@ public abstract class RepairMessage
         return new MessageOut<>(MessagingService.Verb.REPAIR_MESSAGE, this, RepairMessage.serializer);
     }
 
-    public static class RepairMessageSerializer implements IVersionedSerializer<RepairMessage>
+    public static class RepairMessageSerializer implements MessageSerializer<RepairMessage>
     {
         public void serialize(RepairMessage message, DataOutputPlus out, int version) throws
IOException
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index c4d0ab6..077132a 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairJobDesc;
 
 /**
@@ -66,7 +67,10 @@ public class SyncRequest extends RepairMessage
             CompactEndpointSerializationHelper.serialize(message.dst, out);
             out.writeInt(message.ranges.size());
             for (Range<Token> range : message.ranges)
+            {
+                MessagingService.validatePartitioner(range);
                 AbstractBounds.serializer.serialize(range, out, version);
+            }
         }
 
         public SyncRequest deserialize(DataInput in, int version) throws IOException
@@ -78,7 +82,7 @@ public class SyncRequest extends RepairMessage
             int rangesCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangesCount);
             for (int i = 0; i < rangesCount; ++i)
-                ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in,
version).toTokenBounds());
+                ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in,
MessagingService.globalPartitioner(), version).toTokenBounds());
             return new SyncRequest(desc, owner, src, dst, ranges);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 9c5b974..0fe40cf 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 
 public class StreamRequest
 {
@@ -55,8 +56,9 @@ public class StreamRequest
             out.writeInt(request.ranges.size());
             for (Range<Token> range : request.ranges)
             {
-                Token.serializer.serialize(range.left, out);
-                Token.serializer.serialize(range.right, out);
+                MessagingService.validatePartitioner(range);
+                Token.serializer.serialize(range.left, out, version);
+                Token.serializer.serialize(range.right, out, version);
             }
             out.writeInt(request.columnFamilies.size());
             for (String cf : request.columnFamilies)
@@ -71,8 +73,8 @@ public class StreamRequest
             List<Range<Token>> ranges = new ArrayList<>(rangeCount);
             for (int i = 0; i < rangeCount; i++)
             {
-                Token left = Token.serializer.deserialize(in);
-                Token right = Token.serializer.deserialize(in);
+                Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(),
version);
+                Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(),
version);
                 ranges.add(new Range<>(left, right));
             }
             int cfCount = in.readInt();
@@ -89,8 +91,8 @@ public class StreamRequest
             size += TypeSizes.NATIVE.sizeof(request.ranges.size());
             for (Range<Token> range : request.ranges)
             {
-                size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE);
-                size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE);
+                size += Token.serializer.serializedSize(range.left, version);
+                size += Token.serializer.serializedSize(range.right, version);
             }
             size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size());
             for (String cf : request.columnFamilies)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/806facc8/src/java/org/apache/cassandra/utils/MerkleTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTree.java b/src/java/org/apache/cassandra/utils/MerkleTree.java
index 394b12a..4fec62d 100644
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@ -28,6 +28,7 @@ import com.google.common.collect.PeekingIterator;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.IPartitionerDependentSerializer;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -85,8 +86,8 @@ public class MerkleTree implements Serializable
             out.writeLong(mt.size);
             out.writeUTF(mt.partitioner.getClass().getCanonicalName());
             // full range
-            Token.serializer.serialize(mt.fullRange.left, out);
-            Token.serializer.serialize(mt.fullRange.right, out);
+            Token.serializer.serialize(mt.fullRange.left, out, version);
+            Token.serializer.serialize(mt.fullRange.right, out, version);
             Hashable.serializer.serialize(mt.root, out, version);
         }
 
@@ -106,13 +107,13 @@ public class MerkleTree implements Serializable
             }
 
             // full range
-            Token left = Token.serializer.deserialize(in);
-            Token right = Token.serializer.deserialize(in);
+            Token left = Token.serializer.deserialize(in, partitioner, version);
+            Token right = Token.serializer.deserialize(in, partitioner, version);
             Range<Token> fullRange = new Range<>(left, right);
 
             MerkleTree mt = new MerkleTree(partitioner, fullRange, hashdepth, maxsize);
             mt.size = size;
-            mt.root = Hashable.serializer.deserialize(in, version);
+            mt.root = Hashable.serializer.deserialize(in, partitioner, version);
             return mt;
         }
 
@@ -124,8 +125,8 @@ public class MerkleTree implements Serializable
                  + TypeSizes.NATIVE.sizeof(mt.partitioner.getClass().getCanonicalName());
 
             // full range
-            size += Token.serializer.serializedSize(mt.fullRange.left, TypeSizes.NATIVE);
-            size += Token.serializer.serializedSize(mt.fullRange.right, TypeSizes.NATIVE);
+            size += Token.serializer.serializedSize(mt.fullRange.left, version);
+            size += Token.serializer.serializedSize(mt.fullRange.right, version);
 
             size += Hashable.serializer.serializedSize(mt.root, version);
             return size;
@@ -811,7 +812,7 @@ public class MerkleTree implements Serializable
             return buff.toString();
         }
 
-        private static class InnerSerializer implements IVersionedSerializer<Inner>
+        private static class InnerSerializer implements IPartitionerDependentSerializer<Inner>
         {
             public void serialize(Inner inner, DataOutputPlus out, int version) throws IOException
             {
@@ -822,20 +823,20 @@ public class MerkleTree implements Serializable
                     out.writeInt(inner.hash.length);
                     out.write(inner.hash);
                 }
-                Token.serializer.serialize(inner.token, out);
+                Token.serializer.serialize(inner.token, out, version);
                 Hashable.serializer.serialize(inner.lchild, out, version);
                 Hashable.serializer.serialize(inner.rchild, out, version);
             }
 
-            public Inner deserialize(DataInput in, int version) throws IOException
+            public Inner deserialize(DataInput in, IPartitioner p, int version) throws IOException
             {
                 int hashLen = in.readInt();
                 byte[] hash = hashLen >= 0 ? new byte[hashLen] : null;
                 if (hash != null)
                     in.readFully(hash);
-                Token token = Token.serializer.deserialize(in);
-                Hashable lchild = Hashable.serializer.deserialize(in, version);
-                Hashable rchild = Hashable.serializer.deserialize(in, version);
+                Token token = Token.serializer.deserialize(in, p, version);
+                Hashable lchild = Hashable.serializer.deserialize(in, p, version);
+                Hashable rchild = Hashable.serializer.deserialize(in, p, version);
                 return new Inner(token, lchild, rchild);
             }
 
@@ -845,7 +846,7 @@ public class MerkleTree implements Serializable
                 ? TypeSizes.NATIVE.sizeof(-1)
                         : TypeSizes.NATIVE.sizeof(inner.hash().length) + inner.hash().length;
 
-                size += Token.serializer.serializedSize(inner.token, TypeSizes.NATIVE)
+                size += Token.serializer.serializedSize(inner.token, version)
                 + Hashable.serializer.serializedSize(inner.lchild, version)
                 + Hashable.serializer.serializedSize(inner.rchild, version);
                 return size;
@@ -892,7 +893,7 @@ public class MerkleTree implements Serializable
             return "#<Leaf " + Hashable.toString(hash()) + ">";
         }
 
-        private static class LeafSerializer implements IVersionedSerializer<Leaf>
+        private static class LeafSerializer implements IPartitionerDependentSerializer<Leaf>
         {
             public void serialize(Leaf leaf, DataOutputPlus out, int version) throws IOException
             {
@@ -907,7 +908,7 @@ public class MerkleTree implements Serializable
                 }
             }
 
-            public Leaf deserialize(DataInput in, int version) throws IOException
+            public Leaf deserialize(DataInput in, IPartitioner p, int version) throws IOException
             {
                 int hashLen = in.readInt();
                 byte[] hash = hashLen < 0 ? null : new byte[hashLen];
@@ -955,7 +956,7 @@ public class MerkleTree implements Serializable
     static abstract class Hashable implements Serializable
     {
         private static final long serialVersionUID = 1L;
-        private static final IVersionedSerializer<Hashable> serializer = new HashableSerializer();
+        private static final IPartitionerDependentSerializer<Hashable> serializer =
new HashableSerializer();
 
         protected byte[] hash;
         protected long sizeOfRange;
@@ -1033,7 +1034,7 @@ public class MerkleTree implements Serializable
             return "[" + Hex.bytesToHex(hash) + "]";
         }
 
-        private static class HashableSerializer implements IVersionedSerializer<Hashable>
+        private static class HashableSerializer implements IPartitionerDependentSerializer<Hashable>
         {
             public void serialize(Hashable h, DataOutputPlus out, int version) throws IOException
             {
@@ -1051,13 +1052,13 @@ public class MerkleTree implements Serializable
                     throw new IOException("Unexpected Hashable: " + h.getClass().getCanonicalName());
             }
 
-            public Hashable deserialize(DataInput in, int version) throws IOException
+            public Hashable deserialize(DataInput in, IPartitioner p, int version) throws
IOException
             {
                 byte ident = in.readByte();
                 if (Inner.IDENT == ident)
-                    return Inner.serializer.deserialize(in, version);
+                    return Inner.serializer.deserialize(in, p, version);
                 else if (Leaf.IDENT == ident)
-                    return Leaf.serializer.deserialize(in, version);
+                    return Leaf.serializer.deserialize(in, p, version);
                 else
                     throw new IOException("Unexpected Hashable: " + ident);
             }


Mime
View raw message