cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [1/2] git commit: Add tracing support to the binary protocol
Date Thu, 01 Nov 2012 08:53:13 GMT
Updated Branches:
  refs/heads/trunk 191a34a73 -> 45b4fd8e9


Add tracing support to the binary protocol

patch by slebresne; reviewed by yukim for CASSANDRA-4699


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45b4fd8e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45b4fd8e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45b4fd8e

Branch: refs/heads/trunk
Commit: 45b4fd8e9c7230b14058761399283124567aef63
Parents: 191a34a
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Thu Nov 1 09:52:03 2012 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Nov 1 09:52:03 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 doc/native_protocol.spec                           |   24 ++++-
 .../apache/cassandra/cql/AbstractModification.java |   10 +-
 .../org/apache/cassandra/cql/BatchStatement.java   |    4 +-
 .../org/apache/cassandra/cql/DeleteStatement.java  |    8 +-
 .../org/apache/cassandra/cql/QueryProcessor.java   |   12 +-
 .../org/apache/cassandra/cql/UpdateStatement.java  |    8 +-
 .../org/apache/cassandra/cql3/CQLStatement.java    |    9 +-
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   21 ++--
 .../cassandra/cql3/statements/BatchStatement.java  |    4 +-
 .../cassandra/cql3/statements/DeleteStatement.java |    3 +-
 .../cql3/statements/ModificationStatement.java     |   12 +-
 .../statements/PermissionAlteringStatement.java    |    7 +-
 .../cql3/statements/SchemaAlteringStatement.java   |    5 +-
 .../cassandra/cql3/statements/SelectStatement.java |    5 +-
 .../cql3/statements/TruncateStatement.java         |    5 +-
 .../cassandra/cql3/statements/UpdateStatement.java |    3 +-
 .../cassandra/cql3/statements/UseStatement.java    |    7 +-
 .../org/apache/cassandra/service/ClientState.java  |   79 +-------------
 .../org/apache/cassandra/service/QueryState.java   |   85 +++++++++++++++
 .../cassandra/service/ThriftSessionManager.java    |   70 ------------
 .../apache/cassandra/thrift/CassandraServer.java   |   42 ++++----
 .../apache/cassandra/thrift/CustomTHsHaServer.java |    1 -
 .../cassandra/thrift/CustomTNonBlockingServer.java |    1 -
 .../cassandra/thrift/CustomTThreadPoolServer.java  |    1 -
 .../thrift/TCustomNonblockingServerSocket.java     |    1 -
 .../apache/cassandra/thrift/ThriftClientState.java |   74 +++++++++++++
 .../cassandra/thrift/ThriftSessionManager.java     |   70 ++++++++++++
 .../org/apache/cassandra/transport/CBUtil.java     |   14 +++
 src/java/org/apache/cassandra/transport/Frame.java |    6 +-
 .../org/apache/cassandra/transport/Message.java    |   77 ++++++++++++-
 .../cassandra/transport/ServerConnection.java      |   19 +++-
 .../transport/messages/CredentialsMessage.java     |    5 +-
 .../transport/messages/ExecuteMessage.java         |   34 ++++++-
 .../transport/messages/OptionsMessage.java         |    3 +-
 .../transport/messages/PrepareMessage.java         |   33 ++++++-
 .../cassandra/transport/messages/QueryMessage.java |   33 ++++++-
 .../transport/messages/RegisterMessage.java        |    3 +-
 .../transport/messages/StartupMessage.java         |   14 ++--
 39 files changed, 551 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0bb732b..c530234 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -47,6 +47,7 @@
  * Force provided columns in clustering key order in 'CLUSTERING ORDER BY' (CASSANDRA-4881)
  * Fix composite index bug (CASSANDRA-4884)
  * Fix short read protection for CQL3 (CASSANDRA-4882)
+ * Add tracing support to the binary protocol (CASSANDRA-4699)
 Merged from 1.1:
  * add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
  * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index f534cde..9c27bb3 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -95,11 +95,24 @@ Table of Contents
 
 2.2. flags
 
-  Flags applying to this frame. Currently only one bit (the lower-most one, the
-  one masked by 0x01) has a meaning and indicates whether the frame body is
-  compressed. The actual compression to use should have been set up beforehand
-  through the Startup message (which thus cannot be compressed; Section 4.1.1).
-  The rest of the flags is kept for future use.
+  Flags applying to this frame. The flags have the following meanings (each
+  described by the mask that selects it):
+    0x01: Compression flag. If set, the frame body is compressed. The actual
+          compression to use should have been set up beforehand through the
+          Startup message (which thus cannot be compressed; Section 4.1.1).
+    0x02: Tracing flag. For a request frame, this indicates the client requires
+          tracing of the request. Note that not all requests support tracing.
+          Currently, only QUERY, PREPARE and EXECUTE queries support tracing.
+          Other requests will simply ignore the tracing flag if set. If a
+          request supports tracing and the tracing flag was set, the response
+          to this request will have the tracing flag set and contain tracing
+          information.
+          If a response frame has the tracing flag set, its body contains
+          a tracing ID. The tracing ID is a [uuid] and is the first thing in
+          the frame body. The rest of the body will then be the usual body
+          corresponding to the response opcode.
+
+  The remaining flags are currently unused and ignored.
 
 2.3. stream
 
@@ -159,6 +172,7 @@ Table of Contents
     [string]       A [short] n, followed by n bytes representing an UTF-8
                    string.
     [long string]  An [int] n, followed by n bytes representing an UTF-8 string.
+    [uuid]         A 16-byte UUID.
     [string list]  A [short] n, followed by n [string].
     [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
                    no byte should follow and the value represented is `null`.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql/AbstractModification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AbstractModification.java b/src/java/org/apache/cassandra/cql/AbstractModification.java
index 781094f..2ec9f54 100644
--- a/src/java/org/apache/cassandra/cql/AbstractModification.java
+++ b/src/java/org/apache/cassandra/cql/AbstractModification.java
@@ -21,10 +21,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.thrift.ThriftClientState;
 
 public abstract class AbstractModification
 {
@@ -77,9 +77,9 @@ public abstract class AbstractModification
         return cLevel != null;
     }
 
-    public long getTimestamp(ClientState clientState)
+    public long getTimestamp(ThriftClientState clientState)
     {
-        return timestamp == null ? clientState.getTimestamp() : timestamp;
+        return timestamp == null ? clientState.getQueryState().getTimestamp() : timestamp;
     }
 
     public boolean isSetTimestamp()
@@ -102,7 +102,7 @@ public abstract class AbstractModification
      *
      * @throws InvalidRequestException on the wrong request
      */
-    public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, List<ByteBuffer> variables)
+    public abstract List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException;
 
     /**
@@ -116,6 +116,6 @@ public abstract class AbstractModification
      *
      * @throws InvalidRequestException on the wrong request
      */
-    public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp, List<ByteBuffer> variables)
+    public abstract List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/BatchStatement.java b/src/java/org/apache/cassandra/cql/BatchStatement.java
index 95849e7..5cb7a0c 100644
--- a/src/java/org/apache/cassandra/cql/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql/BatchStatement.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.thrift.ThriftClientState;
 
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
@@ -75,7 +75,7 @@ public class BatchStatement
         return timeToLive;
     }
 
-    public List<IMutation> getMutations(String keyspace, ClientState clientState, List<ByteBuffer> variables)
+    public List<IMutation> getMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {
         List<IMutation> batch = new LinkedList<IMutation>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/DeleteStatement.java b/src/java/org/apache/cassandra/cql/DeleteStatement.java
index 45d6e74..4190c80 100644
--- a/src/java/org/apache/cassandra/cql/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql/DeleteStatement.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.thrift.ThriftClientState;
 
 import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 import static org.apache.cassandra.cql.QueryProcessor.validateColumnName;
@@ -62,13 +62,13 @@ public class DeleteStatement extends AbstractModification
         return keys;
     }
 
-    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, List<ByteBuffer> variables)
+    public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {
         return prepareRowMutations(keyspace, clientState, null, variables);
     }
 
-    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp, List<ByteBuffer> variables)
+    public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {
         CFMetaData metadata = validateColumnFamily(keyspace, columnFamily);
@@ -86,7 +86,7 @@ public class DeleteStatement extends AbstractModification
         return rowMutations;
     }
 
-    public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
+    public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ThriftClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
     throws InvalidRequestException
     {
         RowMutation rm = new RowMutation(keyspace, key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 7e898a9..cdfe033 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.MigrationManager;
@@ -54,6 +53,7 @@ import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.thrift.RequestType;
 import org.apache.cassandra.thrift.SchemaDisagreementException;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.thrift.ThriftClientState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -224,7 +224,7 @@ public class QueryProcessor
         return rows.subList(0, select.getNumRecords() < rows.size() ? select.getNumRecords() : rows.size());
     }
 
-    private static void batchUpdate(ClientState clientState, List<UpdateStatement> updateStatements, ConsistencyLevel consistency, List<ByteBuffer> variables )
+    private static void batchUpdate(ThriftClientState clientState, List<UpdateStatement> updateStatements, ConsistencyLevel consistency, List<ByteBuffer> variables )
     throws RequestValidationException, RequestExecutionException
     {
         String globalKeyspace = clientState.getKeyspace();
@@ -397,7 +397,7 @@ public class QueryProcessor
                                Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
     }
 
-    public static CqlResult processStatement(CQLStatement statement,ClientState clientState, List<ByteBuffer> variables )
+    public static CqlResult processStatement(CQLStatement statement,ThriftClientState clientState, List<ByteBuffer> variables )
     throws RequestExecutionException, RequestValidationException
     {
         String keyspace = null;
@@ -809,14 +809,14 @@ public class QueryProcessor
         return null;    // We should never get here.
     }
 
-    public static CqlResult process(String queryString, ClientState clientState)
+    public static CqlResult process(String queryString, ThriftClientState clientState)
     throws RequestValidationException, RequestExecutionException
     {
         logger.trace("CQL QUERY: {}", queryString);
         return processStatement(getStatement(queryString), clientState, new ArrayList<ByteBuffer>(0));
     }
 
-    public static CqlPreparedResult prepare(String queryString, ClientState clientState)
+    public static CqlPreparedResult prepare(String queryString, ThriftClientState clientState)
     throws InvalidRequestException, SyntaxException
     {
         logger.trace("CQL QUERY: {}", queryString);
@@ -833,7 +833,7 @@ public class QueryProcessor
         return new CqlPreparedResult(statementId, statement.boundTerms);
     }
 
-    public static CqlResult processPrepared(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables)
+    public static CqlResult processPrepared(CQLStatement statement, ThriftClientState clientState, List<ByteBuffer> variables)
     throws RequestValidationException, RequestExecutionException
     {
         // Check to see if there are any bound variables to verify

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/UpdateStatement.java b/src/java/org/apache/cassandra/cql/UpdateStatement.java
index e6183c8..69daea6 100644
--- a/src/java/org/apache/cassandra/cql/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql/UpdateStatement.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
-import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.thrift.ThriftClientState;
 
 import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
 import static org.apache.cassandra.cql.QueryProcessor.validateKey;
@@ -122,14 +122,14 @@ public class UpdateStatement extends AbstractModification
     }
 
     /** {@inheritDoc} */
-    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, List<ByteBuffer> variables)
+    public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {
         return prepareRowMutations(keyspace, clientState, null, variables);
     }
 
     /** {@inheritDoc} */
-    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp, List<ByteBuffer> variables)
+    public List<IMutation> prepareRowMutations(String keyspace, ThriftClientState clientState, Long timestamp, List<ByteBuffer> variables)
     throws InvalidRequestException, UnauthorizedException
     {
         List<String> cfamsSeen = new ArrayList<String>();
@@ -182,7 +182,7 @@ public class UpdateStatement extends AbstractModification
      *
      * @throws InvalidRequestException on the wrong request
      */
-    private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp, ClientState clientState, List<ByteBuffer> variables)
+    private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp, ThriftClientState clientState, List<ByteBuffer> variables)
     throws InvalidRequestException
     {
         validateKey(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 90883e6..63f9cc6 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.exceptions.*;
 
 public interface CQLStatement
@@ -50,16 +51,16 @@ public interface CQLStatement
     /**
      * Execute the statement and return the resulting result or null if there is no result.
      *
-     * @param state the current client state
+     * @param state the current query state
      * @param variables the values for bounded variables. The implementation
      * can assume that each bound term have a corresponding value.
      */
-    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Variante of execute used for internal query against the system tables, and thus only query the local node.
      *
-     * @param state the current client state
+     * @param state the current query state
      */
-    public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage executeInternal(QueryState state) throws RequestValidationException, RequestExecutionException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index b306220..c1737e8 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MD5Digest;
 import org.apache.cassandra.utils.SemanticVersion;
@@ -121,20 +122,21 @@ public class QueryProcessor
         }
     }
 
-    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, ClientState clientState, List<ByteBuffer> variables)
+    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
     throws RequestExecutionException, RequestValidationException
     {
+        ClientState clientState = queryState.getClientState();
         statement.checkAccess(clientState);
         statement.validate(clientState);
-        ResultMessage result = statement.execute(cl, clientState, variables);
+        ResultMessage result = statement.execute(cl, queryState, variables);
         return result == null ? new ResultMessage.Void() : result;
     }
 
-    public static ResultMessage process(String queryString, ConsistencyLevel cl, ClientState clientState)
+    public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
         logger.trace("CQL QUERY: {}", queryString);
-        return processStatement(getStatement(queryString, clientState).statement, cl, clientState, Collections.<ByteBuffer>emptyList());
+        return processStatement(getStatement(queryString, queryState.getClientState()).statement, cl, queryState, Collections.<ByteBuffer>emptyList());
     }
 
     public static UntypedResultSet processInternal(String query)
@@ -142,10 +144,11 @@ public class QueryProcessor
         try
         {
             ClientState state = new ClientState(true);
+            QueryState qState = new QueryState(state);
             state.setKeyspace(Table.SYSTEM_KS);
             CQLStatement statement = getStatement(query, state).statement;
             statement.validate(state);
-            ResultMessage result = statement.executeInternal(state);
+            ResultMessage result = statement.executeInternal(qState);
             if (result instanceof ResultMessage.Rows)
                 return new UntypedResultSet(((ResultMessage.Rows)result).result);
             else
@@ -181,13 +184,13 @@ public class QueryProcessor
         logger.trace("CQL QUERY: {}", queryString);
 
         ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
-        ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState, prepared, forThrift);
+        ResultMessage.Prepared msg = storePreparedStatement(queryString, prepared, forThrift);
 
         assert prepared.statement.getBoundsTerms() == prepared.boundNames.size();
         return msg;
     }
 
-    private static ResultMessage.Prepared storePreparedStatement(String queryString, ClientState clientState, ParsedStatement.Prepared prepared, boolean forThrift)
+    private static ResultMessage.Prepared storePreparedStatement(String queryString, ParsedStatement.Prepared prepared, boolean forThrift)
     {
         if (forThrift)
         {
@@ -209,7 +212,7 @@ public class QueryProcessor
         }
     }
 
-    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, ClientState clientState, List<ByteBuffer> variables)
+    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
     throws RequestExecutionException, RequestValidationException
     {
         // Check to see if there are any bound variables to verify
@@ -227,7 +230,7 @@ public class QueryProcessor
                     logger.trace("[{}] '{}'", i+1, variables.get(i));
         }
 
-        return processStatement(statement, cl, clientState, variables);
+        return processStatement(statement, cl, queryState, variables);
     }
 
     private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 6ab0271..ae94f27 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -97,7 +97,7 @@ public class BatchStatement extends ModificationStatement
             statement.validateConsistency(cl);
     }
 
-    public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
@@ -107,7 +107,7 @@ public class BatchStatement extends ModificationStatement
                 statement.setTimestamp(getTimestamp(now));
 
             // Group mutation together, otherwise they won't get applied atomically
-            for (IMutation m : statement.getMutations(clientState, variables, local, cl, now))
+            for (IMutation m : statement.getMutations(variables, local, cl, now))
             {
                 if (m instanceof CounterMutation && type != Type.COUNTER)
                     throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 76c4374..d08ded9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.utils.Pair;
 
@@ -69,7 +68,7 @@ public class DeleteStatement extends ModificationStatement
             cl.validateForWrite(cfDef.cfm.ksName);
     }
 
-    public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    public Collection<RowMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         // keys

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index afdff22..ea50ea7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 
 /**
@@ -82,14 +83,14 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException;
 
-    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
         validateConsistency(cl);
 
-        Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl, state.getTimestamp());
+        Collection<? extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp());
 
         // The type should have been set by now or we have a bug
         assert type != null;
@@ -113,9 +114,9 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
         return null;
     }
 
-    public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException
+    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null, state.getTimestamp()))
+        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
             mutation.apply();
         return null;
     }
@@ -199,7 +200,6 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
     /**
      * Convert statement into a list of mutations to apply on the server
      *
-     * @param clientState current client status
      * @param variables value for prepared statement markers
      * @param local if true, any requests (for collections) performed by getMutation should be done locally only.
      * @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
@@ -208,7 +208,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    protected abstract Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException;
 
     public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
index 48c2d03..dd6e190 100644
--- a/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/PermissionAlteringStatement.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 public abstract class PermissionAlteringStatement extends ParsedStatement implements CQLStatement
@@ -45,14 +46,14 @@ public abstract class PermissionAlteringStatement extends ParsedStatement implem
     public void validate(ClientState state)
     {}
 
-    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException
     {
-        return execute(state, variables);
+        return execute(state.getClientState(), variables);
     }
 
     public abstract ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws UnauthorizedException, InvalidRequestException;
 
-    public ResultMessage executeInternal(ClientState state)
+    public ResultMessage executeInternal(QueryState state)
     {
         // executeInternal is for local query only, thus altering permission doesn't make sense and is not supported
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 2b3f5dd..4d40e99 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
 /**
@@ -67,14 +68,14 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
     public void validate(ClientState state) throws RequestValidationException
     {}
 
-    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException
     {
         announceMigration();
         String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();
         return new ResultMessage.SchemaChange(changeType(), keyspace(), tableName);
     }
 
-    public ResultMessage executeInternal(ClientState state)
+    public ResultMessage executeInternal(QueryState state)
     {
         // executeInternal is for local query only, thus altering schema is not supported
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index cef8a5f..e70f90a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.RangeSliceVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -117,7 +118,7 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
@@ -155,7 +156,7 @@ public class SelectStatement implements CQLStatement
         return rows;
     }
 
-    public ResultMessage.Rows executeInternal(ClientState state) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index af4b4e0..4cd0e9b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.ThriftValidation;
 
@@ -53,7 +54,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
     {
         try
         {
@@ -74,7 +75,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         return null;
     }
 
-    public ResultMessage executeInternal(ClientState state)
+    public ResultMessage executeInternal(QueryState state)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 8c19b10..6fbf444 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.cql3.operations.Operation;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -105,7 +104,7 @@ public class UpdateStatement extends ModificationStatement
     }
 
     /** {@inheritDoc} */
-    public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    public Collection<IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index c381978..8329057 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 
 public class UseStatement extends ParsedStatement implements CQLStatement
 {
@@ -49,13 +50,13 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, ClientState state, List<ByteBuffer> variables) throws InvalidRequestException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException
     {
-        state.setKeyspace(keyspace);
+        state.getClientState().setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);
     }
 
-    public ResultMessage executeInternal(ClientState state)
+    public ResultMessage executeInternal(QueryState state)
     {
         // Internal queries are exclusively on the system keyspace and 'use' is thus useless
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 554feab..c326d9c 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -28,44 +28,30 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql.CQLStatement;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
 import org.apache.cassandra.thrift.AuthenticationException;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SemanticVersion;
 
-import static org.apache.cassandra.tracing.Tracing.instance;
-
 /**
- * A container for per-client, thread-local state that Avro/Thrift threads must hold.
+ * State related to a client connection.
+ *
  * TODO: Kill thrift exceptions
  */
 public class ClientState
 {
-    private static final int MAX_CACHE_PREPARED = 10000;    // Enough to keep buggy clients from OOM'ing us
     private static final Logger logger = LoggerFactory.getLogger(ClientState.class);
     public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION;
 
     // Current user for the session
     private AuthenticatedUser user;
     private String keyspace;
-    private UUID preparedTracingSession;
 
     // Reusable array for authorization
     private final List<Object> resource = new ArrayList<Object>();
     private SemanticVersion cqlVersion = DEFAULT_CQL_VERSION;
 
-    // An LRU map of prepared statements
-    private final Map<Integer, CQLStatement> prepared = new LinkedHashMap<Integer, CQLStatement>(16, 0.75f, true) {
-        protected boolean removeEldestEntry(Map.Entry<Integer, CQLStatement> eldest) {
-            return size() > MAX_CACHE_PREPARED;
-        }
-    };
-
-    private long clock;
-
     // internalCall is used to mark ClientState as used by some internal component
     // that should have an ability to modify system keyspace
     private final boolean internalCall;
@@ -81,15 +67,7 @@ public class ClientState
     public ClientState(boolean internalCall)
     {
         this.internalCall = internalCall;
-
-        user = DatabaseDescriptor.getAuthenticator().defaultUser();
-        resourceClear();
-        prepared.clear();
-    }
-
-    public Map<Integer, CQLStatement> getPrepared()
-    {
-        return prepared;
+        this.user = DatabaseDescriptor.getAuthenticator().defaultUser();
     }
 
     public String getRawKeyspace()
@@ -111,45 +89,6 @@ public class ClientState
         keyspace = ks;
     }
 
-    public boolean traceNextQuery()
-    {
-        if (preparedTracingSession != null)
-        {
-            return true;
-        }
-
-        double tracingProbability = StorageService.instance.getTracingProbability();
-        return tracingProbability != 0 && FBUtilities.threadLocalRandom().nextDouble() < tracingProbability;
-    }
-
-    public void prepareTracingSession(UUID sessionId)
-    {
-        this.preparedTracingSession = sessionId;
-    }
-
-    public void createSession()
-    {
-        if (this.preparedTracingSession == null)
-        {
-            instance().newSession();
-        }
-        else
-        {
-            UUID session = this.preparedTracingSession;
-            this.preparedTracingSession = null;
-            instance().newSession(session);
-        }
-    }
-
-    public String getSchedulingValue()
-    {
-        switch(DatabaseDescriptor.getRequestSchedulerId())
-        {
-            case keyspace: return keyspace;
-        }
-        return "default";
-    }
-
     /**
      * Attempts to login this client with the given credentials map.
      */
@@ -290,18 +229,6 @@ public class ClientState
                                                       Resources.toString(resource)));
     }
 
-    /**
-     * This clock guarantees that updates from a given client will be ordered in the sequence seen,
-     * even if multiple updates happen in the same millisecond.  This can be useful when a client
-     * wants to perform multiple updates to a single column.
-     */
-    public long getTimestamp()
-    {
-        long current = System.currentTimeMillis() * 1000;
-        clock = clock >= current ? clock + 1 : current;
-        return clock;
-    }
-
     public void setCQLVersion(String str) throws InvalidRequestException
     {
         SemanticVersion version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
new file mode 100644
index 0000000..49feb3b
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.UUID;
+
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Represents the state related to a given query.
+ */
+public class QueryState
+{
+    private final ClientState clientState;
+    private volatile long clock;
+    private volatile UUID preparedTracingSession;
+
+    public QueryState(ClientState clientState)
+    {
+        this.clientState = clientState;
+    }
+
+    public ClientState getClientState()
+    {
+        return clientState;
+    }
+
+    /**
+     * This clock guarantees that updates for the same QueryState will be ordered
+     * in the sequence seen, even if multiple updates happen in the same millisecond.
+     */
+    public long getTimestamp()
+    {
+        long current = System.currentTimeMillis() * 1000;
+        clock = clock >= current ? clock + 1 : current;
+        return clock;
+    }
+
+    public boolean traceNextQuery()
+    {
+        if (preparedTracingSession != null)
+        {
+            return true;
+        }
+
+        double tracingProbability = StorageService.instance.getTracingProbability();
+        return tracingProbability != 0 && FBUtilities.threadLocalRandom().nextDouble() < tracingProbability;
+    }
+
+    public void prepareTracingSession(UUID sessionId)
+    {
+        this.preparedTracingSession = sessionId;
+    }
+
+    public void createTracingSession()
+    {
+        if (this.preparedTracingSession == null)
+        {
+            Tracing.instance().newSession();
+        }
+        else
+        {
+            UUID session = this.preparedTracingSession;
+            this.preparedTracingSession = null;
+            Tracing.instance().newSession(session);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/service/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ThriftSessionManager.java b/src/java/org/apache/cassandra/service/ThriftSessionManager.java
deleted file mode 100644
index d1d3f6e..0000000
--- a/src/java/org/apache/cassandra/service/ThriftSessionManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Encapsulates the current client state (session).
- *
- * We rely on the Thrift server to tell us what socket it is
- * executing a request for via setCurrentSocket, after which currentSession can do its job anywhere.
- */
-public class ThriftSessionManager
-{
-    public final static ThriftSessionManager instance = new ThriftSessionManager();
-
-    private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<SocketAddress>();
-    private final Map<SocketAddress, ClientState> activeSocketSessions = new ConcurrentHashMap<SocketAddress, ClientState>();
-
-    /**
-     * @param socket the address on which the current thread will work on requests for until further notice
-     */
-    public void setCurrentSocket(SocketAddress socket)
-    {
-        remoteSocket.set(socket);
-    }
-
-    /**
-     * @return the current session for the most recently given socket on this thread
-     */
-    public ClientState currentSession()
-    {
-        SocketAddress socket = remoteSocket.get();
-        assert socket != null;
-
-        ClientState cState = activeSocketSessions.get(socket);
-        if (cState == null)
-        {
-            cState = new ClientState();
-            activeSocketSessions.put(socket, cState);
-        }
-        return cState;
-    }
-
-    /**
-     * The connection associated with @param socket is permanently finished.
-     */
-    public void connectionComplete(SocketAddress socket)
-    {
-        assert socket != null;
-        activeSocketSessions.remove(socket);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5703855..ea26d46 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -82,7 +82,7 @@ public class CassandraServer implements Cassandra.Iface
         requestScheduler = DatabaseDescriptor.getRequestScheduler();
     }
 
-    public ClientState state()
+    public ThriftClientState state()
     {
         return ThriftSessionManager.instance.currentSession();
     }
@@ -393,7 +393,7 @@ public class CassandraServer implements Cassandra.Iface
     private ColumnOrSuperColumn internal_get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
     throws RequestValidationException, NotFoundException, UnavailableException, TimedOutException
     {
-        ClientState cState = state();
+        ThriftClientState cState = state();
         cState.hasColumnFamilyAccess(column_path.column_family, Permission.SELECT);
         String keyspace = cState.getKeyspace();
 
@@ -468,7 +468,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            ClientState cState = state();
+            ThriftClientState cState = state();
             cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
             Table table = Table.open(cState.getKeyspace());
             ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
@@ -573,7 +573,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            ClientState cState = state();
+            ThriftClientState cState = state();
             cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
             String keyspace = cState.getKeyspace();
 
@@ -600,7 +600,7 @@ public class CassandraServer implements Cassandra.Iface
     private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
     throws RequestValidationException, UnavailableException, TimedOutException
     {
-        ClientState cState = state();
+        ThriftClientState cState = state();
         cState.hasColumnFamilyAccess(column_parent.column_family, Permission.UPDATE);
 
         CFMetaData metadata = ThriftValidation.validateColumnFamily(cState.getKeyspace(), column_parent.column_family, false);
@@ -663,7 +663,7 @@ public class CassandraServer implements Cassandra.Iface
     {
         List<String> cfamsSeen = new ArrayList<String>();
         List<IMutation> rowMutations = new ArrayList<IMutation>();
-        ClientState cState = state();
+        ThriftClientState cState = state();
         String keyspace = cState.getKeyspace();
 
         for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry: mutation_map.entrySet())
@@ -801,7 +801,7 @@ public class CassandraServer implements Cassandra.Iface
     private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp)
     throws RequestValidationException, UnavailableException, TimedOutException
     {
-        ClientState cState = state();
+        ThriftClientState cState = state();
         cState.hasColumnFamilyAccess(column_path.column_family, Permission.DELETE);
 
         CFMetaData metadata = ThriftValidation.validateColumnFamily(cState.getKeyspace(), column_path.column_family, isCommutativeOp);
@@ -921,7 +921,7 @@ public class CassandraServer implements Cassandra.Iface
             String keyspace = null;
             CFMetaData metadata = null;
 
-            ClientState cState = state();
+            ThriftClientState cState = state();
             keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
 
@@ -1009,7 +1009,7 @@ public class CassandraServer implements Cassandra.Iface
         try
         {
 
-            ClientState cState = state();
+            ThriftClientState cState = state();
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(column_family, Permission.SELECT);
 
@@ -1111,7 +1111,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            ClientState cState = state();
+            ThriftClientState cState = state();
             cState.hasColumnFamilyAccess(column_parent.column_family, Permission.SELECT);
             String keyspace = cState.getKeyspace();
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
@@ -1321,7 +1321,7 @@ public class CassandraServer implements Cassandra.Iface
     {
         logger.debug("drop_column_family");
 
-        ClientState cState = state();
+        ThriftClientState cState = state();
 
         try
         {
@@ -1703,8 +1703,8 @@ public class CassandraServer implements Cassandra.Iface
                 logger.debug("execute_cql3_query");
             }
 
-            ClientState cState = state();
-            return org.apache.cassandra.cql3.QueryProcessor.process(queryString, ThriftConversion.fromThrift(cLevel), cState).toThriftResult();
+            ThriftClientState cState = state();
+            return org.apache.cassandra.cql3.QueryProcessor.process(queryString, ThriftConversion.fromThrift(cLevel), cState.getQueryState()).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -1729,7 +1729,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            ClientState cState = state();
+            ThriftClientState cState = state();
             String queryString = uncompress(query,compression);
             return QueryProcessor.prepare(queryString, cState);
         }
@@ -1747,7 +1747,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            ClientState cState = state();
+            ThriftClientState cState = state();
             String queryString = uncompress(query,compression);
             return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult();
         }
@@ -1772,7 +1772,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            ClientState cState = state();
+            ThriftClientState cState = state();
             CQLStatement statement = cState.getPrepared().get(itemId);
 
             if (statement == null)
@@ -1811,7 +1811,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            ClientState cState = state();
+            ThriftClientState cState = state();
             org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId);
 
             if (statement == null)
@@ -1821,7 +1821,7 @@ public class CassandraServer implements Cassandra.Iface
                                                                 itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
             logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
 
-            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState, bindVariables).toThriftResult();
+            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState.getQueryState(), bindVariables).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -1846,15 +1846,15 @@ public class CassandraServer implements Cassandra.Iface
     public ByteBuffer trace_next_query() throws TException
     {
         UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress());
-        state().prepareTracingSession(sessionId);
+        state().getQueryState().prepareTracingSession(sessionId);
         return TimeUUIDType.instance.decompose(sessionId);
     }
 
     private boolean startSessionIfRequested()
     {
-        if (state().traceNextQuery())
+        if (state().getQueryState().traceNextQuery())
         {
-            state().createSession();
+            state().getQueryState().createTracingSession();
             return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
index 86445b8..0143136 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.service.ThriftSessionManager;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TNonblockingServerTransport;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
index fa5af8d..af82896 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@ -21,7 +21,6 @@ import java.net.InetSocketAddress;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.service.ThriftSessionManager;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TNonblockingServerTransport;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index 04efc97..8b88e43 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.service.ThriftSessionManager;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocol;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java b/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
index 13a1dd0..779084a 100644
--- a/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
+++ b/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
@@ -21,7 +21,6 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
 
-import org.apache.cassandra.service.ThriftSessionManager;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TTransportException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/thrift/ThriftClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftClientState.java b/src/java/org/apache/cassandra/thrift/ThriftClientState.java
new file mode 100644
index 0000000..2b1a3de
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/ThriftClientState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql.CQLStatement;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * ClientState used by thrift that also provides a QueryState.
+ *
+ * Thrift is intrinsically synchronous so there can be only one query per
+ * client at a given time. So ClientState and QueryState can be merged into the
+ * same object.
+ */
+public class ThriftClientState extends ClientState
+{
+    private static final int MAX_CACHE_PREPARED = 10000;    // Enough to keep buggy clients from OOM'ing us
+
+    private final QueryState queryState;
+
+    // An LRU map of prepared statements
+    private final Map<Integer, CQLStatement> prepared = new LinkedHashMap<Integer, CQLStatement>(16, 0.75f, true) {
+        protected boolean removeEldestEntry(Map.Entry<Integer, CQLStatement> eldest) {
+            return size() > MAX_CACHE_PREPARED;
+        }
+    };
+
+    public ThriftClientState()
+    {
+        super();
+        this.queryState = new QueryState(this);
+    }
+
+    public QueryState getQueryState()
+    {
+        return queryState;
+    }
+
+    public Map<Integer, CQLStatement> getPrepared()
+    {
+        return prepared;
+    }
+
+    public String getSchedulingValue()
+    {
+        switch(DatabaseDescriptor.getRequestSchedulerId())
+        {
+            case keyspace: return getRawKeyspace();
+        }
+        return "default";
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
new file mode 100644
index 0000000..93d58ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Encapsulates the current client state (session).
+ *
+ * We rely on the Thrift server to tell us what socket it is
+ * executing a request for via setCurrentSocket, after which currentSession can do its job anywhere.
+ */
+public class ThriftSessionManager
+{
+    public final static ThriftSessionManager instance = new ThriftSessionManager();
+
+    private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<SocketAddress>();
+    private final Map<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<SocketAddress, ThriftClientState>();
+
+    /**
+     * @param socket the address for which the current thread will work on requests until further notice
+     */
+    public void setCurrentSocket(SocketAddress socket)
+    {
+        remoteSocket.set(socket);
+    }
+
+    /**
+     * @return the current session for the most recently given socket on this thread
+     */
+    public ThriftClientState currentSession()
+    {
+        SocketAddress socket = remoteSocket.get();
+        assert socket != null;
+
+        ThriftClientState cState = activeSocketSessions.get(socket);
+        if (cState == null)
+        {
+            cState = new ThriftClientState();
+            activeSocketSessions.put(socket, cState);
+        }
+        return cState;
+    }
+
+    /**
+     * The connection associated with {@code socket} is permanently finished.
+     */
+    public void connectionComplete(SocketAddress socket)
+    {
+        assert socket != null;
+        activeSocketSessions.remove(socket);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index ba04335..0d3be07 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -26,12 +26,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.util.CharsetUtil;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.utils.UUIDGen;
 
 /**
  * ChannelBuffer utility methods.
@@ -143,6 +145,18 @@ public abstract class CBUtil
         return ConsistencyLevel.fromCode(cb.readUnsignedShort());
     }
 
+    public static ChannelBuffer uuidToCB(UUID uuid)
+    {
+        return ChannelBuffers.wrappedBuffer(UUIDGen.decompose(uuid));
+    }
+
+    public static UUID readUuid(ChannelBuffer cb)
+    {
+        byte[] bytes = new byte[16];
+        cb.readBytes(bytes);
+        return UUIDGen.getUUID(ByteBuffer.wrap(bytes));
+    }
+
     public static ChannelBuffer longStringToCB(String str)
     {
         ChannelBuffer bytes = bytes(str);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 8e3240b..6ff072c 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -75,9 +75,8 @@ public class Frame
         return new Frame(header, fullFrame, connection);
     }
 
-    public static Frame create(Message.Type type, int streamId, ChannelBuffer body, Connection connection)
+    public static Frame create(Message.Type type, int streamId, EnumSet<Header.Flag> flags, ChannelBuffer body, Connection connection)
     {
-        EnumSet<Header.Flag> flags = EnumSet.noneOf(Header.Flag.class);
         Header header = new Header(Header.CURRENT_VERSION, flags, streamId, type);
         return new Frame(header, body, connection);
     }
@@ -108,7 +107,8 @@ public class Frame
         public static enum Flag
         {
             // The order of that enum matters!!
-            COMPRESSED;
+            COMPRESSED,
+            TRACING;
 
             public static EnumSet<Flag> deserialize(int flags)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index ff002f8..a60a27a 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -17,7 +17,11 @@
  */
 package org.apache.cassandra.transport;
 
+import java.util.EnumSet;
+import java.util.UUID;
+
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
@@ -25,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.service.QueryState;
 
 /**
  * A message from the CQL binary protocol.
@@ -141,6 +146,8 @@ public abstract class Message
 
     public static abstract class Request extends Message
     {
+        protected boolean tracingRequested;
+
         protected Request(Type type)
         {
             super(type);
@@ -149,11 +156,23 @@ public abstract class Message
                 throw new IllegalArgumentException();
         }
 
-        public abstract Response execute();
+        public abstract Response execute(QueryState queryState);
+
+        public void setTracingRequested()
+        {
+            this.tracingRequested = true;
+        }
+
+        public boolean isTracingRequested()
+        {
+            return tracingRequested;
+        }
     }
 
     public static abstract class Response extends Message
     {
+        protected UUID tracingId;
+
         protected Response(Type type)
         {
             super(type);
@@ -161,6 +180,17 @@ public abstract class Message
             if (type.direction != Direction.RESPONSE)
                 throw new IllegalArgumentException();
         }
+
+        public Message setTracingId(UUID tracingId)
+        {
+            this.tracingId = tracingId;
+            return this;
+        }
+
+        public UUID getTracingId()
+        {
+            return tracingId;
+        }
     }
 
     public static class ProtocolDecoder extends OneToOneDecoder
@@ -170,10 +200,29 @@ public abstract class Message
             assert msg instanceof Frame : "Expecting frame, got " + msg;
 
             Frame frame = (Frame)msg;
+            boolean isRequest = frame.header.type.direction == Direction.REQUEST;
+            boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING);
+
+            UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUuid(frame.body);
+
             Message message = frame.header.type.codec.decode(frame.body);
             message.setStreamId(frame.header.streamId);
-            if (message instanceof Request)
-                ((Request)message).attach(frame.connection);
+
+            if (isRequest)
+            {
+                assert message instanceof Request;
+                Request req = (Request)message;
+                req.attach(frame.connection);
+                if (isTracing)
+                    req.setTracingRequested();
+            }
+            else
+            {
+                assert message instanceof Response;
+                if (isTracing)
+                    ((Response)message).setTracingId(tracingId);
+            }
+
             return message;
         }
     }
@@ -185,7 +234,25 @@ public abstract class Message
             assert msg instanceof Message : "Expecting message, got " + msg;
 
             Message message = (Message)msg;
-            return Frame.create(message.type, message.getStreamId(), message.encode(), message.connection());
+
+            ChannelBuffer body = message.encode();
+            EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class);
+            if (message instanceof Response)
+            {
+                UUID tracingId = ((Response)message).getTracingId();
+                if (tracingId != null)
+                {
+                    body = ChannelBuffers.wrappedBuffer(CBUtil.uuidToCB(tracingId), body);
+                    flags.add(Frame.Header.Flag.TRACING);
+                }
+            }
+            else
+            {
+                assert message instanceof Request;
+                if (((Request)message).isTracingRequested())
+                    flags.add(Frame.Header.Flag.TRACING);
+            }
+            return Frame.create(message.type, message.getStreamId(), flags, body, message.connection());
         }
     }
 
@@ -209,7 +276,7 @@ public abstract class Message
 
                 logger.debug("Received: " + request);
 
-                Response response = request.execute();
+                Response response = request.execute(connection.getQueryState(request.getStreamId()));
                 response.setStreamId(request.getStreamId());
                 response.attach(connection);
                 connection.applyStateTransition(request.type, response.type);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 561828d..591a9c2 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -17,7 +17,12 @@
  */
 package org.apache.cassandra.transport;
 
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ServerConnection extends Connection
 {
@@ -34,6 +39,8 @@ public class ServerConnection extends Connection
     private final ClientState clientState;
     private volatile State state;
 
+    private final ConcurrentMap<Integer, QueryState> queryStates = new NonBlockingHashMap<Integer, QueryState>();
+
     public ServerConnection(Connection.Tracker tracker)
     {
         super(tracker);
@@ -41,9 +48,17 @@ public class ServerConnection extends Connection
         this.state = State.UNINITIALIZED;
     }
 
-    public ClientState clientState()
+    public QueryState getQueryState(int streamId)
     {
-        return clientState;
+        QueryState qState = queryStates.get(streamId);
+        if (qState == null)
+        {
+            // In theory we shouldn't get any race here, but it never hurts to be careful
+            QueryState newState = new QueryState(clientState);
+            if ((qState = queryStates.putIfAbsent(streamId, newState)) == null)
+                qState = newState;
+        }
+        return qState;
     }
 
     public void validateNewMessage(Message.Type type)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index c103c93..9dc5366 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.ServerConnection;
@@ -74,11 +75,11 @@ public class CredentialsMessage extends Message.Request
         return codec.encode(this);
     }
 
-    public Message.Response execute()
+    public Message.Response execute(QueryState state)
     {
         try
         {
-            ((ServerConnection)connection).clientState().login(credentials);
+            state.getClientState().login(credentials);
             return new ReadyMessage();
         }
         catch (AuthenticationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 842fb22..ccddbc7 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.transport.messages;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 
@@ -27,8 +29,12 @@ import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MD5Digest;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class ExecuteMessage extends Message.Request
 {
@@ -90,22 +96,44 @@ public class ExecuteMessage extends Message.Request
         return codec.encode(this);
     }
 
-    public Message.Response execute()
+    public Message.Response execute(QueryState state)
     {
         try
         {
-            ServerConnection c = (ServerConnection)connection;
             CQLStatement statement = QueryProcessor.getPrepared(statementId);
 
             if (statement == null)
                 throw new PreparedQueryNotFoundException(statementId);
 
-            return QueryProcessor.processPrepared(statement, consistency, c.clientState(), values);
+            UUID tracingId = null;
+            if (isTracingRequested())
+            {
+                tracingId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress());
+                state.prepareTracingSession(tracingId);
+            }
+
+            if (state.traceNextQuery())
+            {
+                state.createTracingSession();
+                // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
+                Tracing.instance().begin("Execute CQL3 prepared query", Collections.<String, String>emptyMap());
+            }
+
+            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values);
+
+            if (tracingId != null)
+                response.setTracingId(tracingId);
+
+            return response;
         }
         catch (Exception e)
         {
             return ErrorMessage.fromException(e);
         }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45b4fd8e/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index cecead2..30bd046 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -26,6 +26,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.FrameCompressor;
 import org.apache.cassandra.transport.Message;
 
@@ -57,7 +58,7 @@ public class OptionsMessage extends Message.Request
         return codec.encode(this);
     }
 
-    public Message.Response execute()
+    public Message.Response execute(QueryState state)
     {
         List<String> cqlVersions = new ArrayList<String>();
         cqlVersions.add(QueryProcessor.CQL_VERSION.toString());


Mime
View raw message