cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject [1/3] cassandra git commit: Count entire coordinated request against timeout
Date Tue, 16 Aug 2016 19:35:53 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e83f9e69e -> aa83c942a


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index a189000..910f334 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -85,7 +85,7 @@ public class CassandraServer implements Cassandra.Iface
         return ThriftSessionManager.instance.currentSession();
     }
 
-    protected PartitionIterator read(List<SinglePartitionReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
+    protected PartitionIterator read(List<SinglePartitionReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState, long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
         try
@@ -93,7 +93,7 @@ public class CassandraServer implements Cassandra.Iface
             schedule(DatabaseDescriptor.getReadRpcTimeout());
             try
             {
-                return StorageProxy.read(new SinglePartitionReadCommand.Group(commands, DataLimits.NONE), consistency_level, cState);
+                return StorageProxy.read(new SinglePartitionReadCommand.Group(commands, DataLimits.NONE), consistency_level, cState, queryStartNanoTime);
             }
             finally
             {
@@ -257,10 +257,10 @@ public class CassandraServer implements Cassandra.Iface
              : result;
     }
 
-    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<SinglePartitionReadCommand> commands, boolean subColumnsOnly, int cellLimit, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
+    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<SinglePartitionReadCommand> commands, boolean subColumnsOnly, int cellLimit, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState, long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        try (PartitionIterator results = read(commands, consistency_level, cState))
+        try (PartitionIterator results = read(commands, consistency_level, cState, queryStartNanoTime))
         {
             Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<>();
             while (results.hasNext())
@@ -278,6 +278,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -296,7 +297,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            List<ColumnOrSuperColumn> result = getSliceInternal(keyspace, key, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState);
+            List<ColumnOrSuperColumn> result = getSliceInternal(keyspace, key, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState, queryStartNanoTime);
             return result == null ? Collections.<ColumnOrSuperColumn>emptyList() : result;
         }
         catch (RequestValidationException e)
@@ -315,15 +316,17 @@ public class CassandraServer implements Cassandra.Iface
                                                        int nowInSec,
                                                        SlicePredicate predicate,
                                                        ConsistencyLevel consistency_level,
-                                                       ClientState cState)
+                                                       ClientState cState,
+                                                       long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, nowInSec, predicate, consistency_level, cState).get(key);
+        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, nowInSec, predicate, consistency_level, cState, queryStartNanoTime).get(key);
     }
 
     public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             List<String> keysList = Lists.newArrayList();
@@ -345,7 +348,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(keyspace, keys, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState);
+            return multigetSliceInternal(keyspace, keys, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -541,7 +544,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                              int nowInSec,
                                                                              SlicePredicate predicate,
                                                                              ConsistencyLevel consistency_level,
-                                                                             ClientState cState)
+                                                                             ClientState cState,
+                                                                             long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
@@ -563,12 +567,13 @@ public class CassandraServer implements Cassandra.Iface
             commands.add(SinglePartitionReadCommand.create(true, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, dk, filter));
         }
 
-        return getSlice(commands, column_parent.isSetSuper_column(), limits.perPartitionCount(), consistencyLevel, cState);
+        return getSlice(commands, column_parent.isSetSuper_column(), limits.perPartitionCount(), consistencyLevel, cState, queryStartNanoTime);
     }
 
     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -643,7 +648,7 @@ public class CassandraServer implements Cassandra.Iface
             DecoratedKey dk = metadata.decorateKey(key);
             SinglePartitionReadCommand command = SinglePartitionReadCommand.create(true, metadata, FBUtilities.nowInSeconds(), columns, RowFilter.NONE, DataLimits.NONE, dk, filter);
 
-            try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState), command))
+            try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState, queryStartNanoTime), command))
             {
                 if (!result.hasNext())
                     throw new NotFoundException();
@@ -672,6 +677,7 @@ public class CassandraServer implements Cassandra.Iface
     public int get_count(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -695,7 +701,7 @@ public class CassandraServer implements Cassandra.Iface
             int nowInSec = FBUtilities.nowInSeconds();
 
             if (predicate.column_names != null)
-                return getSliceInternal(keyspace, key, column_parent, nowInSec, predicate, consistency_level, cState).size();
+                return getSliceInternal(keyspace, key, column_parent, nowInSec, predicate, consistency_level, cState, queryStartNanoTime).size();
 
             int pageSize;
             // request by page if this is a large row
@@ -742,7 +748,8 @@ public class CassandraServer implements Cassandra.Iface
                                           cState,
                                           pageSize,
                                           nowInSec,
-                                          true);
+                                          true,
+                                          queryStartNanoTime);
         }
         catch (IllegalArgumentException e)
         {
@@ -766,6 +773,7 @@ public class CassandraServer implements Cassandra.Iface
     public Map<ByteBuffer, Integer> multiget_count(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             List<String> keysList = Lists.newArrayList();
@@ -797,7 +805,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                                                  FBUtilities.nowInSeconds(),
                                                                                                  predicate,
                                                                                                  consistency_level,
-                                                                                                 cState);
+                                                                                                 cState,
+                                                                                                 queryStartNanoTime);
 
             for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet())
                 counts.put(cf.getKey(), cf.getValue().size());
@@ -833,7 +842,7 @@ public class CassandraServer implements Cassandra.Iface
         return column.ttl;
     }
 
-    private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
+    private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level, long queryStartNanoTime)
     throws RequestValidationException, UnavailableException, TimedOutException
     {
         ThriftClientState cState = state();
@@ -870,12 +879,13 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
         }
-        doInsert(consistency_level, Collections.singletonList(mutation));
+        doInsert(consistency_level, Collections.singletonList(mutation), queryStartNanoTime);
     }
 
     public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -891,7 +901,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_insert(key, column_parent, column, consistency_level);
+            internal_insert(key, column_parent, column, consistency_level, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -911,6 +921,7 @@ public class CassandraServer implements Cassandra.Iface
                          ConsistencyLevel commit_consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             ImmutableMap.Builder<String,String> builder = ImmutableMap.builder();
@@ -964,7 +975,8 @@ public class CassandraServer implements Cassandra.Iface
                                                        new ThriftCASRequest(toLegacyCells(metadata, expected, nowInSec), partitionUpdates, nowInSec),
                                                        ThriftConversion.fromThrift(serial_consistency_level),
                                                        ThriftConversion.fromThrift(commit_consistency_level),
-                                                       cState))
+                                                       cState,
+                                                       queryStartNanoTime))
             {
                 return result == null
                      ? new CASResult(true)
@@ -1276,6 +1288,7 @@ public class CassandraServer implements Cassandra.Iface
     public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = Maps.newLinkedHashMap();
@@ -1294,7 +1307,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true));
+            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true), queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -1309,6 +1322,7 @@ public class CassandraServer implements Cassandra.Iface
     public void atomic_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = Maps.newLinkedHashMap();
@@ -1327,7 +1341,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true);
+            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -1339,7 +1353,7 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp)
+    private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp, long queryStartNanoTime)
     throws RequestValidationException, UnavailableException, TimedOutException
     {
         ThriftClientState cState = state();
@@ -1386,14 +1400,15 @@ public class CassandraServer implements Cassandra.Iface
         org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
 
         if (isCommutativeOp)
-            doInsert(consistency_level, Collections.singletonList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
+            doInsert(consistency_level, Collections.singletonList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))), queryStartNanoTime);
         else
-            doInsert(consistency_level, Collections.singletonList(mutation));
+            doInsert(consistency_level, Collections.singletonList(mutation), queryStartNanoTime);
     }
 
     public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -1409,7 +1424,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_remove(key, column_path, timestamp, consistency_level, false);
+            internal_remove(key, column_path, timestamp, consistency_level, false, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -1421,13 +1436,13 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations)
+    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, long queryStartNanoTime)
     throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
     {
-        doInsert(consistency_level, mutations, false);
+        doInsert(consistency_level, mutations, false, queryStartNanoTime);
     }
 
-    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically)
+    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically, long queryStartNanoTime)
     throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
     {
         org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
@@ -1442,7 +1457,7 @@ public class CassandraServer implements Cassandra.Iface
         schedule(timeout);
         try
         {
-            StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically);
+            StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically, queryStartNanoTime);
         }
         catch (RequestExecutionException e)
         {
@@ -1480,6 +1495,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of(
@@ -1541,7 +1557,7 @@ public class CassandraServer implements Cassandra.Iface
                                                                               limits,
                                                                               new DataRange(bounds, filter),
                                                                               Optional.empty());
-                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
                 {
                     assert results != null;
                     return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
@@ -1569,6 +1585,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<KeySlice> get_paged_slice(String column_family, KeyRange range, ByteBuffer start_column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("column_family", column_family,
@@ -1635,7 +1652,7 @@ public class CassandraServer implements Cassandra.Iface
                                                                               limits,
                                                                               new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
                                                                               Optional.empty());
-                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
                 {
                     return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
                 }
@@ -1684,6 +1701,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("column_parent", column_parent.toString(),
@@ -1734,7 +1752,7 @@ public class CassandraServer implements Cassandra.Iface
             // further lookups.
             cmd.maybeValidateIndex();
 
-            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
             {
                 return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
             }
@@ -2139,6 +2157,7 @@ public class CassandraServer implements Cassandra.Iface
     public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)
             throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("column_parent", column_parent.toString(),
@@ -2183,7 +2202,7 @@ public class CassandraServer implements Cassandra.Iface
                 PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
 
                 org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
-                doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
+                doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))), queryStartNanoTime);
             }
             catch (MarshalException|UnknownColumnException e)
             {
@@ -2203,6 +2222,7 @@ public class CassandraServer implements Cassandra.Iface
     public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -2217,7 +2237,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_remove(key, path, FBUtilities.timestampMicros(), consistency_level, true);
+            internal_remove(key, path, FBUtilities.timestampMicros(), consistency_level, true, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -2296,6 +2316,7 @@ public class CassandraServer implements Cassandra.Iface
     {
         try
         {
+            long queryStartNanoTime = System.nanoTime();
             String queryString = uncompress(query, compression);
             if (startSessionIfRequested())
             {
@@ -2313,7 +2334,8 @@ public class CassandraServer implements Cassandra.Iface
                                                             cState.getQueryState(),
                                                             QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel),
                                                             Collections.<ByteBuffer>emptyList()),
-                                                            null).toThriftResult();
+                                                            null,
+                                                            queryStartNanoTime).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -2359,6 +2381,7 @@ public class CassandraServer implements Cassandra.Iface
 
     public CqlResult execute_prepared_cql3_query(int itemId, List<ByteBuffer> bindVariables, ConsistencyLevel cLevel) throws TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
@@ -2384,7 +2407,8 @@ public class CassandraServer implements Cassandra.Iface
             return ClientState.getCQLQueryHandler().processPrepared(prepared.statement,
                                                                     cState.getQueryState(),
                                                                     QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), bindVariables),
-                                                                    null).toThriftResult();
+                                                                    null,
+                                                                    queryStartNanoTime).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -2404,6 +2428,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<ColumnOrSuperColumn> get_multi_slice(MultiSliceRequest request)
             throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(request.key),
@@ -2457,7 +2482,8 @@ public class CassandraServer implements Cassandra.Iface
                             false,
                             limits.perPartitionCount(),
                             consistencyLevel,
-                            cState).entrySet().iterator().next().getValue();
+                            cState,
+                            queryStartNanoTime).entrySet().iterator().next().getValue();
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index e20f99b..fe78e64 100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -116,7 +116,7 @@ public class TraceStateImpl extends TraceState
     {
         try
         {
-            StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY);
+            StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY, System.nanoTime());
         }
         catch (OverloadedException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 66e0014..2463306 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -210,7 +210,7 @@ public abstract class Message
                 throw new IllegalArgumentException();
         }
 
-        public abstract Response execute(QueryState queryState);
+        public abstract Response execute(QueryState queryState, long queryStartNanoTime);
 
         public void setTracingRequested()
         {
@@ -501,6 +501,7 @@ public abstract class Message
 
             final Response response;
             final ServerConnection connection;
+            long queryStartNanoTime = System.nanoTime();
 
             try
             {
@@ -512,7 +513,7 @@ public abstract class Message
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
 
                 logger.trace("Received: {}, v={}", request, connection.getVersion());
-                response = request.execute(qstate);
+                response = request.execute(qstate, queryStartNanoTime);
                 response.setStreamId(request.getStreamId());
                 response.setWarnings(ClientWarn.instance.getWarnings());
                 response.attach(connection);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 8b3e866..e90f740 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -68,7 +68,7 @@ public class AuthResponse extends Message.Request
     }
 
     @Override
-    public Response execute(QueryState queryState)
+    public Response execute(QueryState queryState, long queryStartNanoTime)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 9d1047f..6675565 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -147,7 +147,7 @@ public class BatchMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {
@@ -214,7 +214,7 @@ public class BatchMessage extends Message.Request
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
             BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none());
-            Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload());
+            Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index 4c51cce..4ecaffd 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -71,7 +71,7 @@ public class CredentialsMessage extends Message.Request
         this.credentials = credentials;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index a5348a4..088f278 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -87,7 +87,7 @@ public class ExecuteMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {
@@ -143,7 +143,7 @@ public class ExecuteMessage extends Message.Request
             // Some custom QueryHandlers are interested by the bound names. We provide them this information
             // by wrapping the QueryOptions.
             QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames);
-            Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload());
+            Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime);
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 2f6e3da..4e95342 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -56,7 +56,7 @@ public class OptionsMessage extends Message.Request
         super(Message.Type.OPTIONS);
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         List<String> cqlVersions = new ArrayList<String>();
         cqlVersions.add(QueryProcessor.CQL_VERSION.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index f54d1d9..f5192de 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -58,7 +58,7 @@ public class PrepareMessage extends Message.Request
         this.query = query;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 3b48d52..2bd5efc 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -82,7 +82,7 @@ public class QueryMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {
@@ -112,7 +112,7 @@ public class QueryMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
             }
 
-            Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
+            Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index 928e676..c8e48b0 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -62,7 +62,7 @@ public class RegisterMessage extends Message.Request
         this.eventTypes = eventTypes;
     }
 
-    public Response execute(QueryState state)
+    public Response execute(QueryState state, long queryStartNanoTime)
     {
         assert connection instanceof ServerConnection;
         Connection.Tracker tracker = connection.getTracker();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 04d8e62..8966aeb 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -62,7 +62,7 @@ public class StartupMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         String cqlVersion = options.get(CQL_VERSION);
         if (cqlVersion == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 17e426b..6a075c9 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -125,6 +125,6 @@ public class PstmtPersistenceTest extends CQLTester
     {
         ParsedStatement.Prepared prepared = handler.getPrepared(stmtId);
         Assert.assertNotNull(prepared);
-        handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap());
+        handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 3fee5f9..156bd66 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -132,7 +132,7 @@ public class DataResolverTest
     @Test
     public void testResolveNewerSingleRow() throws UnknownHostException
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
                                                                                                        .add("c1", "v1")
@@ -161,7 +161,7 @@ public class DataResolverTest
     @Test
     public void testResolveDisjointSingleRow()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
                                                                                                        .add("c1", "v1")
@@ -196,7 +196,7 @@ public class DataResolverTest
     public void testResolveDisjointMultipleRows() throws UnknownHostException
     {
 
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
                                                                                                        .add("c1", "v1")
@@ -242,7 +242,7 @@ public class DataResolverTest
     @Test
     public void testResolveDisjointMultipleRowsWithRangeTombstones()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
 
         RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
         RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
@@ -322,7 +322,7 @@ public class DataResolverTest
     @Test
     public void testResolveWithOneEmpty()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
                                                                                                        .add("c2", "v2")
@@ -349,7 +349,7 @@ public class DataResolverTest
     @Test
     public void testResolveWithBothEmpty()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false)));
         resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false)));
 
@@ -364,7 +364,7 @@ public class DataResolverTest
     @Test
     public void testResolveDeleted()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         // one response with columns timestamped before a delete in another response
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
@@ -389,7 +389,7 @@ public class DataResolverTest
     @Test
     public void testResolveMultipleDeleted()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
         // deletes and columns with interleaved timestamp, with out of order return sequence
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
@@ -471,7 +471,7 @@ public class DataResolverTest
      */
     private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         InetAddress peer2 = peer();
 
@@ -562,7 +562,7 @@ public class DataResolverTest
     public void testResolveComplexDelete()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -611,7 +611,7 @@ public class DataResolverTest
     {
 
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -654,7 +654,7 @@ public class DataResolverTest
     public void testResolveNewCollection()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -700,7 +700,7 @@ public class DataResolverTest
     public void testResolveNewCollectionOverwritingDeleted()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 260c507..f8a77e1 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -316,12 +316,13 @@ public class MessagePayloadTest extends CQLTester
         public ResultMessage process(String query,
                                      QueryState state,
                                      QueryOptions options,
-                                     Map<String, ByteBuffer> customPayload)
-                                             throws RequestExecutionException, RequestValidationException
+                                     Map<String, ByteBuffer> customPayload,
+                                     long queryStartNanoTime)
+                                            throws RequestExecutionException, RequestValidationException
         {
             if (customPayload != null)
                 requestPayload = customPayload;
-            ResultMessage result = QueryProcessor.instance.process(query, state, options, customPayload);
+            ResultMessage result = QueryProcessor.instance.process(query, state, options, customPayload, queryStartNanoTime);
             if (customPayload != null)
             {
                 result.setCustomPayload(responsePayload);
@@ -333,12 +334,13 @@ public class MessagePayloadTest extends CQLTester
         public ResultMessage processBatch(BatchStatement statement,
                                           QueryState state,
                                           BatchQueryOptions options,
-                                          Map<String, ByteBuffer> customPayload)
+                                          Map<String, ByteBuffer> customPayload,
+                                          long queryStartNanoTime)
                                                   throws RequestExecutionException, RequestValidationException
         {
             if (customPayload != null)
                 requestPayload = customPayload;
-            ResultMessage result = QueryProcessor.instance.processBatch(statement, state, options, customPayload);
+            ResultMessage result = QueryProcessor.instance.processBatch(statement, state, options, customPayload, queryStartNanoTime);
             if (customPayload != null)
             {
                 result.setCustomPayload(responsePayload);
@@ -350,12 +352,13 @@ public class MessagePayloadTest extends CQLTester
         public ResultMessage processPrepared(CQLStatement statement,
                                              QueryState state,
                                              QueryOptions options,
-                                             Map<String, ByteBuffer> customPayload)
-                                                     throws RequestExecutionException, RequestValidationException
+                                             Map<String, ByteBuffer> customPayload,
+                                             long queryStartNanoTime)
+                                                    throws RequestExecutionException, RequestValidationException
         {
             if (customPayload != null)
                 requestPayload = customPayload;
-            ResultMessage result = QueryProcessor.instance.processPrepared(statement, state, options, customPayload);
+            ResultMessage result = QueryProcessor.instance.processPrepared(statement, state, options, customPayload, queryStartNanoTime);
             if (customPayload != null)
             {
                 result.setCustomPayload(responsePayload);


Mime
View raw message