cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [2/2] git commit: change Message IDs to ints patch by jbellis; reviewed by vijay for CASSANDRA-5307
Date Thu, 07 Mar 2013 00:29:24 GMT
Updated Branches:
  refs/heads/trunk 4cd67af55 -> 209d067af


change Message IDs to ints
patch by jbellis; reviewed by vijay for CASSANDRA-5307


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1936648a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1936648a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1936648a

Branch: refs/heads/trunk
Commit: 1936648a1188b8948048610eb2ccea316a39c647
Parents: 4cd67af
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Mar 4 11:37:38 2013 +0100
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Thu Mar 7 00:29:14 2013 +0000

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/db/CounterMutationVerbHandler.java   |    2 +-
 .../cassandra/db/DefinitionsUpdateVerbHandler.java |    2 +-
 .../cassandra/db/MigrationRequestVerbHandler.java  |    2 +-
 .../apache/cassandra/db/ReadRepairVerbHandler.java |    2 +-
 .../org/apache/cassandra/db/ReadVerbHandler.java   |    2 +-
 .../cassandra/db/RowMutationVerbHandler.java       |    4 +-
 .../cassandra/db/SchemaCheckVerbHandler.java       |    2 +-
 .../apache/cassandra/db/TruncateVerbHandler.java   |    2 +-
 .../org/apache/cassandra/dht/BootStrapper.java     |    2 +-
 .../cassandra/gms/GossipDigestAck2VerbHandler.java |    2 +-
 .../cassandra/gms/GossipDigestAckVerbHandler.java  |    2 +-
 .../cassandra/gms/GossipDigestSynVerbHandler.java  |    2 +-
 .../cassandra/gms/GossipShutdownVerbHandler.java   |    2 +-
 .../org/apache/cassandra/net/IVerbHandler.java     |    2 +-
 .../cassandra/net/IncomingTcpConnection.java       |   11 +++-
 .../apache/cassandra/net/MessageDeliveryTask.java  |    4 +-
 src/java/org/apache/cassandra/net/MessageIn.java   |    2 +-
 .../org/apache/cassandra/net/MessagingService.java |   46 +++++++--------
 .../cassandra/net/OutboundTcpConnection.java       |   20 ++++---
 .../apache/cassandra/net/ResponseVerbHandler.java  |    2 +-
 .../apache/cassandra/net/sink/IMessageSink.java    |    4 +-
 .../org/apache/cassandra/net/sink/SinkManager.java |    4 +-
 .../cassandra/service/AntiEntropyService.java      |    4 +-
 .../cassandra/service/IndexScanVerbHandler.java    |    2 +-
 .../org/apache/cassandra/service/PBSPredictor.java |   28 +++++-----
 .../cassandra/service/RangeSliceVerbHandler.java   |    2 +-
 .../cassandra/service/SnapshotVerbHandler.java     |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |    9 ++-
 .../apache/cassandra/streaming/FileStreamTask.java |   14 +++-
 .../cassandra/streaming/IncomingStreamReader.java  |    8 +--
 .../streaming/ReplicationFinishedVerbHandler.java  |    2 +-
 .../cassandra/streaming/StreamInSession.java       |   15 +----
 .../streaming/StreamReplyVerbHandler.java          |    2 +-
 .../streaming/StreamRequestVerbHandler.java        |    2 +-
 .../cassandra/streaming/StreamingRepairTask.java   |    4 +-
 .../apache/cassandra/db/SerializationsTest.java    |   32 +++++-----
 .../apache/cassandra/service/PBSPredictorTest.java |   16 +++---
 .../org/apache/cassandra/service/RemoveTest.java   |    4 +-
 .../cassandra/service/SerializationsTest.java      |    6 +-
 .../cassandra/streaming/SerializationsTest.java    |    8 +-
 41 files changed, 143 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a89bf6d..02d0edf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0
+ * Change Message IDs to ints (CASSANDRA-5307)
  * Move sstable level information into the Stats component, removing the
    need for a separate Manifest file (CASSANDRA-4872)
  * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index f28e2bd..38ee66f 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -34,7 +34,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
 {
     private static final Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
 
-    public void doVerb(final MessageIn<CounterMutation> message, final String id)
+    public void doVerb(final MessageIn<CounterMutation> message, final int id)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index fdce853..5d59549 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -39,7 +39,7 @@ public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Row
 {
     private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
 
-    public void doVerb(final MessageIn<Collection<RowMutation>> message, String id)
+    public void doVerb(final MessageIn<Collection<RowMutation>> message, int id)
     {
         logger.debug("Received schema mutation push from " + message.from);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index 97fd641..e3152ad 100644
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -36,7 +36,7 @@ public class MigrationRequestVerbHandler implements IVerbHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationRequestVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn message, int id)
     {
         logger.debug("Received migration request from {}.", message.from);
         MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 373117a..fca4938 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.net.MessagingService;
 
 public class ReadRepairVerbHandler implements IVerbHandler<RowMutation>
 {
-    public void doVerb(MessageIn<RowMutation> message, String id)
+    public void doVerb(MessageIn<RowMutation> message, int id)
     {
         RowMutation rm = message.payload;
         rm.apply();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index fbd4f9b..0fbf532 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -34,7 +34,7 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger( ReadVerbHandler.class );
 
-    public void doVerb(MessageIn<ReadCommand> message, String id)
+    public void doVerb(MessageIn<ReadCommand> message, int id)
     {
         if (StorageService.instance.isBootstrapMode())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index c2126f5..9a1a43c 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -32,7 +32,7 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
 {
     private static final Logger logger = LoggerFactory.getLogger(RowMutationVerbHandler.class);
 
-    public void doVerb(MessageIn<RowMutation> message, String id)
+    public void doVerb(MessageIn<RowMutation> message, int id)
     {
         try
         {
@@ -79,7 +79,7 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
         {
             // Send a message to each of the addresses on our Forward List
             InetAddress address = CompactEndpointSerializationHelper.deserialize(dis);
-            String id = dis.readUTF();
+            int id = dis.readInt();
             logger.debug("Forwarding message to {}@{}", id, address);
             // Let the response go back to the coordinator
             MessagingService.instance().sendOneWay(message, id, address);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index dbbcaa6..1a1f7a9 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -33,7 +33,7 @@ public class SchemaCheckVerbHandler implements IVerbHandler
 {
     private final Logger logger = LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn message, int id)
     {
         logger.debug("Received schema check request.");
         MessageOut<UUID> response = new MessageOut<UUID>(MessagingService.Verb.INTERNAL_RESPONSE, Schema.instance.getVersion(), UUIDSerializer.serializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index ceb732d..73825c3 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -30,7 +30,7 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation>
 {
     private static final Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);
 
-    public void doVerb(MessageIn<Truncation> message, String id)
+    public void doVerb(MessageIn<Truncation> message, int id)
     {
         Truncation t = message.payload;
         Tracing.trace("Applying truncation of {}.{}", t.keyspace, t.columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index ff76534..a1dfce8 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -205,7 +205,7 @@ public class BootStrapper
     @Deprecated
     public static class BootstrapTokenVerbHandler implements IVerbHandler
     {
-        public void doVerb(MessageIn message, String id)
+        public void doVerb(MessageIn message, int id)
         {
             StorageService ss = StorageService.instance;
             String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
index 35d3f90..240bb40 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
@@ -30,7 +30,7 @@ public class GossipDigestAck2VerbHandler implements IVerbHandler<GossipDigestAck
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
 
-    public void doVerb(MessageIn<GossipDigestAck2> message, String id)
+    public void doVerb(MessageIn<GossipDigestAck2> message, int id)
     {
         if (logger.isTraceEnabled())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index cff9afc..6152395 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -34,7 +34,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
 
-    public void doVerb(MessageIn<GossipDigestAck> message, String id)
+    public void doVerb(MessageIn<GossipDigestAck> message, int id)
     {
         InetAddress from = message.from;
         if (logger.isTraceEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 29f802a..ee23656 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -33,7 +33,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn>
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipDigestSynVerbHandler.class);
 
-    public void doVerb(MessageIn<GossipDigestSyn> message, String id)
+    public void doVerb(MessageIn<GossipDigestSyn> message, int id)
     {
         InetAddress from = message.from;
         if (logger.isTraceEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
index d400a73..ef71208 100644
--- a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
@@ -27,7 +27,7 @@ public class GossipShutdownVerbHandler implements IVerbHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipShutdownVerbHandler.class);
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn message, int id)
     {
         if (!Gossiper.instance.isEnabled())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/IVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IVerbHandler.java b/src/java/org/apache/cassandra/net/IVerbHandler.java
index 8ae63e2..7f835c0 100644
--- a/src/java/org/apache/cassandra/net/IVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/IVerbHandler.java
@@ -34,5 +34,5 @@ public interface IVerbHandler<T>
      * @param message - incoming message that needs handling.
      * @param id
      */
-    public void doVerb(MessageIn<T> message, String id);
+    public void doVerb(MessageIn<T> message, int id);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index aa8378e..c5e9a52 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -47,7 +47,7 @@ public class IncomingTcpConnection extends Thread
         {
             try
             {
-                this.socket.setReceiveBufferSize(DatabaseDescriptor.getInternodeRecvBufferSize().intValue());
+                this.socket.setReceiveBufferSize(DatabaseDescriptor.getInternodeRecvBufferSize());
             }
             catch (SocketException se)
             {
@@ -190,8 +190,13 @@ public class IncomingTcpConnection extends Thread
         if (version < MessagingService.VERSION_12)
             input.readInt(); // size of entire message. in 1.0+ this is just a placeholder
 
-        String id = input.readUTF();
-        long timestamp = System.currentTimeMillis();;
+        int id;
+        if (version < MessagingService.VERSION_20)
+            id = Integer.valueOf(input.readUTF());
+        else
+            id = input.readInt();
+
+        long timestamp = System.currentTimeMillis();
         if (version >= MessagingService.VERSION_12)
         {
             // make sure to readInt, even if cross_node_to is not enabled

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index e6abdda..d7645b6 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -26,9 +26,9 @@ public class MessageDeliveryTask implements Runnable
 
     private final MessageIn message;
     private final long constructionTime;
-    private final String id;
+    private final int id;
 
-    public MessageDeliveryTask(MessageIn message, String id, long timestamp)
+    public MessageDeliveryTask(MessageIn message, int id, long timestamp)
     {
         assert message != null;
         this.message = message;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 38e376f..5c43035 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -52,7 +52,7 @@ public class MessageIn<T>
         return new MessageIn<T>(from, payload, parameters, verb, version);
     }
 
-    public static <T2> MessageIn<T2> read(DataInput in, int version, String id) throws IOException
+    public static <T2> MessageIn<T2> read(DataInput in, int version, int id) throws IOException
     {
         InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 84438b9..7db956c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -79,7 +79,7 @@ public final class MessagingService implements MessagingServiceMBean
     /**
      * we preface every message with this number so the recipient can validate the sender is sane
      */
-    static final int PROTOCOL_MAGIC = 0xCA552DFA;
+    public static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* All verb handler identifiers */
     public enum Verb
@@ -123,8 +123,6 @@ public final class MessagingService implements MessagingServiceMBean
         // remember to add new verbs at the end, since we serialize by ordinal
     }
 
-    public static final Verb[] VERBS = Verb.values();
-
     public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
     {{
         put(Verb.MUTATION, Stage.MUTATION);
@@ -213,7 +211,7 @@ public final class MessagingService implements MessagingServiceMBean
     }};
 
     /* This records all the results mapped by message Id */
-    private final ExpiringMap<String, CallbackInfo> callbacks;
+    private final ExpiringMap<Integer, CallbackInfo> callbacks;
 
     /**
      * a placeholder class that means "deserialize using the callback." We can't implement this without
@@ -316,9 +314,9 @@ public final class MessagingService implements MessagingServiceMBean
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
-        Function<Pair<String, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<String, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
+        Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
         {
-            public Object apply(Pair<String, ExpiringMap.CacheableObject<CallbackInfo>> pair)
+            public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
                 CallbackInfo expiredCallbackInfo = pair.right.value;
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
@@ -336,7 +334,7 @@ public final class MessagingService implements MessagingServiceMBean
             }
         };
 
-        callbacks = new ExpiringMap<String, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
+        callbacks = new ExpiringMap<Integer, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -411,7 +409,7 @@ public final class MessagingService implements MessagingServiceMBean
             logger.info("Starting Encrypted Messaging Service on SSL port {}", DatabaseDescriptor.getSSLStoragePort());
         }
 
-        ServerSocketChannel serverChannel = null;
+        ServerSocketChannel serverChannel;
         try
         {
             serverChannel = ServerSocketChannel.open();
@@ -515,9 +513,9 @@ public final class MessagingService implements MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
+    public int addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout)
     {
-        String messageId = nextId();
+        int messageId = nextId();
         CallbackInfo previous;
 
         // If HH is enabled and this is a mutation message => store the message to track for potential hints.
@@ -532,16 +530,15 @@ public final class MessagingService implements MessagingServiceMBean
 
     private static final AtomicInteger idGen = new AtomicInteger(0);
 
-    // TODO make these integers to avoid unnecessary int -> string -> int conversions
-    private static String nextId()
+    private static int nextId()
     {
-        return Integer.toString(idGen.incrementAndGet());
+        return idGen.incrementAndGet();
     }
 
     /*
      * @see #sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout)
      */
-    public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb)
+    public int sendRR(MessageOut message, InetAddress to, IMessageCallback cb)
     {
         return sendRR(message, to, cb, message.getTimeout());
     }
@@ -552,6 +549,7 @@ public final class MessagingService implements MessagingServiceMBean
      * Also holds the message (only mutation messages) to determine if it
      * needs to trigger a hint (uses StorageProxy for that).
      *
+     *
      * @param message message to be sent.
      * @param to      endpoint to which the message needs to be sent
      * @param cb      callback interface which is used to pass the responses or
@@ -560,9 +558,9 @@ public final class MessagingService implements MessagingServiceMBean
      * @param timeout the timeout used for expiration
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
+    public int sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
     {
-        String id = addCallback(cb, message, to, timeout);
+        int id = addCallback(cb, message, to, timeout);
 
         if (cb instanceof AbstractWriteResponseHandler)
         {
@@ -582,7 +580,7 @@ public final class MessagingService implements MessagingServiceMBean
         sendOneWay(message, nextId(), to);
     }
 
-    public void sendReply(MessageOut message, String id, InetAddress to)
+    public void sendReply(MessageOut message, int id, InetAddress to)
     {
         sendOneWay(message, id, to);
     }
@@ -594,7 +592,7 @@ public final class MessagingService implements MessagingServiceMBean
      * @param message messages to be sent.
      * @param to      endpoint to which the message needs to be sent
      */
-    public void sendOneWay(MessageOut message, String id, InetAddress to)
+    public void sendOneWay(MessageOut message, int id, InetAddress to)
     {
         if (logger.isTraceEnabled())
             logger.trace(FBUtilities.getBroadcastAddress() + " sending " + message.verb + " to " + id + "@" + to);
@@ -618,7 +616,7 @@ public final class MessagingService implements MessagingServiceMBean
 
     public <T> IAsyncResult<T> sendRR(MessageOut message, InetAddress to)
     {
-        IAsyncResult<T> iar = new AsyncResult();
+        IAsyncResult<T> iar = new AsyncResult<T>();
         sendRR(message, to, iar);
         return iar;
     }
@@ -699,7 +697,7 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
-    public void receive(MessageIn message, String id, long timestamp)
+    public void receive(MessageIn message, int id, long timestamp)
     {
         Tracing.instance().initializeFromMessage(message);
         Tracing.trace("Message received from {}", message.from);
@@ -729,22 +727,22 @@ public final class MessagingService implements MessagingServiceMBean
         stage.execute(runnable);
     }
 
-    public void setCallbackForTests(String messageId, CallbackInfo callback)
+    public void setCallbackForTests(int messageId, CallbackInfo callback)
     {
         callbacks.put(messageId, callback);
     }
 
-    public CallbackInfo getRegisteredCallback(String messageId)
+    public CallbackInfo getRegisteredCallback(int messageId)
     {
         return callbacks.get(messageId);
     }
 
-    public CallbackInfo removeRegisteredCallback(String messageId)
+    public CallbackInfo removeRegisteredCallback(int messageId)
     {
         return callbacks.remove(messageId);
     }
 
-    public long getRegisteredCallbackAge(String messageId)
+    public long getRegisteredCallbackAge(int messageId)
     {
         return callbacks.getAge(messageId);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 2da678c..0cc974f 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -77,7 +77,7 @@ public class OutboundTcpConnection extends Thread
         return remoteDC.equals(localDC);
     }
 
-    public void enqueue(MessageOut<?> message, String id)
+    public void enqueue(MessageOut<?> message, int id)
     {
         expireMessages();
         try
@@ -95,12 +95,12 @@ public class OutboundTcpConnection extends Thread
         active.clear();
         backlog.clear();
         isStopped = destroyThread; // Exit loop to stop the thread
-        enqueue(CLOSE_SENTINEL, null);
+        enqueue(CLOSE_SENTINEL, -1);
     }
 
     void softCloseSocket()
     {
-        enqueue(CLOSE_SENTINEL, null);
+        enqueue(CLOSE_SENTINEL, -1);
     }
 
     public int getTargetVersion()
@@ -201,7 +201,7 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
-    public static void write(MessageOut message, String id, long timestamp, DataOutputStream out, int version) throws IOException
+    public static void write(MessageOut message, int id, long timestamp, DataOutputStream out, int version) throws IOException
     {
         out.writeInt(MessagingService.PROTOCOL_MAGIC);
         if (version < MessagingService.VERSION_12)
@@ -211,7 +211,11 @@ public class OutboundTcpConnection extends Thread
             out.writeInt(-1);
         }
 
-        out.writeUTF(id);
+        if (version < MessagingService.VERSION_20)
+            out.writeUTF(String.valueOf(id));
+        else
+            out.writeInt(id);
+
         if (version >= MessagingService.VERSION_12)
         {
             // int cast cuts off the high-order half of the timestamp, which we can assume remains
@@ -280,7 +284,7 @@ public class OutboundTcpConnection extends Thread
                 {
                     try
                     {
-                        socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize().intValue());
+                        socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize());
                     }
                     catch (SocketException se)
                     {
@@ -369,10 +373,10 @@ public class OutboundTcpConnection extends Thread
     private static class QueuedMessage
     {
         final MessageOut<?> message;
-        final String id;
+        final int id;
         final long timestamp;
 
-        QueuedMessage(MessageOut<?> message, String id, long timestamp)
+        QueuedMessage(MessageOut<?> message, int id, long timestamp)
         {
             this.message = message;
             this.id = id;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index d0931c3..b3a5f32 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -26,7 +26,7 @@ public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger = LoggerFactory.getLogger( ResponseVerbHandler.class );
 
-    public void doVerb(MessageIn message, String id)
+    public void doVerb(MessageIn message, int id)
     {
         long latency = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id);
         CallbackInfo callbackInfo = MessagingService.instance().removeRegisteredCallback(id);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/IMessageSink.java b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
index 74d7c70..721360c 100644
--- a/src/java/org/apache/cassandra/net/sink/IMessageSink.java
+++ b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.net.MessageOut;
 
 public interface IMessageSink
 {
-    public MessageOut handleMessage(MessageOut message, String id, InetAddress to);
+    public MessageOut handleMessage(MessageOut message, int id, InetAddress to);
 
-    public MessageIn handleMessage(MessageIn message, String id, InetAddress to);
+    public MessageIn handleMessage(MessageIn message, int id, InetAddress to);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/net/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/SinkManager.java b/src/java/org/apache/cassandra/net/sink/SinkManager.java
index dc3085c..7b67afe 100644
--- a/src/java/org/apache/cassandra/net/sink/SinkManager.java
+++ b/src/java/org/apache/cassandra/net/sink/SinkManager.java
@@ -38,7 +38,7 @@ public class SinkManager
         sinks.clear();
     }
 
-    public static MessageOut processOutboundMessage(MessageOut message, String id, InetAddress to)
+    public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to)
     {
         if (sinks.isEmpty())
             return message;
@@ -52,7 +52,7 @@ public class SinkManager
         return message;
     }
 
-    public static MessageIn processInboundMessage(MessageIn message, String id)
+    public static MessageIn processInboundMessage(MessageIn message, int id)
     {
         if (sinks.isEmpty())
             return message;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index a7df82d..f564824 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -457,7 +457,7 @@ public class AntiEntropyService
         /**
          * Trigger a validation compaction which will return the tree upon completion.
          */
-        public void doVerb(MessageIn<TreeRequest> message, String id)
+        public void doVerb(MessageIn<TreeRequest> message, int id)
         {
             TreeRequest remotereq = message.payload;
             TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.cf);
@@ -476,7 +476,7 @@ public class AntiEntropyService
      */
     public static class TreeResponseVerbHandler implements IVerbHandler<Validator>
     {
-        public void doVerb(MessageIn<Validator> message, String id)
+        public void doVerb(MessageIn<Validator> message, int id)
         {
             // deserialize the remote tree, and register it
             Validator response = message.payload;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 6d03009..2e3a921 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -34,7 +34,7 @@ public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
 
-    public void doVerb(MessageIn<IndexScanCommand> message, String id)
+    public void doVerb(MessageIn<IndexScanCommand> message, int id)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/service/PBSPredictor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PBSPredictor.java b/src/java/org/apache/cassandra/service/PBSPredictor.java
index 64d19de..85ef304 100644
--- a/src/java/org/apache/cassandra/service/PBSPredictor.java
+++ b/src/java/org/apache/cassandra/service/PBSPredictor.java
@@ -185,11 +185,11 @@ public class PBSPredictor implements PBSPredictorMBean
     }
 
     // used for LRU replacement
-    private final Queue<String> writeMessageIds = new LinkedBlockingQueue<String>();
-    private final Queue<String> readMessageIds = new LinkedBlockingQueue<String>();
+    private final Queue<Integer> writeMessageIds = new LinkedBlockingQueue<Integer>();
+    private final Queue<Integer> readMessageIds = new LinkedBlockingQueue<Integer>();
 
-    private final Map<String, MessageLatencyCollection> messageIdToWriteLats = new ConcurrentHashMap<String, MessageLatencyCollection>();
-    private final Map<String, MessageLatencyCollection> messageIdToReadLats = new ConcurrentHashMap<String, MessageLatencyCollection>();
+    private final Map<Integer, MessageLatencyCollection> messageIdToWriteLats = new ConcurrentHashMap<Integer, MessageLatencyCollection>();
+    private final Map<Integer, MessageLatencyCollection> messageIdToReadLats = new ConcurrentHashMap<Integer, MessageLatencyCollection>();
 
     private Random random;
     private boolean initialized = false;
@@ -455,7 +455,7 @@ public class PBSPredictor implements PBSPredictorMBean
                                        percentileLatency);
     }
 
-    public void startWriteOperation(String id)
+    public void startWriteOperation(int id)
     {
         if (!logLatencies)
             return;
@@ -463,7 +463,7 @@ public class PBSPredictor implements PBSPredictorMBean
         startWriteOperation(id, System.currentTimeMillis());
     }
 
-    public void startWriteOperation(String id, long startTime)
+    public void startWriteOperation(int id, long startTime)
     {
         if (!logLatencies)
             return;
@@ -476,14 +476,14 @@ public class PBSPredictor implements PBSPredictorMBean
         // the maximum number of entries is sloppy, but that's acceptable for our purposes
         if (writeMessageIds.size() > maxLoggedLatencies)
         {
-            String toEvict = writeMessageIds.remove();
+            Integer toEvict = writeMessageIds.remove();
             messageIdToWriteLats.remove(toEvict);
         }
 
         messageIdToWriteLats.put(id, new MessageLatencyCollection(startTime));
     }
 
-    public void startReadOperation(String id)
+    public void startReadOperation(int id)
     {
         if (!logLatencies)
             return;
@@ -491,7 +491,7 @@ public class PBSPredictor implements PBSPredictorMBean
         startReadOperation(id, System.currentTimeMillis());
     }
 
-    public void startReadOperation(String id, long startTime)
+    public void startReadOperation(int id, long startTime)
     {
         if (!logLatencies)
             return;
@@ -503,14 +503,14 @@ public class PBSPredictor implements PBSPredictorMBean
         // the maximum number of entries is sloppy, but that's acceptable for our purposes
         if (readMessageIds.size() > maxLoggedLatencies)
         {
-            String toEvict = readMessageIds.remove();
+            Integer toEvict = readMessageIds.remove();
             messageIdToReadLats.remove(toEvict);
         }
 
         messageIdToReadLats.put(id, new MessageLatencyCollection(startTime));
     }
 
-    public void logWriteResponse(String id, long constructionTime)
+    public void logWriteResponse(int id, long constructionTime)
     {
         if (!logLatencies)
             return;
@@ -518,7 +518,7 @@ public class PBSPredictor implements PBSPredictorMBean
         logWriteResponse(id, constructionTime, System.currentTimeMillis());
     }
 
-    public void logWriteResponse(String id, long responseCreationTime, long receivedTime)
+    public void logWriteResponse(int id, long responseCreationTime, long receivedTime)
     {
         if (!logLatencies)
             return;
@@ -534,7 +534,7 @@ public class PBSPredictor implements PBSPredictorMBean
         writeLatsCollection.addReplyLat(Math.max(0, receivedTime - responseCreationTime));
     }
 
-    public void logReadResponse(String id, long constructionTime)
+    public void logReadResponse(int id, long constructionTime)
     {
         if (!logLatencies)
             return;
@@ -542,7 +542,7 @@ public class PBSPredictor implements PBSPredictorMBean
         logReadResponse(id, constructionTime, System.currentTimeMillis());
     }
 
-    public void logReadResponse(String id, long responseCreationTime, long receivedTime)
+    public void logReadResponse(int id, long responseCreationTime, long receivedTime)
     {
         if (!logLatencies)
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index d8588a8..a095680 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -46,7 +46,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
             return cfs.getRangeSlice(command.range, command.maxResults, command.predicate, command.row_filter, command.countCQL3Rows, command.isPaging);
     }
 
-    public void doVerb(MessageIn<RangeSliceCommand> message, String id)
+    public void doVerb(MessageIn<RangeSliceCommand> message, int id)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index f15e8c5..a24164f 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -32,7 +32,7 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
 {
     private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
 
-    public void doVerb(MessageIn<SnapshotCommand> message, String id)
+    public void doVerb(MessageIn<SnapshotCommand> message, int id)
     {
         SnapshotCommand command = message.payload;
         if (command.clear_snapshot)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f157049..649cfde 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -623,7 +623,8 @@ public class StorageProxy implements StorageProxyMBean
         InetAddress target = iter.next();
 
         // direct writes to local DC or old Cassandra versions
-        if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_11)
+        // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+        if (localDC || MessagingService.instance().getVersion(target) < MessagingService.VERSION_20)
         {
             // yes, the loop and non-loop code here are the same; this is clunky but we want to avoid
             // creating a second iterator since we already have a perfectly good one
@@ -644,13 +645,13 @@ public class StorageProxy implements StorageProxyMBean
         {
             InetAddress destination = iter.next();
             CompactEndpointSerializationHelper.serialize(destination, dos);
-            String id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
-            dos.writeUTF(id);
+            int id = MessagingService.instance().addCallback(handler, message, destination, message.getTimeout());
+            dos.writeInt(id);
             logger.trace("Adding FWD message to {}@{}", id, destination);
         }
         message = message.withParameter(RowMutation.FORWARD_TO, bos.toByteArray());
         // send the combined message + forward headers
-        String id = MessagingService.instance().sendRR(message, target, handler);
+        int id = MessagingService.instance().sendRR(message, target, handler);
         logger.trace("Sending message to {}@{}", id, target);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 8472d54..4f40162 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -202,15 +204,19 @@ public class FileStreamTask extends WrappedRunnable
         }
     }
 
+    public static void sendReply(MessageOut message, DataOutputStream out) throws IOException
+    {
+        out.writeInt(MessagingService.PROTOCOL_MAGIC);
+        message.serialize(out, MessagingService.current_version);
+    }
+
     protected void receiveReply() throws IOException
     {
         MessagingService.validateMagic(input.readInt());
-        String id = input.readUTF();
-        input.readInt(); // skip timestamp
         // since we reject streaming with different version, using current_version here is fine
-        MessageIn message = MessageIn.read(input, MessagingService.current_version, id);
+        MessageIn message = MessageIn.read(input, MessagingService.current_version, -1);
         assert message.verb == MessagingService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
-        handler.doVerb(message, id);
+        handler.doVerb(message, -1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index a7ce4d1..8dca4d6 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -38,8 +38,6 @@ import org.apache.cassandra.db.compaction.PrecompactedRow;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StreamingMetrics;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -67,11 +65,7 @@ public class IncomingStreamReader
             if (!StreamInSession.hasSession(header.sessionId))
             {
                 StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
-                OutboundTcpConnection.write(reply.createMessage(),
-                                            header.sessionId.toString(),
-                                            System.currentTimeMillis(),
-                                            new DataOutputStream(socket.getOutputStream()),
-                                            MessagingService.instance().getVersion(host));
+                FileStreamTask.sendReply(reply.createMessage(), new DataOutputStream(socket.getOutputStream()));
                 throw new IOException("Session " + header.sessionId + " already closed.");
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
index ff68eaf..4297b34 100644
--- a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
@@ -30,7 +30,7 @@ public class ReplicationFinishedVerbHandler implements IVerbHandler
 {
     private static final Logger logger = LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
 
-    public void doVerb(MessageIn msg, String id)
+    public void doVerb(MessageIn msg, int id)
     {
         StorageService.instance.confirmReplication(msg.from);
         MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 96c31da..370183f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.OutboundTcpConnection;
 import org.apache.cassandra.utils.UUIDGen;
 
 /** each context gets its own StreamInSession. So there may be >1 Session per host */
@@ -148,11 +147,8 @@ public class StreamInSession extends AbstractStreamSession
     public void sendMessage(MessageOut<StreamReply> message) throws IOException
     {
         DataOutputStream out = new DataOutputStream(socket.getOutputStream());
-        OutboundTcpConnection.write(message,
-                                    String.valueOf(getSessionId()),
-                                    System.currentTimeMillis(),
-                                    out,
-                                    MessagingService.instance().getVersion(getHost()));
+        FileStreamTask.sendReply(message,
+                                 out);
         out.flush();
     }
 
@@ -200,11 +196,8 @@ public class StreamInSession extends AbstractStreamSession
             try
             {
                 if (socket != null)
-                    OutboundTcpConnection.write(reply.createMessage(),
-                                                sessionId.toString(),
-                                                System.currentTimeMillis(),
-                                                new DataOutputStream(socket.getOutputStream()),
-                                                MessagingService.instance().getVersion(getHost()));
+                    FileStreamTask.sendReply(reply.createMessage(),
+                                             new DataOutputStream(socket.getOutputStream()));
                 else
                     logger.debug("No socket to reply to {} with!", getHost());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index ebcee8a..b69b6d0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -27,7 +27,7 @@ public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
 
-    public void doVerb(MessageIn<StreamReply> message, String id)
+    public void doVerb(MessageIn<StreamReply> message, int id)
     {
         StreamReply reply = message.payload;
         logger.debug("Received StreamReply {}", reply);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
index 023f523..bbed34d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
@@ -31,7 +31,7 @@ public class StreamRequestVerbHandler implements IVerbHandler<StreamRequest>
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
 
-    public void doVerb(MessageIn<StreamRequest> message, String id)
+    public void doVerb(MessageIn<StreamRequest> message, int id)
     {
         if (logger.isDebugEnabled())
             logger.debug("Received a StreamRequestMessage from {}", message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 5d456a6..bbc03fb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -193,7 +193,7 @@ public class StreamingRepairTask implements Runnable
 
     public static class StreamingRepairRequest implements IVerbHandler<StreamingRepairTask>
     {
-        public void doVerb(MessageIn<StreamingRepairTask> message, String id)
+        public void doVerb(MessageIn<StreamingRepairTask> message, int id)
         {
             StreamingRepairTask task = message.payload;
             assert task.src.equals(FBUtilities.getBroadcastAddress());
@@ -208,7 +208,7 @@ public class StreamingRepairTask implements Runnable
 
     public static class StreamingRepairResponse implements IVerbHandler<UUID>
     {
-        public void doVerb(MessageIn<UUID> message, String id)
+        public void doVerb(MessageIn<UUID> message, int id)
         {
             UUID taskid = message.payload;
             StreamingRepairTask task = tasks.get(taskid);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index 429b6a5..5efc15c 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -107,7 +107,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         DataInputStream in = getInput("db.RangeSliceCommand.bin");
         for (int i = 0; i < 6; i++)
-            MessageIn.read(in, getVersion(), "id");
+            MessageIn.read(in, getVersion(), -1);
         in.close();
     }
 
@@ -141,8 +141,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert SliceByNamesReadCommand.serializer.deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer.deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
         in.close();
     }
 
@@ -177,8 +177,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert SliceFromReadCommand.serializer.deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer.deserialize(in, getVersion()) != null;
         assert ReadCommand.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
         in.close();
     }
 
@@ -260,12 +260,12 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert RowMutation.serializer.deserialize(in, getVersion()) != null;
         assert RowMutation.serializer.deserialize(in, getVersion()) != null;
         assert RowMutation.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
         in.close();
     }
 
@@ -301,14 +301,14 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert Truncation.serializer.deserialize(in, getVersion()) != null;
         assert TruncateResponse.serializer.deserialize(in, getVersion()) != null;
         assert TruncateResponse.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
 
         // set up some fake callbacks so deserialization knows that what it's deserializing is a TruncateResponse
-        MessagingService.instance().setCallbackForTests("tr1", new CallbackInfo(null, null, TruncateResponse.serializer));
-        MessagingService.instance().setCallbackForTests("tr2", new CallbackInfo(null, null, TruncateResponse.serializer));
+        MessagingService.instance().setCallbackForTests(1, new CallbackInfo(null, null, TruncateResponse.serializer));
+        MessagingService.instance().setCallbackForTests(2, new CallbackInfo(null, null, TruncateResponse.serializer));
 
-        assert MessageIn.read(in, getVersion(), "tr1") != null;
-        assert MessageIn.read(in, getVersion(), "tr2") != null;
+        assert MessageIn.read(in, getVersion(), 1) != null;
+        assert MessageIn.read(in, getVersion(), 2) != null;
         in.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/test/unit/org/apache/cassandra/service/PBSPredictorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/PBSPredictorTest.java b/test/unit/org/apache/cassandra/service/PBSPredictorTest.java
index 92e863d..a0e2ef0 100644
--- a/test/unit/org/apache/cassandra/service/PBSPredictorTest.java
+++ b/test/unit/org/apache/cassandra/service/PBSPredictorTest.java
@@ -25,13 +25,13 @@ public class PBSPredictorTest
 {
     private static PBSPredictor predictor = PBSPredictor.instance();
 
-    private void createWriteResponse(long W, long A, String id)
+    private void createWriteResponse(long W, long A, int id)
     {
         predictor.startWriteOperation(id, 0);
         predictor.logWriteResponse(id, W, W+A);
     }
 
-    private void createReadResponse(long R, long S, String id)
+    private void createReadResponse(long R, long S, int id)
     {
         predictor.startReadOperation(id, 0);
         predictor.logReadResponse(id, R, R+S);
@@ -51,13 +51,13 @@ public class PBSPredictorTest
 
             for (int i = 0; i < 10; ++i)
             {
-                createWriteResponse(10, 0, String.format("W%d", i));
-                createReadResponse(0, 0, String.format("R%d", i));
+                createWriteResponse(10, 0, 10 * i);
+                createReadResponse(0, 0, 10 * i + 1);
             }
 
             for (int i = 0; i < 10; ++i)
             {
-                createWriteResponse(0, 0, String.format("WS%d", i));
+                createWriteResponse(0, 0, 10 * i + 2);
             }
 
             // 10ms after write
@@ -78,7 +78,7 @@ public class PBSPredictorTest
 
             for (int i = 0; i < 10; ++i)
             {
-                createWriteResponse(20, 0, String.format("WL%d", i));
+                createWriteResponse(20, 0, 10 * i + 3);
             }
 
             // 5ms after write
@@ -94,8 +94,8 @@ public class PBSPredictorTest
 
             for (int i = 0; i < 10; ++i)
             {
-                createWriteResponse(100, 100, String.format("WVL%d", i));
-                createReadResponse(100, 100, String.format("RL%d", i));
+                createWriteResponse(100, 100, 10 * i + 4);
+                createReadResponse(100, 100, 10 * i + 5);
             }
 
             result = predictor.doPrediction(2,1,1,0f,1, 0.99f);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 4e21a7b..278c8f1 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -165,7 +165,7 @@ public class RemoveTest
      */
     class ReplicationSink implements IMessageSink
     {
-        public MessageIn handleMessage(MessageIn msg, String id, InetAddress to)
+        public MessageIn handleMessage(MessageIn msg, int id, InetAddress to)
         {
             if (!msg.verb.equals(MessagingService.Verb.STREAM_REQUEST))
                 return msg;
@@ -175,7 +175,7 @@ public class RemoveTest
             return null;
         }
 
-        public MessageOut handleMessage(MessageOut msg, String id, InetAddress to)
+        public MessageOut handleMessage(MessageOut msg, int id, InetAddress to)
         {
             return msg;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 4553dd1..5d8c23b 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -61,7 +61,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         DataInputStream in = getInput("service.TreeRequest.bin");
         assert AntiEntropyService.TreeRequest.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
         in.close();
     }
 
@@ -98,8 +98,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         DataInputStream in = getInput("service.TreeResponse.bin");
         assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null;
         assert AntiEntropyService.Validator.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
         in.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1936648a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
index a81feba..6db5b15 100644
--- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
@@ -143,7 +143,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
         DataInputStream in = getInput("streaming.StreamReply.bin");
         assert StreamReply.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
         in.close();
     }
 
@@ -192,9 +192,9 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert StreamRequest.serializer.deserialize(in, getVersion()) != null;
         assert StreamRequest.serializer.deserialize(in, getVersion()) != null;
         assert StreamRequest.serializer.deserialize(in, getVersion()) != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
-        assert MessageIn.read(in, getVersion(), "id") != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
+        assert MessageIn.read(in, getVersion(), -1) != null;
         in.close();
     }
 


Mime
View raw message