cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [15/15] Introduce MessageOut class, which wraps an object to be sent in the "payload" field. The old Header class is inlined into the "parameters" map. patch by jbellis; reviewed by yukim for CASSANDRA-3617
Date Tue, 08 May 2012 17:56:15 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index aea609e..a340a94 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessageSerializer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -68,22 +69,21 @@ public class SerializationsTest extends AbstractSerializationsTester
         IPartitioner part = StorageService.getPartitioner();
         AbstractBounds<RowPosition> bounds = new Range<Token>(part.getRandomToken(), part.getRandomToken()).toRowBounds();
 
-        Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage(MessagingService.current_version);
-        Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage(MessagingService.current_version);
-        Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null,  nonEmptyRangePred, bounds, 100).getMessage(MessagingService.current_version);
-        Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage(MessagingService.current_version);
-        Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage(MessagingService.current_version);
-        Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC,  nonEmptyRangePred, bounds, 100).getMessage(MessagingService.current_version);
-
-        DataOutputStream dout = getOutput("db.RangeSliceCommand.bin");
-
-        messageSerializer.serialize(namesCmd, dout, getVersion());
-        messageSerializer.serialize(emptyRangeCmd, dout, getVersion());
-        messageSerializer.serialize(regRangeCmd, dout, getVersion());
-        messageSerializer.serialize(namesCmdSup, dout, getVersion());
-        messageSerializer.serialize(emptyRangeCmdSup, dout, getVersion());
-        messageSerializer.serialize(regRangeCmdSup, dout, getVersion());
-        dout.close();
+        MessageOut<RangeSliceCommand> namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).createMessage();
+        MessageOut<RangeSliceCommand> emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).createMessage();
+        MessageOut<RangeSliceCommand> regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null,  nonEmptyRangePred, bounds, 100).createMessage();
+        MessageOut<RangeSliceCommand> namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).createMessage();
+        MessageOut<RangeSliceCommand> emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).createMessage();
+        MessageOut<RangeSliceCommand> regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC,  nonEmptyRangePred, bounds, 100).createMessage();
+        
+        DataOutputStream out = getOutput("db.RangeSliceCommand.bin");
+        namesCmd.serialize(out, getVersion());
+        emptyRangeCmd.serialize(out, getVersion());
+        regRangeCmd.serialize(out, getVersion());
+        namesCmdSup.serialize(out, getVersion());
+        emptyRangeCmdSup.serialize(out, getVersion());
+        regRangeCmdSup.serialize(out, getVersion());
+        out.close();
     }
 
     @Test
@@ -111,8 +111,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         SliceByNamesReadCommand.serializer().serialize(superCmd, out, getVersion());
         ReadCommand.serializer().serialize(standardCmd, out, getVersion());
         ReadCommand.serializer().serialize(superCmd, out, getVersion());
-        messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(superCmd.getMessage(getVersion()), out, getVersion());
+        standardCmd.createMessage().serialize(out, getVersion());
+        superCmd.createMessage().serialize(out, getVersion());
         out.close();
     }
 
@@ -141,8 +141,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         SliceFromReadCommand.serializer().serialize(superCmd, out, getVersion());
         ReadCommand.serializer().serialize(standardCmd, out, getVersion());
         ReadCommand.serializer().serialize(superCmd, out, getVersion());
-        messageSerializer.serialize(standardCmd.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(superCmd.getMessage(getVersion()), out, getVersion());
+        standardCmd.createMessage().serialize(out, getVersion());
+        superCmd.createMessage().serialize(out, getVersion());
         out.close();
     }
 
@@ -205,12 +205,14 @@ public class SerializationsTest extends AbstractSerializationsTester
         RowMutation.serializer().serialize(standardRm, out, getVersion());
         RowMutation.serializer().serialize(superRm, out, getVersion());
         RowMutation.serializer().serialize(mixedRm, out, getVersion());
-        messageSerializer.serialize(emptyRm.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(standardRowRm.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(superRowRm.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(standardRm.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(superRm.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(mixedRm.getMessage(getVersion()), out, getVersion());
+
+        emptyRm.createMessage().serialize(out, getVersion());
+        standardRowRm.createMessage().serialize(out, getVersion());
+        superRowRm.createMessage().serialize(out, getVersion());
+        standardRm.createMessage().serialize(out, getVersion());
+        superRm.createMessage().serialize(out, getVersion());
+        mixedRm.createMessage().serialize(out, getVersion());
+
         out.close();
     }
 
@@ -245,9 +247,10 @@ public class SerializationsTest extends AbstractSerializationsTester
         Truncation.serializer().serialize(tr, out, getVersion());
         TruncateResponse.serializer().serialize(aff, out, getVersion());
         TruncateResponse.serializer().serialize(neg, out, getVersion());
-        messageSerializer.serialize(tr.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), aff), out, getVersion());
-        messageSerializer.serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(getVersion()), neg), out, getVersion());
+
+        tr.createMessage().serialize(out, getVersion());
+        aff.createMessage().serialize(out, getVersion());
+        neg.createMessage().serialize(out, getVersion());
         // todo: notice how CF names weren't validated.
         out.close();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/net/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessageSerializer.java b/test/unit/org/apache/cassandra/net/MessageSerializer.java
index e8b7874..be9eda5 100644
--- a/test/unit/org/apache/cassandra/net/MessageSerializer.java
+++ b/test/unit/org/apache/cassandra/net/MessageSerializer.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 
 public class MessageSerializer implements IVersionedSerializer<Message>
 {
+    // TODO imitate backwards-compatibility code from OutboundTcpConnection here
     public void serialize(Message t, DataOutput dos, int version) throws IOException
     {
         Header.serializer().serialize(t.header, dos, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index e8ccfba..d3cd62d 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.sink.IMessageSink;
 import org.apache.cassandra.net.sink.SinkManager;
@@ -152,7 +153,8 @@ public class RemoveTest
 
         for (InetAddress host : hosts)
         {
-            Message msg = new Message(host, StorageService.Verb.REPLICATION_FINISHED, new byte[0], MessagingService.current_version);
+            // TODO how to spoof host here?
+            MessageOut msg = new MessageOut(StorageService.Verb.REPLICATION_FINISHED);
             MessagingService.instance().sendRR(msg, FBUtilities.getBroadcastAddress());
         }
 
@@ -162,6 +164,9 @@ public class RemoveTest
         assertTrue(tmd.getLeavingEndpoints().isEmpty());
     }
 
+    /**
+     * sink that captures STREAM_REQUEST messages and calls finishStreamRequest on it
+     */
     class ReplicationSink implements IMessageSink
     {
         public Message handleMessage(Message msg, String id, InetAddress to)
@@ -173,5 +178,10 @@ public class RemoveTest
 
             return null;
         }
+
+        public MessageOut handleMessage(MessageOut msg, String id, InetAddress to)
+        {
+            return msg;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index d736a95..6ab8685 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -52,8 +52,8 @@ public class SerializationsTest extends AbstractSerializationsTester
     private void testTreeRequestWrite() throws IOException
     {
         DataOutputStream out = getOutput("service.TreeRequest.bin");
-        AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, out, getVersion());
-        messageSerializer.serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req, getVersion()), out, getVersion());
+        AntiEntropyService.TreeRequest.serializer.serialize(Statics.req, out, getVersion());
+        Statics.req.createMessage().serialize(out, getVersion());
         out.close();
     }
 
@@ -64,7 +64,7 @@ public class SerializationsTest extends AbstractSerializationsTester
             testTreeRequestWrite();
 
         DataInputStream in = getInput("service.TreeRequest.bin");
-        assert AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null;
+        assert AntiEntropyService.TreeRequest.serializer.deserialize(in, getVersion()) != null;
         assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();
     }
@@ -82,10 +82,10 @@ public class SerializationsTest extends AbstractSerializationsTester
         AntiEntropyService.Validator v1 = new AntiEntropyService.Validator(Statics.req, mt);
 
         DataOutputStream out = getOutput("service.TreeResponse.bin");
-        AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v0, out, getVersion());
-        AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.serialize(v1, out, getVersion());
-        messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v0), out, getVersion());
-        messageSerializer.serialize(AntiEntropyService.TreeResponseVerbHandler.makeVerb(FBUtilities.getBroadcastAddress(), v1), out, getVersion());
+        AntiEntropyService.Validator.serializer.serialize(v0, out, getVersion());
+        AntiEntropyService.Validator.serializer.serialize(v1, out, getVersion());
+        v0.createMessage().serialize(out, getVersion());
+        v1.createMessage().serialize(out, getVersion());
         out.close();
     }
 
@@ -96,8 +96,8 @@ public class SerializationsTest extends AbstractSerializationsTester
             testTreeResponseWrite();
 
         DataInputStream in = getInput("service.TreeResponse.bin");
-        assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null;
-        assert AntiEntropyService.TreeResponseVerbHandler.SERIALIZER.deserialize(in, getVersion()) != null;
+        assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null;
+        assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null;
         assert messageSerializer.deserialize(in, getVersion()) != null;
         assert messageSerializer.deserialize(in, getVersion()) != null;
         in.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a6f0b85/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
index d549457..f2ed9eb 100644
--- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
@@ -21,6 +21,17 @@ package org.apache.cassandra.streaming;
  */
 
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowMutation;
@@ -36,13 +47,6 @@ import org.apache.cassandra.net.MessageSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.junit.Test;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
@@ -117,7 +121,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
         DataOutputStream out = getOutput("streaming.StreamReply.bin");
         StreamReply.serializer.serialize(rep, out, getVersion());
-        messageSerializer.serialize(rep.getMessage(getVersion()), out, getVersion());
+        rep.createMessage().serialize(out, getVersion());
         out.close();
     }
 
@@ -156,9 +160,9 @@ public class SerializationsTest extends AbstractSerializationsTester
         StreamRequestMessage.serializer().serialize(msg0, out, getVersion());
         StreamRequestMessage.serializer().serialize(msg1, out, getVersion());
         StreamRequestMessage.serializer().serialize(msg2, out, getVersion());
-        messageSerializer.serialize(msg0.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(msg1.getMessage(getVersion()), out, getVersion());
-        messageSerializer.serialize(msg2.getMessage(getVersion()), out, getVersion());
+        msg0.createMessage().serialize(out, getVersion());
+        msg1.createMessage().serialize(out, getVersion());
+        msg2.createMessage().serialize(out, getVersion());
         out.close();
     }
 


Mime
View raw message