cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject cassandra git commit: Add a key-value payload for third party usage
Date Tue, 24 Mar 2015 11:04:06 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk a672097db -> 789aa72e5


Add a key-value payload for third party usage

Patch by Robert Stupp; Reviewed by Sylvain Lebresne for CASSANDRA-8553


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/789aa72e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/789aa72e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/789aa72e

Branch: refs/heads/trunk
Commit: 789aa72e56b3bbb8811c96f82ec96a10207b8a0e
Parents: a672097
Author: Robert Stupp <snazy@snazy.de>
Authored: Tue Mar 24 12:02:08 2015 +0100
Committer: Robert Stupp <snazy@snazy.de>
Committed: Tue Mar 24 12:02:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/native_protocol_v4.spec                     |  12 +
 .../org/apache/cassandra/cql3/QueryHandler.java |  19 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  21 ++
 .../cassandra/thrift/CassandraServer.java       |  18 +-
 .../org/apache/cassandra/transport/CBUtil.java  |  34 +++
 .../org/apache/cassandra/transport/Frame.java   |   3 +-
 .../org/apache/cassandra/transport/Message.java |  44 +++-
 .../transport/messages/BatchMessage.java        |   6 +-
 .../transport/messages/ExecuteMessage.java      |   5 +-
 .../transport/messages/PrepareMessage.java      |   3 +-
 .../transport/messages/QueryMessage.java        |   3 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |   6 +-
 .../cassandra/transport/MessagePayloadTest.java | 257 +++++++++++++++++++
 14 files changed, 405 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c2944c8..d39db88 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Add a key-value payload for third party usage (CASSANDRA-8553)
  * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149)
  * Partition intra-cluster message streams by size, not type (CASSANDRA-8789)
  * Add WriteFailureException to native protocol, notify coordinator of

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 69adc17..ed089ae 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -124,6 +124,15 @@ Table of Contents
           a tracing ID. The tracing ID is a [uuid] and is the first thing in
           the frame body. The rest of the body will then be the usual body
           corresponding to the response opcode.
+    0x04: Custom payload flag. For a request or response frame, this indicates
+          that generic key-value custom payload for a custom QueryHandler
+          implementation is present in the frame. Such custom payload is simply
+          ignored by the default QueryHandler implementation.
+          Currently, only QUERY, PREPARE, EXECUTE and BATCH requests support
+          payload.
+          If both trace-flag and payload-flag are set, the generic key-value
+          payload appears after trace's data.
+          Type of custom payload is [bytes map] (see below).
 
   The rest of the flags is currently unused and ignored.
 
@@ -228,6 +237,8 @@ Table of Contents
                       are [string].
     [string multimap] A [short] n, followed by n pair <k><v> where <k> is a
                       [string] and <v> is a [string list].
+    [bytes map]       A [short] n, followed by n pair <k><v> where <k> is a
+                      [string] and <v> is a [bytes].
 
 
 4. Messages
@@ -1113,3 +1124,4 @@ Table of Contents
     has been modified, and now includes changes related to user defined functions and user defined aggregates.
   * Read_failure error code was added.
   * Function_failure error code was added.
+  * Add custom payload to frames for custom QueryHandler implementations (ignored by Cassandra's standard QueryHandler)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index d42d90e..8b579d7 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.util.Map;
+
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.exceptions.RequestExecutionException;
@@ -27,10 +29,15 @@ import org.apache.cassandra.utils.MD5Digest;
 
 public interface QueryHandler
 {
-    public ResultMessage process(String query, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
-    public ResultMessage.Prepared prepare(String query, QueryState state) throws RequestValidationException;
-    public ParsedStatement.Prepared getPrepared(MD5Digest id);
-    public ParsedStatement.Prepared getPreparedForThrift(Integer id);
-    public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
-    public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException;
+    ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException;
+
+    ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException;
+
+    ParsedStatement.Prepared getPrepared(MD5Digest id);
+
+    ParsedStatement.Prepared getPreparedForThrift(Integer id);
+
+    ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException;
+
+    ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 90418c2..fcda13d 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -240,6 +241,11 @@ public class QueryProcessor implements QueryHandler
         return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
     }
 
+    public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+    {
+        return process(query, state, options);
+    }
+
     public ResultMessage process(String queryString, QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
@@ -345,6 +351,11 @@ public class QueryProcessor implements QueryHandler
         return UntypedResultSet.create(cqlRows);
     }
 
+    public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException
+    {
+        return prepare(query, state);
+    }
+
     public ResultMessage.Prepared prepare(String queryString, QueryState queryState)
     {
         ClientState cState = queryState.getClientState();
@@ -420,6 +431,11 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
+    public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+    {
+        return processPrepared(statement, state, options);
+    }
+
     public ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
@@ -443,6 +459,11 @@ public class QueryProcessor implements QueryHandler
         return processStatement(statement, queryState, options);
     }
 
+    public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+    {
+        return processBatch(statement, state, options);
+    }
+
     public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 6546932..5b49ae3 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1891,7 +1891,10 @@ public class CassandraServer implements Cassandra.Iface
             }
 
             ThriftClientState cState = state();
-            return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
+            return ClientState.getCQLQueryHandler().process(queryString,
+                                                            cState.getQueryState(),
+                                                            QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList()),
+                                                            null).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -1922,7 +1925,9 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             cState.validateLogin();
-            return cState.getCQLQueryHandler().prepare(queryString, cState.getQueryState()).toThriftPreparedResult();
+            return ClientState.getCQLQueryHandler().prepare(queryString,
+                                                       cState.getQueryState(),
+                                                       null).toThriftPreparedResult();
         }
         catch (RequestValidationException e)
         {
@@ -1950,7 +1955,7 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
             ThriftClientState cState = state();
-            ParsedStatement.Prepared prepared = cState.getCQLQueryHandler().getPreparedForThrift(itemId);
+            ParsedStatement.Prepared prepared = ClientState.getCQLQueryHandler().getPreparedForThrift(itemId);
 
             if (prepared == null)
                 throw new InvalidRequestException(String.format("Prepared query with ID %d not found" +
@@ -1959,9 +1964,10 @@ public class CassandraServer implements Cassandra.Iface
                                                                 itemId));
             logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, prepared.statement.getBoundTerms());
 
-            return cState.getCQLQueryHandler().processPrepared(prepared.statement,
-                                                               cState.getQueryState(),
-                                                               QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
+            return ClientState.getCQLQueryHandler().processPrepared(prepared.statement,
+                                                                    cState.getQueryState(),
+                                                                    QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables),
+                                                                    null).toThriftResult();
         }
         catch (RequestExecutionException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index f9425c3..b37e0da 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -181,6 +181,40 @@ public abstract class CBUtil
         return 2 + bytes.length;
     }
 
+    public static Map<String, byte[]> readBytesMap(ByteBuf cb)
+    {
+        int length = cb.readUnsignedShort();
+        Map<String, byte[]> m = new HashMap<>(length);
+        for (int i = 0; i < length; i++)
+        {
+            String k = readString(cb);
+            byte[] v = readBytes(cb);
+            m.put(k, v);
+        }
+        return m;
+    }
+
+    public static void writeBytesMap(Map<String, byte[]> m, ByteBuf cb)
+    {
+        cb.writeShort(m.size());
+        for (Map.Entry<String, byte[]> entry : m.entrySet())
+        {
+            writeString(entry.getKey(), cb);
+            writeBytes(entry.getValue(), cb);
+        }
+    }
+
+    public static int sizeOfBytesMap(Map<String, byte[]> m)
+    {
+        int size = 2;
+        for (Map.Entry<String, byte[]> entry : m.entrySet())
+        {
+            size += sizeOfString(entry.getKey());
+            size += sizeOfBytes(entry.getValue());
+        }
+        return size;
+    }
+
     public static ConsistencyLevel readConsistencyLevel(ByteBuf cb)
     {
         return ConsistencyLevel.fromCode(cb.readUnsignedShort());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 7591c83..b72259d 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -111,7 +111,8 @@ public class Frame
         {
             // The order of that enum matters!!
             COMPRESSED,
-            TRACING;
+            TRACING,
+            CUSTOM_PAYLOAD;
 
             private static final Flag[] ALL_VALUES = values();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index d890737..3382593 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -146,6 +147,7 @@ public abstract class Message
     protected Connection connection;
     private int streamId;
     private Frame sourceFrame;
+    private Map<String, byte[]> customPayload;
 
     protected Message(Type type)
     {
@@ -183,6 +185,16 @@ public abstract class Message
         return sourceFrame;
     }
 
+    public Map<String, byte[]> getCustomPayload()
+    {
+        return customPayload;
+    }
+
+    public void setCustomPayload(Map<String, byte[]> customPayload)
+    {
+        this.customPayload = customPayload;
+    }
+
     public static abstract class Request extends Message
     {
         protected boolean tracingRequested;
@@ -239,14 +251,20 @@ public abstract class Message
         {
             boolean isRequest = frame.header.type.direction == Direction.REQUEST;
             boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING);
+            boolean isCustomPayload = frame.header.flags.contains(Frame.Header.Flag.CUSTOM_PAYLOAD);
 
             UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body);
+            Map<String, byte[]> customPayload = !isCustomPayload ? null : CBUtil.readBytesMap(frame.body);
 
             try
             {
+                if (isCustomPayload && frame.header.version < Server.VERSION_4)
+                    throw new ProtocolException("Received frame with CUSTOM_PAYLOAD flag for native protocol version < 4");
+
                 Message message = frame.header.type.codec.decode(frame.body, frame.header.version);
                 message.setStreamId(frame.header.streamId);
                 message.setSourceFrame(frame);
+                message.setCustomPayload(customPayload);
 
                 if (isRequest)
                 {
@@ -294,23 +312,41 @@ public abstract class Message
                 if (message instanceof Response)
                 {
                     UUID tracingId = ((Response)message).getTracingId();
+                    Map<String, byte[]> customPayload = message.getCustomPayload();
+                    if (tracingId != null)
+                        messageSize += CBUtil.sizeOfUUID(tracingId);
+                    if (customPayload != null)
+                    {
+                        if (version < Server.VERSION_4)
+                            throw new ProtocolException("Must not send frame with CUSTOM_PAYLOAD flag for native protocol version < 4");
+                        messageSize += CBUtil.sizeOfBytesMap(customPayload);
+                    }
+                    body = CBUtil.allocator.buffer(messageSize);
                     if (tracingId != null)
                     {
-                        body = CBUtil.allocator.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
                         CBUtil.writeUUID(tracingId, body);
                         flags.add(Frame.Header.Flag.TRACING);
                     }
-                    else
+                    if (customPayload != null)
                     {
-                        body = CBUtil.allocator.buffer(messageSize);
+                        CBUtil.writeBytesMap(customPayload, body);
+                        flags.add(Frame.Header.Flag.CUSTOM_PAYLOAD);
                     }
                 }
                 else
                 {
                     assert message instanceof Request;
-                    body = CBUtil.allocator.buffer(messageSize);
                     if (((Request)message).isTracingRequested())
                         flags.add(Frame.Header.Flag.TRACING);
+                    Map<String, byte[]> payload = message.getCustomPayload();
+                    if (payload != null)
+                        messageSize += CBUtil.sizeOfBytesMap(payload);
+                    body = CBUtil.allocator.buffer(messageSize);
+                    if (payload != null)
+                    {
+                        CBUtil.writeBytesMap(payload, body);
+                        flags.add(Frame.Header.Flag.CUSTOM_PAYLOAD);
+                    }
                 }
 
                 try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 64b0826..3acdbdd 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -50,8 +50,8 @@ public class BatchMessage extends Message.Request
 
             byte type = body.readByte();
             int n = body.readUnsignedShort();
-            List<Object> queryOrIds = new ArrayList<Object>(n);
-            List<List<ByteBuffer>> variables = new ArrayList<List<ByteBuffer>>(n);
+            List<Object> queryOrIds = new ArrayList<>(n);
+            List<List<ByteBuffer>> variables = new ArrayList<>(n);
             for (int i = 0; i < n; i++)
             {
                 byte kind = body.readByte();
@@ -212,7 +212,7 @@ public class BatchMessage extends Message.Request
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
             BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none());
-            Message.Response response = handler.processBatch(batch, state, batchOptions);
+            Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload());
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 815935a..50f6619 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.*;
@@ -101,7 +102,7 @@ public class ExecuteMessage extends Message.Request
     {
         try
         {
-            QueryHandler handler = state.getClientState().getCQLQueryHandler();
+            QueryHandler handler = ClientState.getCQLQueryHandler();
             ParsedStatement.Prepared prepared = handler.getPrepared(statementId);
             if (prepared == null)
                 throw new PreparedQueryNotFoundException(statementId);
@@ -131,7 +132,7 @@ public class ExecuteMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 prepared query", state.getClientAddress(), builder.build());
             }
 
-            Message.Response response = handler.processPrepared(statement, state, options);
+            Message.Response response = handler.processPrepared(statement, state, options, getCustomPayload());
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 1db63c3..f54d1d9 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -22,6 +22,7 @@ import java.util.UUID;
 import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
 
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.*;
@@ -74,7 +75,7 @@ public class PrepareMessage extends Message.Request
                 Tracing.instance.begin("Preparing CQL3 query", state.getClientAddress(), ImmutableMap.of("query", query));
             }
 
-            Message.Response response = state.getClientState().getCQLQueryHandler().prepare(query, state);
+            Message.Response response = ClientState.getCQLQueryHandler().prepare(query, state, getCustomPayload());
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 96accb4..4e21678 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -27,6 +27,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.*;
@@ -115,7 +116,7 @@ public class QueryMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
             }
 
-            Message.Response response = state.getClientState().getCQLQueryHandler().process(query, state, options);
+            Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index d8914a9..e49250b 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -73,8 +73,8 @@ public abstract class CQLTester
     private static final AtomicInteger seqNumber = new AtomicInteger();
 
     private static org.apache.cassandra.transport.Server server;
-    private static final int nativePort;
-    private static final InetAddress nativeAddr;
+    protected static final int nativePort;
+    protected static final InetAddress nativeAddr;
     private static final Cluster[] cluster;
     private static final Session[] session;
 
@@ -215,7 +215,7 @@ public abstract class CQLTester
     }
 
     // lazy initialization for all tests that require Java Driver
-    private static void requireNetwork() throws ConfigurationException
+    protected static void requireNetwork() throws ConfigurationException
     {
         if (server != null)
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/789aa72e/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
new file mode 100644
index 0000000..44f1f7b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.BatchQueryOptions;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.messages.BatchMessage;
+import org.apache.cassandra.transport.messages.ExecuteMessage;
+import org.apache.cassandra.transport.messages.PrepareMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.MD5Digest;
+
+/**
+ * Checks that a custom key-value payload travels in both directions over the native protocol.
+ *
+ * For each of QUERY, PREPARE, EXECUTE and BATCH the test sets a payload on the request, asserts
+ * that the installed {@link QueryHandler} received it, and asserts that the payload the handler
+ * put on its result comes back on the response.
+ *
+ * The handler is swapped in by overwriting the static {@code cqlQueryHandler} field of
+ * {@link ClientState} through reflection.
+ */
+public class MessagePayloadTest extends CQLTester
+{
+    // Last non-null payload handed to TestQueryHandler; written by the handler, read by the test.
+    // NOTE(review): the handler presumably runs on a server thread while the test reads this on
+    // the test thread - confirm whether these two fields should be volatile.
+    public static Map<String, byte[]> requestPayload;
+    // Payload TestQueryHandler attaches to its next result; the handler sets it back to null after use.
+    public static Map<String, byte[]> responsePayload;
+
+    private static Field cqlQueryHandlerField;
+    private static boolean modifiersAccessible;
+
+    /**
+     * Makes {@code ClientState.cqlQueryHandler} writable through reflection: the field is marked
+     * accessible and its FINAL modifier bit is cleared via the private {@code modifiers} field of
+     * {@link Field}.
+     *
+     * NOTE(review): this depends on the JDK-internal {@code Field.modifiers} member; confirm that it
+     * is still reachable on the JDK versions the build targets.
+     *
+     * @throws RuntimeException wrapping the reflection failure if either field cannot be reached
+     */
+    @BeforeClass
+    public static void makeCqlQueryHandlerAccessible()
+    {
+        try
+        {
+            cqlQueryHandlerField = ClientState.class.getDeclaredField("cqlQueryHandler");
+            cqlQueryHandlerField.setAccessible(true);
+
+            Field modifiersField = Field.class.getDeclaredField("modifiers");
+            // Remembered so that the teardown can put the flag back to what it was.
+            modifiersAccessible = modifiersField.isAccessible();
+            modifiersField.setAccessible(true);
+            modifiersField.setInt(cqlQueryHandlerField, cqlQueryHandlerField.getModifiers() & ~Modifier.FINAL);
+        }
+        catch (IllegalAccessException | NoSuchFieldException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Undoes {@link #makeCqlQueryHandlerAccessible()}: revokes reflective access to the field and
+     * sets its FINAL modifier bit again. Does nothing if the setup never obtained the field.
+     *
+     * This has to run after the tests. As a second class-level setup method it would run in an
+     * unspecified order relative to the real setup and could lock the field again before the test
+     * touches it.
+     *
+     * @throws RuntimeException wrapping the reflection failure if the modifiers cannot be restored
+     */
+    @AfterClass
+    public static void resetCqlQueryHandlerField()
+    {
+        if (cqlQueryHandlerField == null)
+            return;
+        try
+        {
+            cqlQueryHandlerField.setAccessible(false);
+
+            Field modifiersField = Field.class.getDeclaredField("modifiers");
+            // The Field obtained here is not guaranteed to be the instance unlocked during setup,
+            // and reflected objects are inaccessible by default, so unlock it before writing.
+            modifiersField.setAccessible(true);
+            modifiersField.setInt(cqlQueryHandlerField, cqlQueryHandlerField.getModifiers() | Modifier.FINAL);
+            modifiersField.setAccessible(modifiersAccessible);
+        }
+        catch (IllegalAccessException | NoSuchFieldException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Sends one QUERY, PREPARE, EXECUTE and BATCH message, each with its own request payload, and
+     * checks for every message that the handler saw that payload and that the response carries
+     * the payload prepared in {@link #responsePayload}.
+     *
+     * The original query handler is put back in a {@code finally} block, and the client is closed
+     * in its own {@code finally} block, whether or not an assertion fails.
+     */
+    @Test
+    public void testMessagePayload() throws Throwable
+    {
+        // Keep the current handler so it can be reinstated once the test is done.
+        QueryHandler queryHandler = (QueryHandler) cqlQueryHandlerField.get(null);
+        cqlQueryHandlerField.set(null, new TestQueryHandler());
+        try
+        {
+            requireNetwork();
+
+            Assert.assertSame(TestQueryHandler.class, ClientState.getCQLQueryHandler().getClass());
+
+            SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
+            try
+            {
+                client.connect(false);
+
+                Map<String, byte[]> reqMap;
+                Map<String, byte[]> respMap;
+
+                QueryMessage queryMessage = new QueryMessage(
+                                                            "CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)",
+                                                            QueryOptions.DEFAULT
+                );
+                PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable");
+
+                // QUERY
+                reqMap = Collections.singletonMap("foo", "42".getBytes());
+                responsePayload = respMap = Collections.singletonMap("bar", "42".getBytes());
+                queryMessage.setCustomPayload(reqMap);
+                Message.Response queryResponse = client.execute(queryMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, queryResponse.getCustomPayload());
+
+                // PREPARE
+                reqMap = Collections.singletonMap("foo", "43".getBytes());
+                responsePayload = respMap = Collections.singletonMap("bar", "43".getBytes());
+                prepareMessage.setCustomPayload(reqMap);
+                ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, prepareResponse.getCustomPayload());
+
+                // EXECUTE of the statement prepared above
+                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                reqMap = Collections.singletonMap("foo", "44".getBytes());
+                responsePayload = respMap = Collections.singletonMap("bar", "44".getBytes());
+                executeMessage.setCustomPayload(reqMap);
+                Message.Response executeResponse = client.execute(executeMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, executeResponse.getCustomPayload());
+
+                // BATCH
+                BatchMessage batchMessage = new BatchMessage(BatchStatement.Type.UNLOGGED,
+                                                             Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) VALUES (1, 'foo')"),
+                                                             Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+                                                             QueryOptions.DEFAULT);
+                reqMap = Collections.singletonMap("foo", "45".getBytes());
+                responsePayload = respMap = Collections.singletonMap("bar", "45".getBytes());
+                batchMessage.setCustomPayload(reqMap);
+                Message.Response batchResponse = client.execute(batchMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, batchResponse.getCustomPayload());
+            }
+            finally
+            {
+                client.close();
+            }
+        }
+        finally
+        {
+            cqlQueryHandlerField.set(null, queryHandler);
+        }
+    }
+
+    /**
+     * Asserts that both payload maps are non-null, have the same key set, and hold byte arrays
+     * with equal contents under every key.
+     */
+    private static void payloadEquals(Map<String, byte[]> map1, Map<String, byte[]> map2)
+    {
+        Assert.assertNotNull(map1);
+        Assert.assertNotNull(map2);
+        Assert.assertEquals(map1.keySet(), map2.keySet());
+        // byte[] has identity-based equals, so the values are compared element by element.
+        for (Map.Entry<String, byte[]> e : map1.entrySet())
+            Assert.assertArrayEquals(e.getValue(), map2.get(e.getKey()));
+    }
+
+    /**
+     * Query handler that delegates all work to {@link QueryProcessor#instance}.
+     *
+     * Whenever a call arrives with a non-null custom payload, that payload is stored in
+     * {@link MessagePayloadTest#requestPayload}, and the current
+     * {@link MessagePayloadTest#responsePayload} is set on the result and then cleared. Calls
+     * with a null payload are plain delegation.
+     */
+    public static class TestQueryHandler implements QueryHandler
+    {
+        public ParsedStatement.Prepared getPrepared(MD5Digest id)
+        {
+            return QueryProcessor.instance.getPrepared(id);
+        }
+
+        public ParsedStatement.Prepared getPreparedForThrift(Integer id)
+        {
+            return QueryProcessor.instance.getPreparedForThrift(id);
+        }
+
+        // The payload-less variants below forward to the payload-aware ones with a null payload.
+
+        public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
+        {
+            return processPrepared(statement, state, options, null);
+        }
+
+        public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
+        {
+            return processBatch(statement, state, options, null);
+        }
+
+        public ResultMessage process(String query, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
+        {
+            return process(query, state, options, null);
+        }
+
+        public ResultMessage.Prepared prepare(String query, QueryState state) throws RequestValidationException
+        {
+            return prepare(query, state, null);
+        }
+
+        /** Prepares the query via {@link QueryProcessor}, recording and answering the custom payload if one was sent. */
+        public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, byte[]> customPayload) throws RequestValidationException
+        {
+            if (customPayload != null)
+                requestPayload = customPayload;
+            ResultMessage.Prepared result = QueryProcessor.instance.prepare(query, state, customPayload);
+            if (customPayload != null)
+            {
+                result.setCustomPayload(responsePayload);
+                responsePayload = null;
+            }
+            return result;
+        }
+
+        /** Runs the query via {@link QueryProcessor}, recording and answering the custom payload if one was sent. */
+        public ResultMessage process(String query, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+        {
+            if (customPayload != null)
+                requestPayload = customPayload;
+            ResultMessage result = QueryProcessor.instance.process(query, state, options, customPayload);
+            if (customPayload != null)
+            {
+                result.setCustomPayload(responsePayload);
+                responsePayload = null;
+            }
+            return result;
+        }
+
+        /** Runs the batch via {@link QueryProcessor}, recording and answering the custom payload if one was sent. */
+        public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+        {
+            if (customPayload != null)
+                requestPayload = customPayload;
+            ResultMessage result = QueryProcessor.instance.processBatch(statement, state, options, customPayload);
+            if (customPayload != null)
+            {
+                result.setCustomPayload(responsePayload);
+                responsePayload = null;
+            }
+            return result;
+        }
+
+        /** Runs the prepared statement via {@link QueryProcessor}, recording and answering the custom payload if one was sent. */
+        public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map<String, byte[]> customPayload) throws RequestExecutionException, RequestValidationException
+        {
+            if (customPayload != null)
+                requestPayload = customPayload;
+            ResultMessage result = QueryProcessor.instance.processPrepared(statement, state, options, customPayload);
+            if (customPayload != null)
+            {
+                result.setCustomPayload(responsePayload);
+                responsePayload = null;
+            }
+            return result;
+        }
+    }
+}


Mime
View raw message