cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/2] cassandra git commit: Abort in-progress queries that time out
Date Wed, 07 Oct 2015 08:29:41 GMT
Abort in-progress queries that time out

patch by Stefania; reviewed by aweisberg for CASSANDRA-7392


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/557bbbcc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/557bbbcc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/557bbbcc

Branch: refs/heads/trunk
Commit: 557bbbccb0eddc9f2ba6431b023b3ded253de056
Parents: aebdef8
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Fri Jul 3 15:16:15 2015 +0800
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Oct 7 10:28:37 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../concurrent/ScheduledExecutors.java          |   5 +
 .../apache/cassandra/cql3/UntypedResultSet.java |   3 +-
 .../cql3/statements/ModificationStatement.java  |   6 +-
 .../cql3/statements/SelectStatement.java        |  18 +-
 .../cassandra/db/PartitionRangeReadCommand.java |   2 +-
 .../org/apache/cassandra/db/ReadCommand.java    |  73 +++-
 .../cassandra/db/ReadCommandVerbHandler.java    |  13 +-
 .../cassandra/db/ReadExecutionController.java   | 132 +++++++
 .../org/apache/cassandra/db/ReadOrderGroup.java | 127 -------
 src/java/org/apache/cassandra/db/ReadQuery.java |  14 +-
 .../db/SinglePartitionNamesCommand.java         |   2 +-
 .../db/SinglePartitionReadCommand.java          |  29 +-
 .../db/SinglePartitionSliceCommand.java         |   2 +-
 src/java/org/apache/cassandra/db/Slices.java    |   2 +-
 .../db/filter/ClusteringIndexNamesFilter.java   |   2 +-
 .../cassandra/db/filter/ColumnFilter.java       |   7 +-
 .../db/monitoring/ApproximateTime.java          |  61 ++++
 .../db/monitoring/ConstructionTime.java         |  41 +++
 .../cassandra/db/monitoring/Monitorable.java    |  33 ++
 .../db/monitoring/MonitorableImpl.java          | 104 ++++++
 .../db/monitoring/MonitoringState.java          |  26 ++
 .../cassandra/db/monitoring/MonitoringTask.java | 264 ++++++++++++++
 src/java/org/apache/cassandra/db/view/View.java |   8 +-
 .../apache/cassandra/db/view/ViewBuilder.java   |   4 +-
 src/java/org/apache/cassandra/index/Index.java  |   2 +-
 .../index/internal/CassandraIndexSearcher.java  |  12 +-
 .../internal/composites/CompositesSearcher.java |   6 +-
 .../index/internal/keys/KeysSearcher.java       |   6 +-
 .../apache/cassandra/net/IAsyncCallback.java    |   4 +-
 .../net/IAsyncCallbackWithFailure.java          |   2 +-
 .../org/apache/cassandra/net/IVerbHandler.java  |   4 +-
 .../cassandra/net/IncomingTcpConnection.java    |  15 +-
 .../cassandra/net/MessageDeliveryTask.java      |  12 +-
 .../org/apache/cassandra/net/MessageIn.java     |  55 ++-
 .../apache/cassandra/net/MessagingService.java  |   9 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |  10 +-
 .../cassandra/serializers/ByteSerializer.java   |   2 +-
 .../apache/cassandra/service/ReadCallback.java  |   3 +-
 .../apache/cassandra/service/StorageProxy.java  |  30 +-
 .../service/pager/AbstractQueryPager.java       |   8 +-
 .../service/pager/MultiPartitionPager.java      |  16 +-
 .../cassandra/service/pager/QueryPager.java     |  16 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  12 +
 .../utils/NanoTimeToCurrentTimeMillis.java      |  48 +--
 test/conf/logback-test.xml                      |   6 +-
 test/unit/org/apache/cassandra/Util.java        |  24 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   |   9 +-
 .../apache/cassandra/db/ReadCommandTest.java    | 145 ++++++++
 .../db/RepairedDataTombstonesTest.java          |   9 +-
 .../apache/cassandra/db/SecondaryIndexTest.java |   7 +-
 .../db/SinglePartitionSliceCommandTest.java     |   8 +-
 .../db/monitoring/MonitoringTaskTest.java       | 341 +++++++++++++++++++
 .../org/apache/cassandra/hints/HintTest.java    |   4 +-
 .../org/apache/cassandra/index/StubIndex.java   |   4 +-
 .../index/internal/CassandraIndexTest.java      |   4 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   5 +-
 .../cassandra/service/DataResolverTest.java     |   3 +-
 .../cassandra/service/QueryPagerTest.java       |   3 +-
 .../utils/NanoTimeToCurrentTimeMillisTest.java  |   5 +-
 60 files changed, 1494 insertions(+), 334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 95a940b..725da54 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Abort in-progress queries that time out (CASSANDRA-7392)
  * Add transparent data encryption core classes (CASSANDRA-9945)
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 5935669..35469cc 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -23,6 +23,11 @@ package org.apache.cassandra.concurrent;
 public class ScheduledExecutors
 {
     /**
+     * This pool is used for periodic fast (sub-microsecond) tasks.
+     */
+    public static final DebuggableScheduledThreadPoolExecutor scheduledFastTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledFastTasks");
+
+    /**
      * This pool is used for periodic short (sub-second) tasks.
      */
      public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index 136e2fd..a6d8e93 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -187,7 +187,8 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row>
                         if (pager.isExhausted())
                             return endOfData();
 
-                        try (ReadOrderGroup orderGroup = pager.startOrderGroup(); PartitionIterator iter = pager.fetchPageInternal(pageSize, orderGroup))
+                        try (ReadExecutionController executionController = pager.executionController();
+                             PartitionIterator iter = pager.fetchPageInternal(pageSize, executionController))
                         {
                             currentPage = select.process(iter, nowInSec).rows.iterator();
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 7eefd8e..0287299 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -360,7 +360,8 @@ public abstract class ModificationStatement implements CQLStatement
 
         if (local)
         {
-            try (ReadOrderGroup orderGroup = group.startOrderGroup(); PartitionIterator iter = group.executeInternal(orderGroup))
+            try (ReadExecutionController executionController = group.executionController();
+                 PartitionIterator iter = group.executeInternal(executionController))
             {
                 return asMaterializedMap(iter);
             }
@@ -575,7 +576,8 @@ public abstract class ModificationStatement implements CQLStatement
 
         SinglePartitionReadCommand<?> readCommand = request.readCommand(FBUtilities.nowInSeconds());
         FilteredPartition current;
-        try (ReadOrderGroup orderGroup = readCommand.startOrderGroup(); PartitionIterator iter = readCommand.executeInternal(orderGroup))
+        try (ReadExecutionController executionController = readCommand.executionController();
+             PartitionIterator iter = readCommand.executeInternal(executionController))
         {
             current = FilteredPartition.create(PartitionIterators.getOnlyElement(iter, readCommand));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 21bb257..67d4622 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -253,9 +253,9 @@ public class SelectStatement implements CQLStatement
             this.pager = pager;
         }
 
-        public static Pager forInternalQuery(QueryPager pager, ReadOrderGroup orderGroup)
+        public static Pager forInternalQuery(QueryPager pager, ReadExecutionController executionController)
         {
-            return new InternalPager(pager, orderGroup);
+            return new InternalPager(pager, executionController);
         }
 
         public static Pager forDistributedQuery(QueryPager pager, ConsistencyLevel consistency, ClientState clientState)
@@ -295,17 +295,17 @@ public class SelectStatement implements CQLStatement
 
         public static class InternalPager extends Pager
         {
-            private final ReadOrderGroup orderGroup;
+            private final ReadExecutionController executionController;
 
-            private InternalPager(QueryPager pager, ReadOrderGroup orderGroup)
+            private InternalPager(QueryPager pager, ReadExecutionController executionController)
             {
                 super(pager);
-                this.orderGroup = orderGroup;
+                this.executionController = executionController;
             }
 
             public PartitionIterator fetchPage(int pageSize)
             {
-                return pager.fetchPageInternal(pageSize, orderGroup);
+                return pager.fetchPageInternal(pageSize, executionController);
             }
         }
     }
@@ -378,11 +378,11 @@ public class SelectStatement implements CQLStatement
         ReadQuery query = getQuery(options, nowInSec);
         int pageSize = getPageSize(options);
 
-        try (ReadOrderGroup orderGroup = query.startOrderGroup())
+        try (ReadExecutionController executionController = query.executionController())
         {
             if (pageSize <= 0 || query.limits().count() <= pageSize)
             {
-                try (PartitionIterator data = query.executeInternal(orderGroup))
+                try (PartitionIterator data = query.executeInternal(executionController))
                 {
                     return processResults(data, options, nowInSec);
                 }
@@ -390,7 +390,7 @@ public class SelectStatement implements CQLStatement
             else
             {
                 QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
-                return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec);
+                return execute(Pager.forInternalQuery(pager, executionController), options, pageSize, nowInSec);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index f17f3e3..8fd53a7 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -175,7 +175,7 @@ public class PartitionRangeReadCommand extends ReadCommand
         metric.rangeLatency.addNano(latencyNanos);
     }
 
-    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
     {
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, dataRange().keyRange()));
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 91227cf..222f81b 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -43,6 +44,7 @@ import org.apache.cassandra.schema.UnknownIndexException;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -51,10 +53,10 @@ import org.apache.cassandra.utils.Pair;
  * <p>
  * This contains all the informations needed to do a local read.
  */
-public abstract class ReadCommand implements ReadQuery
+public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
 {
+    private static final int TEST_ITERATION_DELAY_MILLIS = Integer.valueOf(System.getProperty("cassandra.test.read_iteration_delay_ms", "0"));
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
-
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
     // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version.
     // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility.
@@ -276,7 +278,7 @@ public abstract class ReadCommand implements ReadQuery
      */
     public abstract ReadCommand copy();
 
-    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
+    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
 
     protected abstract int oldestUnrepairedTombstone();
 
@@ -321,13 +323,13 @@ public abstract class ReadCommand implements ReadQuery
     /**
      * Executes this command on the local host.
      *
-     * @param orderGroup the operation group spanning this command
+     * @param executionController the execution controller spanning this command
      *
      * @return an iterator over the result of executing this command locally.
      */
     @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
                                   // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
-    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
+    public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
     {
         long startTimeNanos = System.nanoTime();
 
@@ -339,11 +341,12 @@ public abstract class ReadCommand implements ReadQuery
             Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexName());
 
         UnfilteredPartitionIterator resultIterator = searcher == null
-                                         ? queryStorage(cfs, orderGroup)
-                                         : searcher.search(orderGroup);
+                                         ? queryStorage(cfs, executionController)
+                                         : searcher.search(executionController);
 
         try
         {
+            resultIterator = withStateTracking(resultIterator);
             resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 
             // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
@@ -367,14 +370,14 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 
-    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+    public PartitionIterator executeInternal(ReadExecutionController controller)
     {
-        return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
+        return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec());
     }
 
-    public ReadOrderGroup startOrderGroup()
+    public ReadExecutionController executionController()
     {
-        return ReadOrderGroup.forCommand(this);
+        return ReadExecutionController.forCommand(this);
     }
 
     /**
@@ -470,6 +473,48 @@ public abstract class ReadCommand implements ReadQuery
         };
     }
 
+    protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
+    {
+        return new WrappingUnfilteredPartitionIterator(iter)
+        {
+            @Override
+            public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+            {
+                if (isAborted())
+                    return null;
+
+                if (TEST_ITERATION_DELAY_MILLIS > 0)
+                    maybeDelayForTesting();
+
+                return iter;
+            }
+        };
+    }
+
+    protected UnfilteredRowIterator withStateTracking(UnfilteredRowIterator iter)
+    {
+        return new WrappingUnfilteredRowIterator(iter)
+        {
+            @Override
+            public boolean hasNext()
+            {
+                if (isAborted())
+                    return false;
+
+                if (TEST_ITERATION_DELAY_MILLIS > 0)
+                    maybeDelayForTesting();
+
+                return super.hasNext();
+            }
+        };
+    }
+
+    private void maybeDelayForTesting()
+    {
+        if (!metadata.ksName.startsWith("system"))
+            FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
+    }
+
     /**
      * Creates a message for this command.
      */
@@ -512,6 +557,12 @@ public abstract class ReadCommand implements ReadQuery
         return sb.toString();
     }
 
+    // Monitorable interface
+    public String name()
+    {
+        return toCQLString();
+    }
+
     private static class Serializer implements IVersionedSerializer<ReadCommand>
     {
         private static int digestFlag(boolean isDigest)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 72a6fa8..9eaa8fa 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -41,15 +41,24 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         }
 
         ReadCommand command = message.payload;
+        command.setMonitoringTime(message.constructionTime, message.getTimeout());
+
         ReadResponse response;
-        try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup))
+        try (ReadExecutionController executionController = command.executionController();
+             UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
         {
             response = command.createResponse(iterator, command.columnFilter());
         }
 
-        MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());
+        if (!command.complete())
+        {
+            Tracing.trace("Discarding partial response to {} (timed out)", message.from);
+            MessagingService.instance().incrementDroppedMessages(message);
+            return;
+        }
 
         Tracing.trace("Enqueuing response to {}", message.from);
+        MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, ReadResponse.serializer);
         MessagingService.instance().sendReply(reply, id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/ReadExecutionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java
new file mode 100644
index 0000000..0bb40f8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class ReadExecutionController implements AutoCloseable
+{
+    // For every read
+    private final OpOrder.Group baseOp;
+
+    // For index reads
+    private final OpOrder.Group indexOp;
+    private final OpOrder.Group writeOp;
+
+    private ReadExecutionController(OpOrder.Group baseOp, OpOrder.Group indexOp, OpOrder.Group writeOp)
+    {
+        this.baseOp = baseOp;
+        this.indexOp = indexOp;
+        this.writeOp = writeOp;
+    }
+
+    public OpOrder.Group baseReadOpOrderGroup()
+    {
+        return baseOp;
+    }
+
+    public OpOrder.Group indexReadOpOrderGroup()
+    {
+        return indexOp;
+    }
+
+    public OpOrder.Group writeOpOrderGroup()
+    {
+        return writeOp;
+    }
+
+    public static ReadExecutionController empty()
+    {
+        return new ReadExecutionController(null, null, null);
+    }
+
+    public static ReadExecutionController forReadOp(OpOrder.Group readOp)
+    {
+        return new ReadExecutionController(readOp, null, null);
+    }
+
+    public static ReadExecutionController forCommand(ReadCommand command)
+    {
+        ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(command.metadata());
+        ColumnFamilyStore indexCfs = maybeGetIndexCfs(baseCfs, command);
+
+        if (indexCfs == null)
+        {
+            return new ReadExecutionController(baseCfs.readOrdering.start(), null, null);
+        }
+        else
+        {
+            OpOrder.Group baseOp = null, indexOp = null, writeOp;
+            // OpOrder.start() shouldn't fail, but better safe than sorry.
+            try
+            {
+                baseOp = baseCfs.readOrdering.start();
+                indexOp = indexCfs.readOrdering.start();
+                // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
+                // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
+                writeOp = baseCfs.keyspace.writeOrder.start();
+                return new ReadExecutionController(baseOp, indexOp, writeOp);
+            }
+            catch (RuntimeException e)
+            {
+                // Note that we must have writeOp == null here, since the ReadExecutionController ctor can't fail
+                try
+                {
+                    if (baseOp != null)
+                        baseOp.close();
+                }
+                finally
+                {
+                    if (indexOp != null)
+                        indexOp.close();
+                }
+                throw e;
+            }
+        }
+    }
+
+    private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
+    {
+        Index index = command.getIndex(baseCfs);
+        return index == null ? null : index.getBackingTable().orElse(null);
+    }
+
+    public void close()
+    {
+        try
+        {
+            if (baseOp != null)
+                baseOp.close();
+        }
+        finally
+        {
+            if (indexOp != null)
+            {
+                try
+                {
+                    indexOp.close();
+                }
+                finally
+                {
+                    writeOp.close();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
deleted file mode 100644
index 0720d79..0000000
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.index.Index;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-public class ReadOrderGroup implements AutoCloseable
-{
-    // For every reads
-    private final OpOrder.Group baseOp;
-
-    // For index reads
-    private final OpOrder.Group indexOp;
-    private final OpOrder.Group writeOp;
-
-    private ReadOrderGroup(OpOrder.Group baseOp, OpOrder.Group indexOp, OpOrder.Group writeOp)
-    {
-        this.baseOp = baseOp;
-        this.indexOp = indexOp;
-        this.writeOp = writeOp;
-    }
-
-    public OpOrder.Group baseReadOpOrderGroup()
-    {
-        return baseOp;
-    }
-
-    public OpOrder.Group indexReadOpOrderGroup()
-    {
-        return indexOp;
-    }
-
-    public OpOrder.Group writeOpOrderGroup()
-    {
-        return writeOp;
-    }
-
-    public static ReadOrderGroup emptyGroup()
-    {
-        return new ReadOrderGroup(null, null, null);
-    }
-
-    public static ReadOrderGroup forCommand(ReadCommand command)
-    {
-        ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(command.metadata());
-        ColumnFamilyStore indexCfs = maybeGetIndexCfs(baseCfs, command);
-
-        if (indexCfs == null)
-        {
-            return new ReadOrderGroup(baseCfs.readOrdering.start(), null, null);
-        }
-        else
-        {
-            OpOrder.Group baseOp = null, indexOp = null, writeOp;
-            // OpOrder.start() shouldn't fail, but better safe than sorry.
-            try
-            {
-                baseOp = baseCfs.readOrdering.start();
-                indexOp = indexCfs.readOrdering.start();
-                // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
-                // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made
-                writeOp = baseCfs.keyspace.writeOrder.start();
-                return new ReadOrderGroup(baseOp, indexOp, writeOp);
-            }
-            catch (RuntimeException e)
-            {
-                // Note that must have writeOp == null since ReadOrderGroup ctor can't fail
-                try
-                {
-                    if (baseOp != null)
-                        baseOp.close();
-                }
-                finally
-                {
-                    if (indexOp != null)
-                        indexOp.close();
-                }
-                throw e;
-            }
-        }
-    }
-
-    private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
-    {
-        Index index = command.getIndex(baseCfs);
-        return index == null ? null : index.getBackingTable().orElse(null);
-    }
-
-    public void close()
-    {
-        try
-        {
-            if (baseOp != null)
-                baseOp.close();
-        }
-        finally
-        {
-            if (indexOp != null)
-            {
-                try
-                {
-                    indexOp.close();
-                }
-                finally
-                {
-                    writeOp.close();
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index d1f5272..2b5c09c 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -35,9 +35,9 @@ public interface ReadQuery
 {
     ReadQuery EMPTY = new ReadQuery()
     {
-        public ReadOrderGroup startOrderGroup()
+        public ReadExecutionController executionController()
         {
-            return ReadOrderGroup.emptyGroup();
+            return ReadExecutionController.empty();
         }
 
         public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
@@ -45,7 +45,7 @@ public interface ReadQuery
             return PartitionIterators.EMPTY;
         }
 
-        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+        public PartitionIterator executeInternal(ReadExecutionController controller)
         {
             return PartitionIterators.EMPTY;
         }
@@ -86,9 +86,9 @@ public interface ReadQuery
      * The returned object <b>must</b> be closed on all path and it is thus strongly advised to
      * use it in a try-with-ressource construction.
      *
-     * @return a newly started order group for this {@code ReadQuery}.
+     * @return a newly started execution controller for this {@code ReadQuery}.
      */
-    public ReadOrderGroup startOrderGroup();
+    public ReadExecutionController executionController();
 
     /**
      * Executes the query at the provided consistency level.
@@ -104,10 +104,10 @@ public interface ReadQuery
     /**
      * Execute the query for internal queries (that is, it basically executes the query locally).
      *
-     * @param orderGroup the {@code ReadOrderGroup} protecting the read.
+     * @param controller the {@code ReadExecutionController} protecting the read.
      * @return the result of the query.
      */
-    public PartitionIterator executeInternal(ReadOrderGroup orderGroup);
+    public PartitionIterator executeInternal(ReadExecutionController controller);
 
     /**
      * Returns a pager for the query.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index 430e4a1..1181485 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -181,7 +181,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
             }
         }
 
-        return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
+        return withStateTracking(result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()));
     }
 
     private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, boolean isRepaired)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index a8e37b4..3a50f23 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -254,12 +254,12 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
         metric.readLatency.addNano(latencyNanos);
     }
 
-    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
     {
         @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
         UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
-                                        ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
-                                        : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
+                                        ? getThroughCache(cfs, executionController)
+                                        : queryMemtableAndDisk(cfs, executionController);
         return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
     }
 
@@ -272,7 +272,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      * If the partition is is not cached, we figure out what filter is "biggest", read
      * that from disk, then filter the result and either cache that or return it.
      */
-    private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
+    private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, ReadExecutionController executionController)
     {
         assert !cfs.isIndex(); // CASSANDRA-5732
         assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
@@ -290,7 +290,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
                 // Some other read is trying to cache the value, just do a normal non-caching read
                 Tracing.trace("Row cache miss (race)");
                 cfs.metric.rowCacheMiss.inc();
-                return queryMemtableAndDisk(cfs, readOp);
+                return queryMemtableAndDisk(cfs, executionController);
             }
 
             CachedPartition cachedPartition = (CachedPartition)cached;
@@ -303,7 +303,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
 
             cfs.metric.rowCacheHitOutOfRange.inc();
             Tracing.trace("Ignoring row cache as cached value could not satisfy query");
-            return queryMemtableAndDisk(cfs, readOp);
+            return queryMemtableAndDisk(cfs, executionController);
         }
 
         cfs.metric.rowCacheMiss.inc();
@@ -328,7 +328,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
             {
                 int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
                 @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
-                UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+                UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, executionController);
                 try
                 {
                     // We want to cache only rowsToCache rows
@@ -368,7 +368,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
         }
 
         Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
-        return queryMemtableAndDisk(cfs, readOp);
+        return queryMemtableAndDisk(cfs, executionController);
     }
 
     /**
@@ -388,6 +388,11 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      */
     public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
     {
+        return queryMemtableAndDisk(cfs, ReadExecutionController.forReadOp(readOp));
+    }
+
+    public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, ReadExecutionController executionController)
+    {
         Tracing.trace("Executing single-partition query on {}", cfs.name);
 
         boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
@@ -487,18 +492,18 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
             return commands.get(0).metadata();
         }
 
-        public ReadOrderGroup startOrderGroup()
+        public ReadExecutionController executionController()
         {
             // Note that the only difference between the command in a group must be the partition key on which
             // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
-            return commands.get(0).startOrderGroup();
+            return commands.get(0).executionController();
         }
 
-        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+        public PartitionIterator executeInternal(ReadExecutionController controller)
         {
             List<PartitionIterator> partitions = new ArrayList<>(commands.size());
             for (SinglePartitionReadCommand cmd : commands)
-                partitions.add(cmd.executeInternal(orderGroup));
+                partitions.add(cmd.executeInternal(controller));
 
             // Because we only have enforce the limit per command, we need to enforce it globally.
             return limits.filter(PartitionIterators.concat(partitions), nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index 27aab62..9fabdc2 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -247,7 +247,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
                 cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
             }
 
-            return merged;
+            return withStateTracking(merged);
         }
         catch (RuntimeException | Error e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
index 8fa9337..94dea15 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -738,7 +738,7 @@ public abstract class Slices implements Iterable<Slice>
 
             public boolean isEQ()
             {
-                return startValue.equals(endValue);
+                return Objects.equals(startValue, endValue);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index d3a289a..be6d7e2 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -217,7 +217,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
 
     public String toCQLString(CFMetaData metadata)
     {
-        if (clusterings.isEmpty())
+        if (metadata.clusteringColumns().isEmpty() || clusterings.size() <= 1)
             return "";
 
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 1a4573e..98b3600 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -317,9 +317,12 @@ public class ColumnFilter
             return "<none>";
 
         StringBuilder sb = new StringBuilder();
-        appendColumnDef(sb, defs.next());
         while (defs.hasNext())
-            appendColumnDef(sb.append(", "), defs.next());
+        {
+            appendColumnDef(sb, defs.next());
+            if (defs.hasNext())
+                sb.append(", ");
+        }
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/monitoring/ApproximateTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/ApproximateTime.java b/src/java/org/apache/cassandra/db/monitoring/ApproximateTime.java
new file mode 100644
index 0000000..1d57398
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/monitoring/ApproximateTime.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.monitoring;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
+
+/**
+ * This is an approximation of System.currentTimeMillis(). It updates its
+ * time value at periodic intervals of CHECK_INTERVAL_MS milliseconds
+ * (currently 10 milliseconds by default). It can be used as a faster alternative
+ * to System.currentTimeMillis() whenever an imprecision of a few milliseconds
+ * can be accepted.
+ */
+public class ApproximateTime
+{
+    private static final Logger logger = LoggerFactory.getLogger(ApproximateTime.class);
+    private static final int CHECK_INTERVAL_MS = Math.max(5, Integer.valueOf(System.getProperty(Config.PROPERTY_PREFIX + "approximate_time_precision_ms", "10")));
+
+    private static volatile long time = System.currentTimeMillis();
+    static
+    {
+        logger.info("Scheduling approximate time-check task with a precision of {} milliseconds", CHECK_INTERVAL_MS);
+        ScheduledExecutors.scheduledFastTasks.scheduleWithFixedDelay(() -> time = System.currentTimeMillis(),
+                                                                     CHECK_INTERVAL_MS,
+                                                                     CHECK_INTERVAL_MS,
+                                                                     TimeUnit.MILLISECONDS);
+    }
+
+    public static long currentTimeMillis()
+    {
+        return time;
+    }
+
+    public static long precision()
+    {
+        return 2 * CHECK_INTERVAL_MS;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java b/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java
new file mode 100644
index 0000000..d6b6078
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.monitoring;
+
+public final class ConstructionTime
+{
+    public final long timestamp;
+    public final boolean isCrossNode;
+
+    public ConstructionTime()
+    {
+        this(ApproximateTime.currentTimeMillis());
+    }
+
+    public ConstructionTime(long timestamp)
+    {
+        this(timestamp, false);
+    }
+
+    public ConstructionTime(long timestamp, boolean isCrossNode)
+    {
+        this.timestamp = timestamp;
+        this.isCrossNode = isCrossNode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
new file mode 100644
index 0000000..202ac87
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.monitoring;
+
+public interface Monitorable
+{
+    String name();
+    ConstructionTime constructionTime();
+    long timeout();
+
+    boolean isInProgress();
+    boolean isAborted();
+    boolean isCompleted();
+
+    boolean abort();
+    boolean complete();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
new file mode 100644
index 0000000..f89f8ad
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.monitoring;
+
+public abstract class MonitorableImpl implements Monitorable
+{
+    private MonitoringState state;
+    private ConstructionTime constructionTime;
+    private long timeout;
+
+    protected MonitorableImpl()
+    {
+        this.state = MonitoringState.IN_PROGRESS;
+    }
+
+    /**
+     * This setter is ugly, but the construction chain to ReadCommand is too
+     * complex to do otherwise: it would require passing new parameters to all
+     * serializers or specializing the serializers to accept these message properties.
+     */
+    public void setMonitoringTime(ConstructionTime constructionTime, long timeout)
+    {
+        this.constructionTime = constructionTime;
+        this.timeout = timeout;
+    }
+
+    public ConstructionTime constructionTime()
+    {
+        return constructionTime;
+    }
+
+    public long timeout()
+    {
+        return timeout;
+    }
+
+    public boolean isInProgress()
+    {
+        check();
+        return state == MonitoringState.IN_PROGRESS;
+    }
+
+    public boolean isAborted()
+    {
+        check();
+        return state == MonitoringState.ABORTED;
+    }
+
+    public boolean isCompleted()
+    {
+        check();
+        return state == MonitoringState.COMPLETED;
+    }
+
+    public boolean abort()
+    {
+        if (state == MonitoringState.IN_PROGRESS)
+        {
+            if (constructionTime != null)
+                MonitoringTask.addFailedOperation(this, ApproximateTime.currentTimeMillis());
+            state = MonitoringState.ABORTED;
+            return true;
+        }
+
+        return state == MonitoringState.ABORTED;
+    }
+
+    public boolean complete()
+    {
+        if (state == MonitoringState.IN_PROGRESS)
+        {
+            state = MonitoringState.COMPLETED;
+            return true;
+        }
+
+        return state == MonitoringState.COMPLETED;
+    }
+
+    private void check()
+    {
+        if (constructionTime == null || state != MonitoringState.IN_PROGRESS)
+            return;
+
+        long elapsed = ApproximateTime.currentTimeMillis() - constructionTime.timestamp;
+        if (elapsed >= timeout)
+            abort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/monitoring/MonitoringState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringState.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringState.java
new file mode 100644
index 0000000..4fe3cf8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringState.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.monitoring;
+
+public enum MonitoringState
+{
+    IN_PROGRESS,
+    ABORTED,
+    COMPLETED
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
new file mode 100644
index 0000000..6d28078
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.monitoring;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
+
+import static java.lang.System.getProperty;
+
+/**
+ * A task for monitoring in-progress operations, currently only read queries, and aborting them if they time out.
+ * We also log timed-out operations; see CASSANDRA-7392.
+ */
+public class MonitoringTask
+{
+    private static final String LINE_SEPARATOR = getProperty("line.separator");
+    private static final Logger logger = LoggerFactory.getLogger(MonitoringTask.class);
+
+    /**
+     * Defines the interval for reporting any operations that have timed out.
+     */
+    private static final int REPORT_INTERVAL_MS = Math.max(0, Integer.valueOf(System.getProperty(Config.PROPERTY_PREFIX + "monitoring_report_interval_ms", "5000")));
+
+    /**
+     * Defines the maximum number of unique timed out queries that will be reported in the logs.
+     * Use a negative number to remove any limit.
+     */
+    private static final int MAX_OPERATIONS = Integer.valueOf(System.getProperty(Config.PROPERTY_PREFIX + "monitoring_max_operations", "50"));
+
+    @VisibleForTesting
+    static MonitoringTask instance = make(REPORT_INTERVAL_MS, MAX_OPERATIONS);
+
+    private final int maxOperations;
+    private final ScheduledFuture<?> reportingTask;
+    private final BlockingQueue<FailedOperation> operationsQueue;
+    private final AtomicLong numDroppedOperations;
+    private long lastLogTime;
+
+    @VisibleForTesting
+    static MonitoringTask make(int reportIntervalMillis, int maxTimedoutOperations)
+    {
+        if (instance != null)
+        {
+            instance.cancel();
+            instance = null;
+        }
+
+        return new MonitoringTask(reportIntervalMillis, maxTimedoutOperations);
+    }
+
+    private MonitoringTask(int reportIntervalMillis, int maxOperations)
+    {
+        this.maxOperations = maxOperations;
+        this.operationsQueue = maxOperations > 0 ? new ArrayBlockingQueue<>(maxOperations) : new LinkedBlockingQueue<>();
+        this.numDroppedOperations = new AtomicLong();
+        this.lastLogTime = ApproximateTime.currentTimeMillis();
+
+        logger.info("Scheduling monitoring task with report interval of {} ms, max operations {}", reportIntervalMillis, maxOperations);
+        this.reportingTask = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(() -> logFailedOperations(ApproximateTime.currentTimeMillis()),
+                                                                                     reportIntervalMillis,
+                                                                                     reportIntervalMillis,
+                                                                                     TimeUnit.MILLISECONDS);
+    }
+
+    public void cancel()
+    {
+        reportingTask.cancel(false);
+    }
+
+    public static void addFailedOperation(Monitorable operation, long now)
+    {
+        instance.innerAddFailedOperation(operation, now);
+    }
+
+    private void innerAddFailedOperation(Monitorable operation, long now)
+    {
+        if (maxOperations == 0)
+            return; // logging of failed operations disabled
+
+        if (!operationsQueue.offer(new FailedOperation(operation, now)))
+            numDroppedOperations.incrementAndGet();
+    }
+
+    @VisibleForTesting
+    FailedOperations aggregateFailedOperations()
+    {
+        Map<String, FailedOperation> operations = new HashMap<>();
+
+        FailedOperation failedOperation;
+        while((failedOperation = operationsQueue.poll()) != null)
+        {
+            FailedOperation existing = operations.get(failedOperation.name());
+            if (existing != null)
+                existing.addTimeout(failedOperation);
+            else
+                operations.put(failedOperation.name(), failedOperation);
+        }
+
+        return new FailedOperations(operations, numDroppedOperations.getAndSet(0L));
+    }
+
+    /** Test hook: drains the queued failures, resets {@code lastLogTime}, returns one string per report line. */
+    @VisibleForTesting
+    List<String> getFailedOperations()
+    {
+        String ret = aggregateFailedOperations().getLogMessage(); // joined with LINE_SEPARATOR, so split on it, not "\n"
+        lastLogTime = ApproximateTime.currentTimeMillis();
+        return ret.isEmpty() ? Collections.emptyList() : Arrays.asList(ret.split(java.util.regex.Pattern.quote(LINE_SEPARATOR)));
+    }
+
+    @VisibleForTesting
+    void logFailedOperations(long now)
+    {
+        FailedOperations failedOperations = aggregateFailedOperations();
+        if (!failedOperations.isEmpty())
+        {
+            long elapsed = now - lastLogTime;
+            logger.warn("{} operations timed out in the last {} msecs, operation list available at debug log level",
+                        failedOperations.num(),
+                        elapsed);
+
+            if (logger.isDebugEnabled())
+                logger.debug("{} operations timed out in the last {} msecs:{}{}",
+                            failedOperations.num(),
+                            elapsed,
+                            LINE_SEPARATOR,
+                            failedOperations.getLogMessage());
+        }
+
+        lastLogTime = now;
+    }
+
+    private static final class FailedOperations
+    {
+        public final Map<String, FailedOperation> operations;
+        public final long numDropped;
+
+        FailedOperations(Map<String, FailedOperation> operations, long numDropped)
+        {
+            this.operations = operations;
+            this.numDropped = numDropped;
+        }
+
+        public boolean isEmpty()
+        {
+            return operations.isEmpty() && numDropped == 0;
+        }
+
+        public long num()
+        {
+            return operations.size() + numDropped;
+        }
+
+        public String getLogMessage()
+        {
+            if (isEmpty())
+                return "";
+
+            final StringBuilder ret = new StringBuilder();
+            operations.values().forEach(o -> addOperation(ret, o));
+
+            if (numDropped > 0)
+                ret.append(LINE_SEPARATOR)
+                   .append("... (")
+                   .append(numDropped)
+                   .append(" were dropped)");
+
+            return ret.toString();
+        }
+
+        private static void addOperation(StringBuilder ret, FailedOperation operation)
+        {
+            if (ret.length() > 0)
+                ret.append(LINE_SEPARATOR);
+
+            ret.append(operation.getLogMessage());
+        }
+    }
+
+    private final static class FailedOperation
+    {
+        public final Monitorable operation;
+        public int numTimeouts;
+        public long totalTime;
+        public long maxTime;
+        public long minTime;
+        private String name;
+
+        FailedOperation(Monitorable operation, long failedAt)
+        {
+            this.operation = operation;
+            numTimeouts = 1;
+            totalTime = failedAt - operation.constructionTime().timestamp;
+            minTime = totalTime;
+            maxTime = totalTime;
+        }
+
+        public String name()
+        {
+            if (name == null)
+                name = operation.name();
+            return name;
+        }
+
+        void addTimeout(FailedOperation operation)
+        {
+            numTimeouts++;
+            totalTime += operation.totalTime;
+            maxTime = Math.max(maxTime, operation.maxTime);
+            minTime = Math.min(minTime, operation.minTime);
+        }
+
+        public String getLogMessage()
+        {
+            if (numTimeouts == 1)
+                return String.format("%s: total time %d msec - timeout %d %s",
+                                     name(),
+                                     totalTime,
+                                     operation.timeout(),
+                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+            else
+                return String.format("%s (timed out %d times): total time avg/min/max %d/%d/%d msec - timeout %d %s",
+                                     name(),
+                                     numTimeouts,
+                                     totalTime / numTimeouts,
+                                     minTime,
+                                     maxTime,
+                                     operation.timeout(),
+                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 87ea2ec..7ce7904 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -480,8 +480,8 @@ public class View
                 // Add all of the rows which were recovered from the query to the row set
                 while (!pager.isExhausted())
                 {
-                    try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                         PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+                    try (ReadExecutionController executionController = pager.executionController();
+                         PartitionIterator iter = pager.fetchPageInternal(128, executionController))
                     {
                         if (!iter.hasNext())
                             break;
@@ -538,8 +538,8 @@ public class View
 
         while (!pager.isExhausted())
         {
-            try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                 PartitionIterator iter = pager.fetchPageInternal(128, orderGroup))
+            try (ReadExecutionController executionController = pager.executionController();
+                 PartitionIterator iter = pager.fetchPageInternal(128, executionController))
             {
                 while (iter.hasNext())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 35b023b..8146211 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -80,8 +80,8 @@ public class ViewBuilder extends CompactionInfo.Holder
 
         while (!pager.isExhausted())
         {
-           try (ReadOrderGroup orderGroup = pager.startOrderGroup();
-                PartitionIterator partitionIterator = pager.fetchPageInternal(128, orderGroup))
+           try (ReadExecutionController executionController = pager.executionController();
+                PartitionIterator partitionIterator = pager.fetchPageInternal(128, executionController))
            {
                if (!partitionIterator.hasNext())
                    return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 3ceec13..bd48063 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -427,6 +427,6 @@ public interface Index
          * @param orderGroup the collection of OpOrder.Groups which the ReadCommand is being performed under.
          * @return partitions from the base table matching the criteria of the search.
          */
-        public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup);
+        public UnfilteredPartitionIterator search(ReadExecutionController executionController);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
index 72d2528..1e28579 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
@@ -36,14 +36,14 @@ public abstract class CassandraIndexSearcher implements Index.Searcher
 
     @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
     // of this method.
-    public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup)
+    public UnfilteredPartitionIterator search(ReadExecutionController executionController)
     {
         // the value of the index expression is the partition key in the index table
         DecoratedKey indexKey = index.getBackingTable().get().decorateKey(expression.getIndexValue());
-        UnfilteredRowIterator indexIter = queryIndex(indexKey, command, orderGroup);
+        UnfilteredRowIterator indexIter = queryIndex(indexKey, command, executionController);
         try
         {
-            return queryDataFromIndex(indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup);
+            return queryDataFromIndex(indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, executionController);
         }
         catch (RuntimeException | Error e)
         {
@@ -52,13 +52,13 @@ public abstract class CassandraIndexSearcher implements Index.Searcher
         }
     }
 
-    private UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
+    private UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadExecutionController executionController)
     {
         ClusteringIndexFilter filter = makeIndexFilter(command);
         ColumnFamilyStore indexCfs = index.getBackingTable().get();
         CFMetaData indexCfm = indexCfs.metadata;
         return SinglePartitionReadCommand.create(indexCfm, command.nowInSec(), indexKey, ColumnFilter.all(indexCfm), filter)
-                                         .queryMemtableAndDisk(indexCfs, orderGroup.indexReadOpOrderGroup());
+                                         .queryMemtableAndDisk(indexCfs, executionController.indexReadOpOrderGroup());
     }
 
     private ClusteringIndexFilter makeIndexFilter(ReadCommand command)
@@ -168,5 +168,5 @@ public abstract class CassandraIndexSearcher implements Index.Searcher
     protected abstract UnfilteredPartitionIterator queryDataFromIndex(DecoratedKey indexKey,
                                                                       RowIterator indexHits,
                                                                       ReadCommand command,
-                                                                      ReadOrderGroup orderGroup);
+                                                                      ReadExecutionController executionController);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index f1751f5..d77b889 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -57,7 +57,7 @@ public class CompositesSearcher extends CassandraIndexSearcher
     protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
                                                              final RowIterator indexHits,
                                                              final ReadCommand command,
-                                                             final ReadOrderGroup orderGroup)
+                                                             final ReadExecutionController executionController)
     {
         assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
 
@@ -143,10 +143,10 @@ public class CompositesSearcher extends CassandraIndexSearcher
                 @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
                                               // by the next caller of next, or through closing this iterator is this come before.
                 UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs,
-                                                                                                 orderGroup.baseReadOpOrderGroup()),
+                                                                                                 executionController.baseReadOpOrderGroup()),
                                                                     indexKey.getKey(),
                                                                     entries,
-                                                                    orderGroup.writeOpOrderGroup(),
+                                                                    executionController.writeOpOrderGroup(),
                                                                     command.nowInSec());
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index b60d2d9..0ad0891 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -47,7 +47,7 @@ public class KeysSearcher extends CassandraIndexSearcher
     protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
                                                              final RowIterator indexHits,
                                                              final ReadCommand command,
-                                                             final ReadOrderGroup orderGroup)
+                                                             final ReadExecutionController executionController)
     {
         assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
 
@@ -100,10 +100,10 @@ public class KeysSearcher extends CassandraIndexSearcher
                                                   // Otherwise, we close right away if empty, and if it's assigned to next it will be called either
                                                   // by the next caller of next, or through closing this iterator is this come before.
                     UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs,
-                                                                                                orderGroup.baseReadOpOrderGroup()),
+                                                                                                executionController.baseReadOpOrderGroup()),
                                                                    hit,
                                                                    indexKey.getKey(),
-                                                                   orderGroup.writeOpOrderGroup(),
+                                                                   executionController.writeOpOrderGroup(),
                                                                    isForThrift(),
                                                                    command.nowInSec());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index a29260c..d159e0c 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.gms.FailureDetector;
  */
 public interface IAsyncCallback<T>
 {
-    public static Predicate<InetAddress> isAlive = new Predicate<InetAddress>()
+    Predicate<InetAddress> isAlive = new Predicate<InetAddress>()
     {
         public boolean apply(InetAddress endpoint)
         {
@@ -42,7 +42,7 @@ public interface IAsyncCallback<T>
     /**
      * @param msg response received.
      */
-    public void response(MessageIn<T> msg);
+    void response(MessageIn<T> msg);
 
     /**
      * @return true if this callback is on the read path and its latency should be

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
index 744bb62..546a416 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -25,5 +25,5 @@ public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
     /**
      * Called when there is an exception on the remote node or timeout happens
      */
-    public void onFailure(InetAddress from);
+    void onFailure(InetAddress from);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/net/IVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IVerbHandler.java b/src/java/org/apache/cassandra/net/IVerbHandler.java
index 574f30f..b9f1a54 100644
--- a/src/java/org/apache/cassandra/net/IVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/IVerbHandler.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.net;
 
 import java.io.IOException;
 
+import org.apache.cassandra.db.ReadCommand;
+
 /**
  * IVerbHandler provides the method that all verb handlers need to implement.
  * The concrete implementation of this interface would provide the functionality
@@ -36,5 +38,5 @@ public interface IVerbHandler<T>
      * @param message - incoming message that needs handling.
      * @param id
      */
-    public void doVerb(MessageIn<T> message, int id) throws IOException;
+    void doVerb(MessageIn<T> message, int id) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 7054bcc..a5e86fa 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -184,18 +184,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         else
             id = input.readInt();
 
-        long timestamp = System.currentTimeMillis();
-        boolean isCrossNodeTimestamp = false;
-        // make sure to readInt, even if cross_node_to is not enabled
-        int partial = input.readInt();
-        if (DatabaseDescriptor.hasCrossNodeTimeout())
-        {
-            long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
-            isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
-            timestamp = crossNodeTimestamp;
-        }
-
-        MessageIn message = MessageIn.read(input, version, id);
+        MessageIn message = MessageIn.read(input, version, id, MessageIn.readTimestamp(input));
         if (message == null)
         {
             // callback expired; nothing to do
@@ -203,7 +192,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         }
         if (version <= MessagingService.current_version)
         {
-            MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
+            MessagingService.instance().receive(message, id);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index a46366c..46376d0 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -32,25 +32,21 @@ public class MessageDeliveryTask implements Runnable
 
     private final MessageIn message;
     private final int id;
-    private final long constructionTime;
-    private final boolean isCrossNodeTimestamp;
 
-    public MessageDeliveryTask(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
+    public MessageDeliveryTask(MessageIn message, int id)
     {
         assert message != null;
         this.message = message;
         this.id = id;
-        this.constructionTime = timestamp;
-        this.isCrossNodeTimestamp = isCrossNodeTimestamp;
     }
 
     public void run()
     {
         MessagingService.Verb verb = message.verb;
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
-            && System.currentTimeMillis() > constructionTime + message.getTimeout())
+            && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
         {
-            MessagingService.instance().incrementDroppedMessages(verb, isCrossNodeTimestamp);
+            MessagingService.instance().incrementDroppedMessages(message);
             return;
         }
 
@@ -82,7 +78,7 @@ public class MessageDeliveryTask implements Runnable
         }
 
         if (GOSSIP_VERBS.contains(message.verb))
-            Gossiper.instance.setLastProcessedMessageAt(constructionTime);
+            Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp);
     }
 
     private void handleFailure(Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/557bbbcc/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 64b8e81..c6e4d89 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -24,40 +24,55 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.monitoring.ConstructionTime;
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 
 public class MessageIn<T>
 {
-    private static final Logger logger = LoggerFactory.getLogger(MessageIn.class);
-
     public final InetAddress from;
     public final T payload;
     public final Map<String, byte[]> parameters;
     public final MessagingService.Verb verb;
     public final int version;
+    public final ConstructionTime constructionTime;
 
-    private MessageIn(InetAddress from, T payload, Map<String, byte[]> parameters, MessagingService.Verb verb, int version)
+    private MessageIn(InetAddress from,
+                      T payload,
+                      Map<String, byte[]> parameters,
+                      MessagingService.Verb verb,
+                      int version,
+                      ConstructionTime constructionTime)
     {
         this.from = from;
         this.payload = payload;
         this.parameters = parameters;
         this.verb = verb;
         this.version = version;
+        this.constructionTime = constructionTime;
     }
 
-    public static <T> MessageIn<T> create(InetAddress from, T payload, Map<String, byte[]> parameters, MessagingService.Verb verb, int version)
+    public static <T> MessageIn<T> create(InetAddress from,
+                                          T payload,
+                                          Map<String, byte[]> parameters,
+                                          MessagingService.Verb verb,
+                                          int version,
+                                          ConstructionTime constructionTime)
     {
-        return new MessageIn<T>(from, payload, parameters, verb, version);
+        return new MessageIn<>(from, payload, parameters, verb, version, constructionTime);
     }
 
     public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id) throws IOException
     {
+        return read(in, version, id, new ConstructionTime());
+    }
+
+    public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, ConstructionTime constructionTime) throws IOException
+    {
         InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
 
         MessagingService.Verb verb = MessagingService.Verb.values()[in.readInt()];
@@ -94,9 +109,31 @@ public class MessageIn<T>
             serializer = (IVersionedSerializer<T2>) callback.serializer;
         }
         if (payloadSize == 0 || serializer == null)
-            return create(from, null, parameters, verb, version);
+            return create(from, null, parameters, verb, version, constructionTime);
+
         T2 payload = serializer.deserialize(in, version);
-        return MessageIn.create(from, payload, parameters, verb, version);
+        return MessageIn.create(from, payload, parameters, verb, version, constructionTime);
+    }
+
+    public static ConstructionTime createTimestamp()
+    {
+        return new ConstructionTime();
+    }
+
+    public static ConstructionTime readTimestamp(DataInputPlus input) throws IOException
+    {
+        // make sure to readInt, even if cross_node_timeout is not enabled
+        int partial = input.readInt();
+        if(DatabaseDescriptor.hasCrossNodeTimeout())
+        {
+            long timestamp = System.currentTimeMillis();
+            long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+            return new ConstructionTime(crossNodeTimestamp, timestamp != crossNodeTimestamp);
+        }
+        else
+        {
+            return new ConstructionTime();
+        }
     }
 
     public Stage getMessageType()


Mime
View raw message