cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [5/5] git commit: remove Thrift from intra-cluster message serialization
Date Mon, 21 May 2012 21:01:37 GMT
remove Thrift from intra-cluster message serialization


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/490a0998
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/490a0998
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/490a0998

Branch: refs/heads/trunk
Commit: 490a0998738d07f72257a637752557a2a9626a8c
Parents: 3fd08dd
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Sat May 19 11:43:23 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon May 21 15:44:59 2012 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/db/IndexScanCommand.java  |   13 +-
 .../org/apache/cassandra/db/RangeSliceCommand.java |  142 ++++++++++++---
 src/java/org/apache/cassandra/db/TypeSizes.java    |   12 ++
 .../org/apache/cassandra/utils/FBUtilities.java    |    4 +-
 4 files changed, 141 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/db/IndexScanCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexScanCommand.java b/src/java/org/apache/cassandra/db/IndexScanCommand.java
index 859d1d4..ada0c5d 100644
--- a/src/java/org/apache/cassandra/db/IndexScanCommand.java
+++ b/src/java/org/apache/cassandra/db/IndexScanCommand.java
@@ -18,19 +18,18 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
@@ -64,6 +63,8 @@ public class IndexScanCommand
     {
         public void serialize(IndexScanCommand o, DataOutput out, int version) throws IOException
         {
+            assert version < MessagingService.VERSION_12; // 1.2 only uses RangeScanCommand
+
             out.writeUTF(o.keyspace);
             out.writeUTF(o.column_family);
             TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
@@ -74,13 +75,15 @@ public class IndexScanCommand
 
         public IndexScanCommand deserialize(DataInput in, int version) throws IOException
         {
+            assert version < MessagingService.VERSION_12; // 1.2 only uses RangeScanCommand
+
             String keyspace = in.readUTF();
             String columnFamily = in.readUTF();
 
-            TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
             IndexClause indexClause = new IndexClause();
-            FBUtilities.deserialize(dser, indexClause, in);
             SlicePredicate predicate = new SlicePredicate();
+            TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
+            FBUtilities.deserialize(dser, indexClause, in);
             FBUtilities.deserialize(dser, predicate, in);
             AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(in,
version).toRowBounds();
             return new IndexScanCommand(keyspace, columnFamily, indexClause, predicate, range);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 8516e06..7c59758 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -47,10 +47,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.IndexExpression;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TDeserializer;
@@ -149,8 +146,30 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         if (sc != null)
             ByteBufferUtil.write(sc, dos);
 
-        TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
-        FBUtilities.serialize(ser, sliceCommand.predicate, dos);
+        if (version < MessagingService.VERSION_12)
+        {
+            FBUtilities.serialize(new TSerializer(new TBinaryProtocol.Factory()), sliceCommand.predicate,
dos);
+        }
+        else
+        {
+            SliceRange range = sliceCommand.predicate.slice_range;
+            if (range != null)
+            {
+                dos.writeByte(0);
+                ByteBufferUtil.writeWithShortLength(range.start, dos);
+                ByteBufferUtil.writeWithShortLength(range.finish, dos);
+                dos.writeBoolean(range.reversed);
+                dos.writeInt(range.count);
+            }
+            else
+            {
+                dos.writeByte(1);
+                List<ByteBuffer> columns = sliceCommand.predicate.column_names;
+                dos.writeInt(columns.size());
+                for (ByteBuffer column : columns)
+                    ByteBufferUtil.writeWithShortLength(column, dos);
+            }
+        }
 
         if (version >= MessagingService.VERSION_11)
         {
@@ -162,7 +181,18 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             {
                 dos.writeInt(sliceCommand.row_filter.size());
                 for (IndexExpression expr : sliceCommand.row_filter)
-                    FBUtilities.serialize(ser, expr, dos);
+                {
+                    if (version < MessagingService.VERSION_12)
+                    {
+                        FBUtilities.serialize(new TSerializer(new TBinaryProtocol.Factory()),
expr, dos);
+                    }
+                    else
+                    {
+                        ByteBufferUtil.writeWithShortLength(expr.column_name, dos);
+                        dos.writeInt(expr.op.getValue());
+                        ByteBufferUtil.writeWithLength(expr.value, dos);
+                    }
+                }
             }
         }
         AbstractBounds.serializer.serialize(sliceCommand.range, dos, version);
@@ -188,9 +218,31 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             superColumn = ByteBuffer.wrap(buf);
         }
 
-        TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory());
         SlicePredicate pred = new SlicePredicate();
-        FBUtilities.deserialize(dser, pred, dis);
+        if (version < MessagingService.VERSION_12)
+        {
+            FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()), pred,
dis);
+        }
+        else
+        {
+            int type = dis.readByte();
+            if (type == 0)
+            {
+                pred.slice_range = new SliceRange(ByteBufferUtil.readWithShortLength(dis),
+                                                  ByteBufferUtil.readWithShortLength(dis),
+                                                  dis.readBoolean(),
+                                                  dis.readInt());
+            }
+            else
+            {
+                assert type == 1;
+                int count = dis.readInt();
+                List<ByteBuffer> columns = new ArrayList<ByteBuffer>(count);
+                for (int i = 0; i < count; i++)
+                    columns.add(ByteBufferUtil.readWithShortLength(dis));
+                pred.column_names = columns;
+            }
+        }
 
         List<IndexExpression> rowFilter = null;
         if (version >= MessagingService.VERSION_11)
@@ -199,8 +251,18 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             rowFilter = new ArrayList<IndexExpression>(filterCount);
             for (int i = 0; i < filterCount; i++)
             {
-                IndexExpression expr = new IndexExpression();
-                FBUtilities.deserialize(dser, expr, dis);
+                IndexExpression expr;
+                if (version < MessagingService.VERSION_12)
+                {
+                    expr = new IndexExpression();
+                    FBUtilities.deserialize(new TDeserializer(new TBinaryProtocol.Factory()),
expr, dis);
+                }
+                else
+                {
+                    expr = new IndexExpression(ByteBufferUtil.readWithShortLength(dis),
+                                               IndexOperator.findByValue(dis.readInt()),
+                                               ByteBufferUtil.readWithShortLength(dis));
+                }
                 rowFilter.add(expr);
             }
         }
@@ -233,16 +295,39 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             size += TypeSizes.NATIVE.sizeof(0);
         }
 
-        TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
-        try
+        if (version < MessagingService.VERSION_12)
         {
-            int predicateLength = ser.serialize(rsc.predicate).length;
-            size += TypeSizes.NATIVE.sizeof(predicateLength);
-            size += predicateLength;
+            TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
+            try
+            {
+                int predicateLength = ser.serialize(rsc.predicate).length;
+                if (version < MessagingService.VERSION_12)
+                    size += TypeSizes.NATIVE.sizeof(predicateLength);
+                size += predicateLength;
+            }
+            catch (TException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-        catch (TException e)
+        else
         {
-            throw new RuntimeException(e);
+            SliceRange range = rsc.predicate.slice_range;
+            size += 1;
+            if (range != null)
+            {
+                size += TypeSizes.NATIVE.sizeofWithShortLength(range.start);
+                size += TypeSizes.NATIVE.sizeofWithShortLength(range.finish);
+                size += TypeSizes.NATIVE.sizeof(range.reversed);
+                size += TypeSizes.NATIVE.sizeof(range.count);
+            }
+            else
+            {
+                List<ByteBuffer> columns = rsc.predicate.column_names;
+                size += TypeSizes.NATIVE.sizeof(columns.size());
+                for (ByteBuffer column : columns)
+                    size += TypeSizes.NATIVE.sizeofWithShortLength(column);
+            }
         }
 
         if (version >= MessagingService.VERSION_11)
@@ -256,15 +341,24 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
                 size += TypeSizes.NATIVE.sizeof(rsc.row_filter.size());
                 for (IndexExpression expr : rsc.row_filter)
                 {
-                    try
+                    if (version < MessagingService.VERSION_12)
                     {
-                        int filterLength = ser.serialize(expr).length;
-                        size += TypeSizes.NATIVE.sizeof(filterLength);
-                        size += filterLength;
+                        try
+                        {
+                            int filterLength = new TSerializer(new TBinaryProtocol.Factory()).serialize(expr).length;
+                            size += TypeSizes.NATIVE.sizeof(filterLength);
+                            size += filterLength;
+                        }
+                        catch (TException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
                     }
-                    catch (TException e)
+                    else
                     {
-                        throw new RuntimeException(e);
+                        size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name);
+                        size += TypeSizes.NATIVE.sizeof(expr.op.getValue());
+                        size += TypeSizes.NATIVE.sizeofWithLength(expr.value);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index 26c9f7d..aac89d0 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.ByteBuffer;
+
 import org.apache.cassandra.utils.FBUtilities;
 
 public abstract class TypeSizes
@@ -59,6 +61,16 @@ public abstract class TypeSizes
         return utflen;
     }
 
+    public int sizeofWithShortLength(ByteBuffer value)
+    {
+        return sizeof((short) value.remaining()) + value.remaining();
+    }
+
+    public int sizeofWithLength(ByteBuffer value)
+    {
+        return sizeof(value.remaining()) + value.remaining();
+    }
+
     public static class NativeDBTypeSizes extends TypeSizes
     {
         public int sizeof(boolean value)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490a0998/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 063d577..8fa32da 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.AbstractIterator;
 import org.apache.commons.lang.StringUtils;
@@ -42,7 +43,6 @@ import org.apache.cassandra.cache.IRowCacheProvider;
 import org.apache.cassandra.concurrent.CreationTimeAwareFuture;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -240,6 +240,7 @@ public class FBUtilities
         new File(tmpFilename).renameTo(new File(filename));
     }
 
+    @Deprecated
     public static void serialize(TSerializer serializer, TBase struct, DataOutput out)
     throws IOException
     {
@@ -259,6 +260,7 @@ public class FBUtilities
         out.write(bytes);
     }
 
+    @Deprecated
     public static void deserialize(TDeserializer deserializer, TBase struct, DataInput in)
     throws IOException
     {


Mime
View raw message