cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject [2/3] cassandra git commit: Count entire coordinated request against timeout
Date Tue, 16 Aug 2016 19:35:54 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b1b7b10..2309e87 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -35,9 +35,10 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
                                           ConsistencyLevel consistencyLevel,
                                           Keyspace keyspace,
                                           Runnable callback,
-                                          WriteType writeType)
+                                          WriteType writeType,
+                                          long queryStartNanoTime)
     {
-        super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType);
+        super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType, queryStartNanoTime);
         assert consistencyLevel.isDatacenterLocal();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 47eacdf..3f1ff3c 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -52,7 +52,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
     public final ResponseResolver resolver;
     private final SimpleCondition condition = new SimpleCondition();
-    private final long start;
+    private final long queryStartNanoTime;
     final int blockfor;
     final List<InetAddress> endpoints;
     private final ReadCommand command;
@@ -69,24 +69,25 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints, long queryStartNanoTime)
     {
         this(resolver,
              consistencyLevel,
              consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)),
              command,
              Keyspace.open(command.metadata().ksName),
-             filteredEndpoints);
+             filteredEndpoints,
+             queryStartNanoTime);
     }
 
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints, long queryStartNanoTime)
     {
         this.command = command;
         this.keyspace = keyspace;
         this.blockfor = blockfor;
         this.consistencyLevel = consistencyLevel;
         this.resolver = resolver;
-        this.start = System.nanoTime();
+        this.queryStartNanoTime = queryStartNanoTime;
         this.endpoints = endpoints;
         // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
         assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
@@ -97,7 +98,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
     public boolean await(long timePastStart, TimeUnit unit)
     {
-        long time = unit.toNanos(timePastStart) - (System.nanoTime() - start);
+        long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
         try
         {
             return condition.await(time, TimeUnit.NANOSECONDS);
@@ -138,7 +139,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
         PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
         if (logger.isTraceEnabled())
-            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime));
         return result;
     }
 
@@ -163,7 +164,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                 TraceState traceState = Tracing.instance.get();
                 if (traceState != null)
                     traceState.trace("Initiating read-repair");
-                StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState));
+                StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState, queryStartNanoTime));
             }
         }
     }
@@ -210,10 +211,12 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     private class AsyncRepairRunner implements Runnable
     {
         private final TraceState traceState;
+        private final long queryStartNanoTime;
 
-        public AsyncRepairRunner(TraceState traceState)
+        public AsyncRepairRunner(TraceState traceState, long queryStartNanoTime)
         {
             this.traceState = traceState;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         public void run()
@@ -236,7 +239,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                 
                 ReadRepairMetrics.repairedBackground.mark();
                 
-                final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
+                final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size(), queryStartNanoTime);
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
                 for (InetAddress endpoint : endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9283c04..9bf90dc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -223,10 +223,11 @@ public class StorageProxy implements StorageProxyMBean
                                   CASRequest request,
                                   ConsistencyLevel consistencyForPaxos,
                                   ConsistencyLevel consistencyForCommit,
-                                  ClientState state)
+                                  ClientState state,
+                                  long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
     {
-        final long start = System.nanoTime();
+        final long startTimeForMetrics = System.nanoTime();
         int contentions = 0;
         try
         {
@@ -236,14 +237,14 @@ public class StorageProxy implements StorageProxyMBean
             CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
 
             long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
-            while (System.nanoTime() - start < timeout)
+            while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the start of each attempt
                 Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
                 List<InetAddress> liveEndpoints = p.left;
                 int requiredParticipants = p.right;
 
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
+                final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.left;
                 contentions += pair.right;
 
@@ -253,7 +254,7 @@ public class StorageProxy implements StorageProxyMBean
                 ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
 
                 FilteredPartition current;
-                try (RowIterator rowIter = readOne(readCommand, readConsistency))
+                try (RowIterator rowIter = readOne(readCommand, readConsistency, queryStartNanoTime))
                 {
                     current = FilteredPartition.create(rowIter);
                 }
@@ -281,9 +282,9 @@ public class StorageProxy implements StorageProxyMBean
 
                 Commit proposal = Commit.newProposal(ballot, updates);
                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
-                if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
+                if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
                 {
-                    commitPaxos(proposal, consistencyForCommit, true);
+                    commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
                     Tracing.trace("CAS successful");
                     return null;
                 }
@@ -318,7 +319,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             if(contentions > 0)
                 casWriteMetrics.contention.update(contentions);
-            final long latency = System.nanoTime() - start;
+            final long latency = System.nanoTime() - startTimeForMetrics;
             casWriteMetrics.addNano(latency);
             writeMetricsMap.get(consistencyForPaxos).addNano(latency);
         }
@@ -373,7 +374,7 @@ public class StorageProxy implements StorageProxyMBean
      * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
-    private static Pair<UUID, Integer> beginAndRepairPaxos(long start,
+    private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime,
                                                            DecoratedKey key,
                                                            CFMetaData metadata,
                                                            List<InetAddress> liveEndpoints,
@@ -388,7 +389,7 @@ public class StorageProxy implements StorageProxyMBean
 
         PrepareCallback summary = null;
         int contentions = 0;
-        while (System.nanoTime() - start < timeout)
+        while (System.nanoTime() - queryStartNanoTime < timeout)
         {
             // We want a timestamp that is guaranteed to be unique for that node (so that the ballot is globally unique), but if we've got a prepare rejected
             // already we also want to make sure we pick a timestamp that has a chance to be promised, i.e. one that is greater that the most recently known
@@ -403,7 +404,7 @@ public class StorageProxy implements StorageProxyMBean
             // prepare
             Tracing.trace("Preparing {}", ballot);
             Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos);
+            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime);
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -426,11 +427,11 @@ public class StorageProxy implements StorageProxyMBean
                 else
                     casReadMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
-                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos))
+                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime))
                 {
                     try
                     {
-                        commitPaxos(refreshedInProgress, consistencyForCommit, false);
+                        commitPaxos(refreshedInProgress, consistencyForCommit, false, queryStartNanoTime);
                     }
                     catch (WriteTimeoutException e)
                     {
@@ -481,10 +482,10 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos)
+    private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos);
+        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
@@ -492,10 +493,10 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel)
+    private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel);
+        ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
@@ -511,7 +512,7 @@ public class StorageProxy implements StorageProxyMBean
         return false;
     }
 
-    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint) throws WriteTimeoutException
+    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint, long queryStartNanoTime) throws WriteTimeoutException
     {
         boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
@@ -524,7 +525,7 @@ public class StorageProxy implements StorageProxyMBean
         if (shouldBlock)
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-            responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
+            responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime);
         }
 
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
@@ -597,8 +598,9 @@ public class StorageProxy implements StorageProxyMBean
      *
      * @param mutations the mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
-    public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level)
+    public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -613,12 +615,12 @@ public class StorageProxy implements StorageProxyMBean
             {
                 if (mutation instanceof CounterMutation)
                 {
-                    responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
+                    responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
                 }
                 else
                 {
                     WriteType wt = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH;
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt));
+                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt, queryStartNanoTime));
                 }
             }
 
@@ -728,8 +730,9 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the mutations to be applied across the replicas
      * @param writeCommitLog if commitlog should be written
      * @param baseComplete time from epoch in ms that the local base mutation was(or will be) completed
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
-    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete)
+    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -791,7 +794,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                       Collections.singletonList(pairedEndpoint.get()),
                                                                       baseComplete,
                                                                       WriteType.BATCH,
-                                                                      cleanup));
+                                                                      cleanup,
+                                                                      queryStartNanoTime));
                         }
                     }
                     else
@@ -834,7 +838,8 @@ public class StorageProxy implements StorageProxyMBean
     @SuppressWarnings("unchecked")
     public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
                                           ConsistencyLevel consistencyLevel,
-                                          boolean mutateAtomically)
+                                          boolean mutateAtomically,
+                                          long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException
     {
         Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
@@ -844,13 +849,13 @@ public class StorageProxy implements StorageProxyMBean
                               .updatesAffectView(mutations, true);
 
         if (augmented != null)
-            mutateAtomically(augmented, consistencyLevel, updatesView);
+            mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime);
         else
         {
             if (mutateAtomically || updatesView)
-                mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView);
+                mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView, queryStartNanoTime);
             else
-                mutate(mutations, consistencyLevel);
+                mutate(mutations, consistencyLevel, queryStartNanoTime);
         }
     }
 
@@ -863,10 +868,12 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the Mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
      * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
     public static void mutateAtomically(Collection<Mutation> mutations,
                                         ConsistencyLevel consistency_level,
-                                        boolean requireQuorumForRemove)
+                                        boolean requireQuorumForRemove,
+                                        long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for atomic batch");
@@ -894,7 +901,7 @@ public class StorageProxy implements StorageProxyMBean
             final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID, queryStartNanoTime));
 
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
@@ -903,14 +910,15 @@ public class StorageProxy implements StorageProxyMBean
                                                                                consistency_level,
                                                                                batchConsistencyLevel,
                                                                                WriteType.BATCH,
-                                                                               cleanup);
+                                                                               cleanup,
+                                                                               queryStartNanoTime);
                 // exit early if we can't fulfill the CL at this time.
                 wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
             }
 
             // write to the batchlog
-            syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID);
+            syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime);
 
             // now actually perform the writes and wait for them to complete
             syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
@@ -950,7 +958,7 @@ public class StorageProxy implements StorageProxyMBean
         return replica.equals(FBUtilities.getBroadcastAddress());
     }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {
         WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
@@ -958,7 +966,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                      endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
                                                                      Keyspace.open(SystemKeyspace.NAME),
                                                                      null,
-                                                                     WriteType.BATCH_LOG);
+                                                                     WriteType.BATCH_LOG,
+                                                                     queryStartNanoTime);
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
 
@@ -987,13 +996,13 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
     {
         if (!endpoints.current.isEmpty())
             asyncRemoveFromBatchlog(endpoints.current, uuid);
 
         if (!endpoints.legacy.isEmpty())
-            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
+            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid, queryStartNanoTime);
     }
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
@@ -1054,14 +1063,15 @@ public class StorageProxy implements StorageProxyMBean
      * given the list of write endpoints (either standardWritePerformer for
      * standard writes or counterWritePerformer for counter writes).
      * @param callback an optional callback to be run if and when the write is
-     * successful.
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
     public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation,
-                                                            ConsistencyLevel consistency_level,
-                                                            String localDataCenter,
-                                                            WritePerformer performer,
-                                                            Runnable callback,
-                                                            WriteType writeType)
+                                                                       ConsistencyLevel consistency_level,
+                                                                       String localDataCenter,
+                                                                       WritePerformer performer,
+                                                                       Runnable callback,
+                                                                       WriteType writeType,
+                                                                       long queryStartNanoTime)
     throws UnavailableException, OverloadedException
     {
         String keyspaceName = mutation.getKeyspaceName();
@@ -1071,7 +1081,7 @@ public class StorageProxy implements StorageProxyMBean
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
-        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
@@ -1085,7 +1095,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                         ConsistencyLevel consistency_level,
                                                                         ConsistencyLevel batchConsistencyLevel,
                                                                         WriteType writeType,
-                                                                        BatchlogResponseHandler.BatchlogCleanup cleanup)
+                                                                        BatchlogResponseHandler.BatchlogCleanup cleanup,
+                                                                        long queryStartNanoTime)
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -1093,8 +1104,8 @@ public class StorageProxy implements StorageProxyMBean
         Token tk = mutation.key().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
-        BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType, queryStartNanoTime);
+        BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
@@ -1108,7 +1119,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                             List<InetAddress> naturalEndpoints,
                                                                             AtomicLong baseComplete,
                                                                             WriteType writeType,
-                                                                            BatchlogResponseHandler.BatchlogCleanup cleanup)
+                                                                            BatchlogResponseHandler.BatchlogCleanup cleanup,
+                                                                            long queryStartNanoTime)
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -1118,8 +1130,8 @@ public class StorageProxy implements StorageProxyMBean
         AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
-        }, writeType);
-        BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        }, writeType, queryStartNanoTime);
+        BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
@@ -1400,13 +1412,13 @@ public class StorageProxy implements StorageProxyMBean
      * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
      * the write latencies at the coordinator node to make gathering point similar to the case of standard writes.
      */
-    public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
+    public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException
     {
         InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
 
         if (endpoint.equals(FBUtilities.getBroadcastAddress()))
         {
-            return applyCounterMutationOnCoordinator(cm, localDataCenter);
+            return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime);
         }
         else
         {
@@ -1417,10 +1429,10 @@ public class StorageProxy implements StorageProxyMBean
             List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
             Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
-            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes();
+            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER);
+            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER, queryStartNanoTime);
 
             Tracing.trace("Enqueuing counter update to {}", endpoint);
             MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false);
@@ -1467,18 +1479,18 @@ public class StorageProxy implements StorageProxyMBean
 
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
-    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback)
+    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback, long queryStartNanoTime)
     throws UnavailableException, OverloadedException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER, queryStartNanoTime);
     }
 
     // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
-    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
+    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter, long queryStartNanoTime)
     throws UnavailableException, OverloadedException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER, queryStartNanoTime);
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
@@ -1512,31 +1524,31 @@ public class StorageProxy implements StorageProxyMBean
         return true;
     }
 
-    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel)
+    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
-        return readOne(command, consistencyLevel, null);
+        return readOne(command, consistencyLevel, null, queryStartNanoTime);
     }
 
-    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state)
+    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
-        return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state), command);
+        return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state, queryStartNanoTime), command);
     }
 
-    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
+    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
         // When using serial CL, the ClientState should be provided
         assert !consistencyLevel.isSerialConsistency();
-        return read(group, consistencyLevel, null);
+        return read(group, consistencyLevel, null, queryStartNanoTime);
     }
 
     /**
      * Performs the actual reading of a row out of the StorageService, fetching
      * a specific set of column names from a given column family.
      */
-    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
+    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
         if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands))
@@ -1547,11 +1559,11 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         return consistencyLevel.isSerialConsistency()
-             ? readWithPaxos(group, consistencyLevel, state)
-             : readRegular(group, consistencyLevel);
+             ? readWithPaxos(group, consistencyLevel, state, queryStartNanoTime)
+             : readRegular(group, consistencyLevel, queryStartNanoTime);
     }
 
-    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
+    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
     throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
         assert state != null;
@@ -1591,7 +1603,7 @@ public class StorageProxy implements StorageProxyMBean
                 throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false);
             }
 
-            result = fetchRows(group.commands, consistencyForCommitOrFetch);
+            result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime);
         }
         catch (UnavailableException e)
         {
@@ -1627,13 +1639,13 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     @SuppressWarnings("resource")
-    private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
+    private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         long start = System.nanoTime();
         try
         {
-            PartitionIterator result = fetchRows(group.commands, consistencyLevel);
+            PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime);
             // If we have more than one command, then despite each read command honoring the limit, the total result
             // might not honor it and so we should enforce it
             if (group.commands.size() > 1)
@@ -1680,14 +1692,14 @@ public class StorageProxy implements StorageProxyMBean
      * 4. If the digests (if any) match the data return the data
      * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel)
+    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         int cmdCount = commands.size();
 
         SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
         for (int i = 0; i < cmdCount; i++)
-            reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
+            reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel, queryStartNanoTime);
 
         for (int i = 0; i < cmdCount; i++)
             reads[i].doInitialQueries();
@@ -1717,15 +1729,17 @@ public class StorageProxy implements StorageProxyMBean
         private final SinglePartitionReadCommand command;
         private final AbstractReadExecutor executor;
         private final ConsistencyLevel consistency;
+        private final long queryStartNanoTime;
 
         private PartitionIterator result;
         private ReadCallback repairHandler;
 
-        SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency)
+        SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency, long queryStartNanoTime)
         {
             this.command = command;
-            this.executor = AbstractReadExecutor.getReadExecutor(command, consistency);
+            this.executor = AbstractReadExecutor.getReadExecutor(command, consistency, queryStartNanoTime);
             this.consistency = consistency;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         boolean isDone()
@@ -1757,13 +1771,14 @@ public class StorageProxy implements StorageProxyMBean
 
                 // Do a full data read to resolve the correct response (and repair node that need be)
                 Keyspace keyspace = Keyspace.open(command.metadata().ksName);
-                DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size());
+                DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size(), queryStartNanoTime);
                 repairHandler = new ReadCallback(resolver,
                                                  ConsistencyLevel.ALL,
                                                  executor.getContactedReplicas().size(),
                                                  command,
                                                  keyspace,
-                                                 executor.handler.endpoints);
+                                                 executor.handler.endpoints,
+                                                 queryStartNanoTime);
 
                 for (InetAddress endpoint : executor.getContactedReplicas())
                 {
@@ -2052,6 +2067,7 @@ public class StorageProxy implements StorageProxyMBean
         private final ConsistencyLevel consistency;
 
         private final long startTime;
+        private final long queryStartNanoTime;
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
@@ -2061,7 +2077,7 @@ public class StorageProxy implements StorageProxyMBean
         private int liveReturned;
         private int rangesQueried;
 
-        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
         {
             this.command = command;
             this.concurrencyFactor = concurrencyFactor;
@@ -2070,6 +2086,7 @@ public class StorageProxy implements StorageProxyMBean
             this.totalRangeCount = ranges.rangeCount();
             this.consistency = consistency;
             this.keyspace = keyspace;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         public RowIterator computeNext()
@@ -2145,12 +2162,12 @@ public class StorageProxy implements StorageProxyMBean
         {
             PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst);
 
-            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
+            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime);
 
             int blockFor = consistency.blockFor(keyspace);
             int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
             List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
-            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
+            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime);
 
             handler.assureSufficientLiveNodes();
 
@@ -2204,7 +2221,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     @SuppressWarnings("resource")
-    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
+    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     {
         Tracing.trace("Computing ranges to query");
 
@@ -2225,7 +2242,7 @@ public class StorageProxy implements StorageProxyMBean
 
         // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
 
-        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
+        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), command.nowInSec());
     }
 
     public Map<String, List<String>> getSchemaVersions()
@@ -2485,9 +2502,9 @@ public class StorageProxy implements StorageProxyMBean
      */
     private static class ViewWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
     {
-        public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
+        public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
         {
-            super(writeHandler, i, cleanup);
+            super(writeHandler, i, cleanup, queryStartNanoTime);
             viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 1dc03e0..46e4e93 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -47,20 +47,21 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
                                 ConsistencyLevel consistencyLevel,
                                 Keyspace keyspace,
                                 Runnable callback,
-                                WriteType writeType)
+                                WriteType writeType,
+                                long queryStartNanoTime)
     {
-        super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+        super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime);
         responses = totalBlockFor();
     }
 
-    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
+    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime)
     {
-        this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType);
+        this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime);
     }
 
-    public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
+    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, long queryStartNanoTime)
     {
-        this(endpoint, writeType, null);
+        this(endpoint, writeType, null, queryStartNanoTime);
     }
 
     public void response(MessageIn<T> m)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 01a56c4..d9b3632 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -55,7 +55,7 @@ abstract class AbstractQueryPager implements QueryPager
         return command.executionController();
     }
 
-    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState)
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime)
     {
         if (isExhausted())
             return EmptyIterators.partition();
@@ -63,7 +63,7 @@ abstract class AbstractQueryPager implements QueryPager
         pageSize = Math.min(pageSize, remaining);
         Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
 
-        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
+        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
     }
 
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
index 5483d15..f9a8cda 100644
--- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -52,12 +52,13 @@ public final class AggregationQueryPager implements QueryPager
     @Override
     public PartitionIterator fetchPage(int pageSize,
                                        ConsistencyLevel consistency,
-                                       ClientState clientState)
+                                       ClientState clientState,
+                                       long queryStartNanoTime)
     {
         if (limits.isGroupByLimit())
-            return new GroupByPartitionIterator(pageSize, consistency, clientState);
+            return new GroupByPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);
 
-        return new AggregationPartitionIterator(pageSize, consistency, clientState);
+        return new AggregationPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);
     }
 
     @Override
@@ -70,9 +71,9 @@ public final class AggregationQueryPager implements QueryPager
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
     {
         if (limits.isGroupByLimit())
-            return new GroupByPartitionIterator(pageSize, executionController);
+            return new GroupByPartitionIterator(pageSize, executionController, System.nanoTime());
 
-        return new AggregationPartitionIterator(pageSize, executionController);
+        return new AggregationPartitionIterator(pageSize, executionController, System.nanoTime());
     }
 
     @Override
@@ -152,28 +153,34 @@ public final class AggregationQueryPager implements QueryPager
          */
         private int initialMaxRemaining;
 
+        private long queryStartNanoTime;
+
         public GroupByPartitionIterator(int pageSize,
                                          ConsistencyLevel consistency,
-                                         ClientState clientState)
+                                         ClientState clientState,
+                                        long queryStartNanoTime)
         {
-            this(pageSize, consistency, clientState, null);
+            this(pageSize, consistency, clientState, null, queryStartNanoTime);
         }
 
         public GroupByPartitionIterator(int pageSize,
-                                        ReadExecutionController executionController)
+                                        ReadExecutionController executionController,
+                                        long queryStartNanoTime)
        {
-           this(pageSize, null, null, executionController);
+           this(pageSize, null, null, executionController, queryStartNanoTime);
        }
 
         private GroupByPartitionIterator(int pageSize,
                                          ConsistencyLevel consistency,
                                          ClientState clientState,
-                                         ReadExecutionController executionController)
+                                         ReadExecutionController executionController,
+                                         long queryStartNanoTime)
         {
             this.pageSize = handlePagingOff(pageSize);
             this.consistency = consistency;
             this.clientState = clientState;
             this.executionController = executionController;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         private int handlePagingOff(int pageSize)
@@ -280,7 +287,7 @@ public final class AggregationQueryPager implements QueryPager
          */
         private final PartitionIterator fetchSubPage(int subPageSize)
         {
-            return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState)
+            return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState, queryStartNanoTime)
                                        : subPager.fetchPageInternal(subPageSize, executionController);
         }
 
@@ -393,15 +400,17 @@ public final class AggregationQueryPager implements QueryPager
     {
         public AggregationPartitionIterator(int pageSize,
                                             ConsistencyLevel consistency,
-                                            ClientState clientState)
+                                            ClientState clientState,
+                                            long queryStartNanoTime)
         {
-            super(pageSize, consistency, clientState);
+            super(pageSize, consistency, clientState, queryStartNanoTime);
         }
 
         public AggregationPartitionIterator(int pageSize,
-                                            ReadExecutionController executionController)
+                                            ReadExecutionController executionController,
+                                            long queryStartNanoTime)
         {
-            super(pageSize, executionController);
+            super(pageSize, executionController, queryStartNanoTime);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 9670f28..75cc71f 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -142,17 +142,17 @@ public class MultiPartitionPager implements QueryPager
     }
 
     @SuppressWarnings("resource") // iter closed via countingIter
-    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
         int toQuery = Math.min(remaining, pageSize);
-        return new PagersIterator(toQuery, consistency, clientState, null);
+        return new PagersIterator(toQuery, consistency, clientState, null, queryStartNanoTime);
     }
 
     @SuppressWarnings("resource") // iter closed via countingIter
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
     {
         int toQuery = Math.min(remaining, pageSize);
-        return new PagersIterator(toQuery, null, null, executionController);
+        return new PagersIterator(toQuery, null, null, executionController, System.nanoTime());
     }
 
     private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
@@ -160,6 +160,7 @@ public class MultiPartitionPager implements QueryPager
         private final int pageSize;
         private PartitionIterator result;
         private boolean closed;
+        private final long queryStartNanoTime;
 
         // For "normal" queries
         private final ConsistencyLevel consistency;
@@ -171,12 +172,13 @@ public class MultiPartitionPager implements QueryPager
         private int pagerMaxRemaining;
         private int counted;
 
-        public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController)
+        public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController, long queryStartNanoTime)
         {
             this.pageSize = pageSize;
             this.consistency = consistency;
             this.clientState = clientState;
             this.executionController = executionController;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         protected RowIterator computeNext()
@@ -205,7 +207,7 @@ public class MultiPartitionPager implements QueryPager
                 int toQuery = pageSize - counted;
                 result = consistency == null
                        ? pagers[current].fetchPageInternal(toQuery, executionController)
-                       : pagers[current].fetchPage(toQuery, consistency, clientState);
+                       : pagers[current].fetchPage(toQuery, consistency, clientState, queryStartNanoTime);
             }
             return result.next();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index edd2a55..5d23997 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -54,7 +54,7 @@ public interface QueryPager
             return ReadExecutionController.empty();
         }
 
-        public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+        public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
         {
             return EmptyIterators.partition();
         }
@@ -94,7 +94,7 @@ public interface QueryPager
      * {@code consistency} is a serial consistency.
      * @return the page of result.
      */
-    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException;
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Starts a new read operation.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 02b5de2..7fb4e70 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -45,7 +45,8 @@ public class QueryPagers
                                  ClientState state,
                                  final int pageSize,
                                  int nowInSec,
-                                 boolean isForThrift) throws RequestValidationException, RequestExecutionException
+                                 boolean isForThrift,
+                                 long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
         final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION);
@@ -53,7 +54,7 @@ public class QueryPagers
         int count = 0;
         while (!pager.isExhausted())
         {
-            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
+            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime))
             {
                 DataLimits.Counter counter = limits.newCounter(nowInSec, true);
                 PartitionIterators.consume(counter.applyTo(iter));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
index 37defde..90bfc5d 100644
--- a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
@@ -35,12 +35,14 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
     protected final CountDownLatch latch;
     protected final int targets;
     private final ConsistencyLevel consistency;
+    private final long queryStartNanoTime;
 
-    public AbstractPaxosCallback(int targets, ConsistencyLevel consistency)
+    public AbstractPaxosCallback(int targets, ConsistencyLevel consistency, long queryStartNanoTime)
     {
         this.targets = targets;
         this.consistency = consistency;
         latch = new CountDownLatch(targets);
+        this.queryStartNanoTime = queryStartNanoTime;
     }
 
     public boolean isLatencyForSnitch()
@@ -57,7 +59,8 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
     {
         try
         {
-            if (!latch.await(DatabaseDescriptor.getWriteRpcTimeout(), TimeUnit.MILLISECONDS))
+            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout()) - (System.nanoTime() - queryStartNanoTime);
+            if (!latch.await(timeout, TimeUnit.NANOSECONDS))
                 throw new WriteTimeoutException(WriteType.CAS, consistency, getResponseCount(), targets);
         }
         catch (InterruptedException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 544403a..5915eab 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -49,9 +49,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 
     private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 
-    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
+    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
     {
-        super(targets, consistency);
+        super(targets, consistency, queryStartNanoTime);
         // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
         mostRecentCommit = Commit.emptyCommit(key, metadata);
         mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
index b0bd163..c9cb1f0 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
@@ -50,9 +50,9 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean>
     private final int requiredAccepts;
     private final boolean failFast;
 
-    public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency)
+    public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency, long queryStartNanoTime)
     {
-        super(totalTargets, consistency);
+        super(totalTargets, consistency, queryStartNanoTime);
         this.requiredAccepts = requiredTargets;
         this.failFast = failFast;
     }


Mime
View raw message