cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [41/50] cassandra git commit: Wait until the message is being send to decide which serializer must be used
Date Mon, 18 Jul 2016 15:43:56 GMT
Wait until the message is being send to decide which serializer must be used

patch by Benjamin Lerer; reviewed by Tyler Hobbs for CASSANDRA-11393


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbd287ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbd287ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbd287ad

Branch: refs/heads/cassandra-3.8
Commit: fbd287ad2ed09190dd9c6e152b82215e81020847
Parents: e99ee19
Author: Benjamin Lerer <b.lerer@gmail.com>
Authored: Thu Jul 14 11:33:08 2016 +0200
Committer: Benjamin Lerer <b.lerer@gmail.com>
Committed: Thu Jul 14 11:33:08 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  1 +
 .../cassandra/db/PartitionRangeReadCommand.java |  7 +--
 .../org/apache/cassandra/db/ReadCommand.java    | 65 ++++++++++----------
 .../org/apache/cassandra/db/ReadResponse.java   | 43 +++++--------
 .../db/SinglePartitionReadCommand.java          |  2 +-
 .../io/ForwardingVersionedSerializer.java       | 57 +++++++++++++++++
 .../apache/cassandra/net/MessagingService.java  |  6 +-
 8 files changed, 113 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 70210a8..3829046 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
  * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
  * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
  * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 100bcf4..b71ebf6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1124,6 +1124,7 @@ public class DatabaseDescriptor
             case READ:
                 return getReadRpcTimeout();
             case RANGE_SLICE:
+            case PAGED_RANGE:
                 return getRangeRpcTimeout();
             case TRUNCATE:
                 return getTruncateRpcTimeout();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 842ad5f..99e24c8 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -253,12 +253,9 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     public MessageOut<ReadCommand> createMessage(int version)
     {
-        if (version >= MessagingService.VERSION_30)
-            return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
-
         return dataRange().isPaging()
-             ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer)
-             : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
+             ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer)
+             : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer);
     }
 
     protected void appendCQLWhereClause(StringBuilder sb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index c792a5a..36969f8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.io.ForwardingVersionedSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -58,9 +59,39 @@ public abstract class ReadCommand implements ReadQuery
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
 
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
+
+    // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer'
for earlier version.
+    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+    public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>()
+    {
+        protected IVersionedSerializer<ReadCommand> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyReadCommandSerializer : serializer;
+        }
+    };
+
     // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer'
for earlier version.
     // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
-    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new
RangeSliceSerializer();
+    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new
ForwardingVersionedSerializer<ReadCommand>()
+    {
+        protected IVersionedSerializer<ReadCommand> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyRangeSliceCommandSerializer : serializer;
+        }
+    };
+
+    // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer'
for earlier version.
+    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
+    public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new
ForwardingVersionedSerializer<ReadCommand>()
+    {
+        protected IVersionedSerializer<ReadCommand> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyPagedRangeCommandSerializer : serializer;
+        }
+    };
 
     public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer
= new LegacyRangeSliceCommandSerializer();
     public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer
= new LegacyPagedRangeCommandSerializer();
@@ -567,7 +598,6 @@ public abstract class ReadCommand implements ReadQuery
 
         public void serialize(ReadCommand command, DataOutputPlus out, int version) throws
IOException
         {
-            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer
to be used directly
             assert version >= MessagingService.VERSION_30;
 
             out.writeByte(command.kind.ordinal());
@@ -587,8 +617,7 @@ public abstract class ReadCommand implements ReadQuery
 
         public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                return legacyReadCommandSerializer.deserialize(in, version);
+            assert version >= MessagingService.VERSION_30;
 
             Kind kind = Kind.values()[in.readByte()];
             int flags = in.readByte();
@@ -628,7 +657,6 @@ public abstract class ReadCommand implements ReadQuery
 
         public long serializedSize(ReadCommand command, int version)
         {
-            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer
to be used directly
             assert version >= MessagingService.VERSION_30;
 
             return 2 // kind + flags
@@ -643,33 +671,6 @@ public abstract class ReadCommand implements ReadQuery
         }
     }
 
-    // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as
long as we maintain pre-3.0
-    // compatibility
-    private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand>
-    {
-        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws
IOException
-        {
-            if (version < MessagingService.VERSION_30)
-                legacyRangeSliceCommandSerializer.serialize(command, out, version);
-            else
-                serializer.serialize(command, out, version);
-        }
-
-        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceCommandSerializer.deserialize(in, version)
-                 : serializer.deserialize(in, version);
-        }
-
-        public long serializedSize(ReadCommand command, int version)
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceCommandSerializer.serializedSize(command, version)
-                 : serializer.serializedSize(command, version);
-        }
-    }
-
     private enum LegacyType
     {
         GET_BY_NAMES((byte)1),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 8bd1be6..12a200f 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.ForwardingVersionedSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -46,11 +47,20 @@ public abstract class ReadResponse
 {
     // Serializer for single partition read response
     public static final IVersionedSerializer<ReadResponse> serializer = new Serializer();
-    // Serializer for partition range read response (this actually delegate to 'serializer'
in 3.0 and to
-    // 'legacyRangeSliceReplySerializer' in older version.
-    public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new
RangeSliceSerializer();
     // Serializer for the pre-3.0 rang slice responses.
     public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer
= new LegacyRangeSliceReplySerializer();
+    // Serializer for partition range read response (this actually delegate to 'serializer'
in 3.0 and to
+    // 'legacyRangeSliceReplySerializer' in older version.
+    public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new
ForwardingVersionedSerializer<ReadResponse>()
+    {
+        @Override
+        protected IVersionedSerializer<ReadResponse> delegate(int version)
+        {
+            return version < MessagingService.VERSION_30
+                    ? legacyRangeSliceReplySerializer
+                    : serializer;
+        }
+    };
 
     // This is used only when serializing data responses and we can't it easily in other
cases. So this can be null, which is slighly
     // hacky, but as this hack doesn't escape this class, and it's easy enough to validate
that it's not null when we need, it's "good enough".
@@ -411,31 +421,6 @@ public abstract class ReadResponse
         }
     }
 
-    private static class RangeSliceSerializer implements IVersionedSerializer<ReadResponse>
-    {
-        public void serialize(ReadResponse response, DataOutputPlus out, int version) throws
IOException
-        {
-            if (version < MessagingService.VERSION_30)
-                legacyRangeSliceReplySerializer.serialize(response, out, version);
-            else
-                serializer.serialize(response, out, version);
-        }
-
-        public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceReplySerializer.deserialize(in, version)
-                 : serializer.deserialize(in, version);
-        }
-
-        public long serializedSize(ReadResponse response, int version)
-        {
-            return version < MessagingService.VERSION_30
-                 ? legacyRangeSliceReplySerializer.serializedSize(response, version)
-                 : serializer.serializedSize(response, version);
-        }
-    }
-
     private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse>
     {
         public void serialize(ReadResponse response, DataOutputPlus out, int version) throws
IOException
@@ -477,6 +462,8 @@ public abstract class ReadResponse
 
         public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
         {
+            assert version < MessagingService.VERSION_30;
+
             int partitionCount = in.readInt();
             ArrayList<ImmutableBTreePartition> partitions = new ArrayList<>(partitionCount);
             for (int i = 0; i < partitionCount; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 6784770..73eb9bd 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -894,7 +894,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
     public MessageOut<ReadCommand> createMessage(int version)
     {
-        return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30
? legacyReadCommandSerializer : serializer);
+        return new MessageOut<>(MessagingService.Verb.READ, this, readSerializer);
     }
 
     protected void appendCQLWhereClause(StringBuilder sb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
new file mode 100644
index 0000000..64f91d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/ForwardingVersionedSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * A serializer which forwards all its method calls to another serializer. Subclasses should
override one or more
+ * methods to modify the behavior of the backing serializer as desired per the decorator
pattern.
+ */
+public abstract class ForwardingVersionedSerializer<T> implements IVersionedSerializer<T>
+{
+    protected ForwardingVersionedSerializer()
+    {
+    }
+
+    /**
+     * Returns the backing delegate instance that methods are forwarded to.
+     *
+     * @param version the server version
+     * @return the backing delegate instance that methods are forwarded to.
+     */
+    protected abstract IVersionedSerializer<T> delegate(int version);
+
+    public void serialize(T t, DataOutputPlus out, int version) throws IOException
+    {
+        delegate(version).serialize(t, out, version);
+    }
+
+    public T deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return delegate(version).deserialize(in, version);
+    }
+
+    public long serializedSize(T t, int version)
+    {
+        return delegate(version).serializedSize(t, version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbd287ad/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index fac46eb..d01419f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -215,9 +215,9 @@ public final class MessagingService implements MessagingServiceMBean
 
         put(Verb.MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
-        put(Verb.READ, ReadCommand.serializer);
+        put(Verb.READ, ReadCommand.readSerializer);
         put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
-        put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
+        put(Verb.PAGED_RANGE, ReadCommand.pagedRangeSerializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
         put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
@@ -247,7 +247,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
-        put(Verb.PAGED_RANGE, ReadResponse.legacyRangeSliceReplySerializer);
+        put(Verb.PAGED_RANGE, ReadResponse.rangeSliceSerializer);
         put(Verb.READ, ReadResponse.serializer);
         put(Verb.TRUNCATE, TruncateResponse.serializer);
         put(Verb.SNAPSHOT, null);


Mime
View raw message