cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stefa...@apache.org
Subject cassandra git commit: Added slow query log
Date Wed, 17 Aug 2016 02:48:57 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 44e30e33c -> 6966fcda9


Added slow query log

patch by Shogo Hoshii and Stefania Alborghetti; reviewed by Tyler Hobbs
and Stefania Alborghetti for CASSANDRA-12403


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6966fcda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6966fcda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6966fcda

Branch: refs/heads/trunk
Commit: 6966fcda9b5ce657760dfe103e5812862306ff7a
Parents: 44e30e3
Author: Shogo Hoshii <shoshii@yahoo-corp.jp>
Authored: Wed Aug 10 09:51:57 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Committed: Wed Aug 17 10:47:14 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 conf/cassandra.yaml                             |   7 +-
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   5 +
 .../cassandra/db/ReadCommandVerbHandler.java    |   2 +-
 .../cassandra/db/monitoring/Monitorable.java    |   2 +
 .../db/monitoring/MonitorableImpl.java          |  25 +-
 .../cassandra/db/monitoring/MonitoringTask.java | 271 +++++++++++++++----
 .../org/apache/cassandra/net/MessageIn.java     |   5 +
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../db/monitoring/MonitoringTaskTest.java       | 166 ++++++++++--
 12 files changed, 398 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dfed9b4..51b87db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Added slow query log (CASSANDRA-12403)
  * Count full coordinated request against timeout (CASSANDRA-12256)
  * Allow TTL with null value on insert and update (CASSANDRA-12216)
  * Make decommission operation resumable (CASSANDRA-12008)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 86ec36c..a8ba483 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,9 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - A slow query log has been added: slow queries will be logged at DEBUG level.
+     For more details refer to CASSANDRA-12403 and slow_query_log_timeout_in_ms
+     in cassandra.yaml.
    - Support for GROUP BY queries has been added.
    - A new compaction-stress tool has been added to test the throughput of compaction
      for any cassandra-stress user schema.  see compaction-stress help for how to use.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e724941..5fb44cf 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1,4 +1,4 @@
-# Cassandra storage config YAML 
+# Cassandra storage config YAML
 
 # NOTE:
 #   See http://wiki.apache.org/cassandra/StorageConfiguration for
@@ -855,6 +855,11 @@ truncate_request_timeout_in_ms: 60000
 # The default timeout for other, miscellaneous operations
 request_timeout_in_ms: 10000
 
+# How long before a node logs slow queries. Select queries that take longer than
+# this timeout to execute will generate an aggregated log message, so that slow queries
+# can be identified. Set this value to zero to disable slow query logging.
+slow_query_log_timeout_in_ms: 500
+
 # Enable operation timeout information exchange between nodes to accurately
 # measure request timeouts.  If disabled, replicas will assume that requests
 # were forwarded to them instantly by the coordinator, which means that

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 0dd3cc8..fdf27d9 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -103,6 +103,8 @@ public class Config
 
     public boolean cross_node_timeout = false;
 
+    public volatile long slow_query_log_timeout_in_ms = 500L;
+
     public volatile Double phi_convict_threshold = 8.0;
 
     public Integer concurrent_reads = 32;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 103cb9d..6f71817 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1321,6 +1321,11 @@ public class DatabaseDescriptor
         }
     }
 
+    public static long getSlowQueryTimeout()
+    {
+        return conf.slow_query_log_timeout_in_ms;
+    }
+
     /**
      * @return the minimum configured {read, write, range, truncate, misc} timeout
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index e2a9678..7948590 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -41,7 +41,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         }
 
         ReadCommand command = message.payload;
-        command.setMonitoringTime(message.constructionTime, message.getTimeout());
+        command.setMonitoringTime(message.constructionTime, message.getTimeout(), message.getSlowQueryTimeout());
 
         ReadResponse response;
         try (ReadExecutionController executionController = command.executionController();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
index 202ac87..f4c5ee8 100644
--- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
+++ b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
@@ -23,10 +23,12 @@ public interface Monitorable
     String name();
     ConstructionTime constructionTime();
     long timeout();
+    long slowTimeout();
 
     boolean isInProgress();
     boolean isAborted();
     boolean isCompleted();
+    boolean isSlow();
 
     boolean abort();
     boolean complete();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
index f89f8ad..7363e10 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.db.monitoring;
 public abstract class MonitorableImpl implements Monitorable
 {
     private MonitoringState state;
+    private boolean isSlow;
     private ConstructionTime constructionTime;
     private long timeout;
+    private long slowTimeout;
 
     protected MonitorableImpl()
     {
         this.state = MonitoringState.IN_PROGRESS;
+        this.isSlow = false;
     }
 
     /**
@@ -34,10 +37,11 @@ public abstract class MonitorableImpl implements Monitorable
      * is too complex, it would require passing new parameters to all serializers
      * or specializing the serializers to accept these message properties.
      */
-    public void setMonitoringTime(ConstructionTime constructionTime, long timeout)
+    public void setMonitoringTime(ConstructionTime constructionTime, long timeout, long slowTimeout)
     {
         this.constructionTime = constructionTime;
         this.timeout = timeout;
+        this.slowTimeout = slowTimeout;
     }
 
     public ConstructionTime constructionTime()
@@ -50,6 +54,11 @@ public abstract class MonitorableImpl implements Monitorable
         return timeout;
     }
 
+    public long slowTimeout()
+    {
+        return slowTimeout;
+    }
+
     public boolean isInProgress()
     {
         check();
@@ -68,12 +77,19 @@ public abstract class MonitorableImpl implements Monitorable
         return state == MonitoringState.COMPLETED;
     }
 
+    public boolean isSlow()
+    {
+        check();
+        return isSlow;
+    }
+
     public boolean abort()
     {
         if (state == MonitoringState.IN_PROGRESS)
         {
             if (constructionTime != null)
                 MonitoringTask.addFailedOperation(this, ApproximateTime.currentTimeMillis());
+
             state = MonitoringState.ABORTED;
             return true;
         }
@@ -85,6 +101,9 @@ public abstract class MonitorableImpl implements Monitorable
     {
         if (state == MonitoringState.IN_PROGRESS)
         {
+            if (isSlow && slowTimeout > 0 && constructionTime != null)
+                MonitoringTask.addSlowOperation(this, ApproximateTime.currentTimeMillis());
+
             state = MonitoringState.COMPLETED;
             return true;
         }
@@ -98,6 +117,10 @@ public abstract class MonitorableImpl implements Monitorable
             return;
 
         long elapsed = ApproximateTime.currentTimeMillis() - constructionTime.timestamp;
+
+        if (elapsed >= slowTimeout && !isSlow)
+            isSlow = true;
+
         if (elapsed >= timeout)
             abort();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
index a44773a..b116485 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -36,17 +36,20 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 import static java.lang.System.getProperty;
 
 /**
  * A task for monitoring in progress operations, currently only read queries, and aborting them if they time out.
  * We also log timed out operations, see CASSANDRA-7392.
+ * Since CASSANDRA-12403 we also log queries that were slow.
  */
-public class MonitoringTask
+class MonitoringTask
 {
     private static final String LINE_SEPARATOR = getProperty("line.separator");
     private static final Logger logger = LoggerFactory.getLogger(MonitoringTask.class);
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES);
 
     /**
      * Defines the interval for reporting any operations that have timed out.
@@ -62,12 +65,12 @@ public class MonitoringTask
     @VisibleForTesting
     static MonitoringTask instance = make(REPORT_INTERVAL_MS, MAX_OPERATIONS);
 
-    private final int maxOperations;
     private final ScheduledFuture<?> reportingTask;
-    private final BlockingQueue<FailedOperation> operationsQueue;
-    private final AtomicLong numDroppedOperations;
+    private final OperationsQueue failedOperationsQueue;
+    private final OperationsQueue slowOperationsQueue;
     private long lastLogTime;
 
+
     @VisibleForTesting
     static MonitoringTask make(int reportIntervalMillis, int maxTimedoutOperations)
     {
@@ -82,13 +85,13 @@ public class MonitoringTask
 
     private MonitoringTask(int reportIntervalMillis, int maxOperations)
     {
-        this.maxOperations = maxOperations;
-        this.operationsQueue = maxOperations > 0 ? new ArrayBlockingQueue<>(maxOperations) : new LinkedBlockingQueue<>();
-        this.numDroppedOperations = new AtomicLong();
+        this.failedOperationsQueue = new OperationsQueue(maxOperations);
+        this.slowOperationsQueue = new OperationsQueue(maxOperations);
+
         this.lastLogTime = ApproximateTime.currentTimeMillis();
 
         logger.info("Scheduling monitoring task with report interval of {} ms, max operations {}", reportIntervalMillis, maxOperations);
-        this.reportingTask = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(() -> logFailedOperations(ApproximateTime.currentTimeMillis()),
+        this.reportingTask = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(() -> logOperations(ApproximateTime.currentTimeMillis()),
                                                                                      reportIntervalMillis,
                                                                                      reportIntervalMillis,
                                                                                      TimeUnit.MILLISECONDS);
@@ -99,57 +102,51 @@ public class MonitoringTask
         reportingTask.cancel(false);
     }
 
-    public static void addFailedOperation(Monitorable operation, long now)
+    static void addFailedOperation(Monitorable operation, long now)
     {
-        instance.innerAddFailedOperation(operation, now);
+        instance.failedOperationsQueue.offer(new FailedOperation(operation, now));
     }
 
-    private void innerAddFailedOperation(Monitorable operation, long now)
+    static void addSlowOperation(Monitorable operation, long now)
     {
-        if (maxOperations == 0)
-            return; // logging of failed operations disabled
-
-        if (!operationsQueue.offer(new FailedOperation(operation, now)))
-            numDroppedOperations.incrementAndGet();
+        instance.slowOperationsQueue.offer(new SlowOperation(operation, now));
     }
 
     @VisibleForTesting
-    FailedOperations aggregateFailedOperations()
+    List<String> getFailedOperations()
     {
-        Map<String, FailedOperation> operations = new HashMap<>();
+        return getLogMessages(failedOperationsQueue.popOperations());
+    }
 
-        FailedOperation failedOperation;
-        while((failedOperation = operationsQueue.poll()) != null)
-        {
-            FailedOperation existing = operations.get(failedOperation.name());
-            if (existing != null)
-                existing.addTimeout(failedOperation);
-            else
-                operations.put(failedOperation.name(), failedOperation);
-        }
+    @VisibleForTesting
+    List<String> getSlowOperations()
+    {
+        return getLogMessages(slowOperationsQueue.popOperations());
+    }
 
-        return new FailedOperations(operations, numDroppedOperations.getAndSet(0L));
+    private List<String> getLogMessages(AggregatedOperations operations)
+    {
+        String ret = operations.getLogMessage();
+        return ret.isEmpty() ? Collections.emptyList() : Arrays.asList(ret.split("\n"));
     }
 
     @VisibleForTesting
-    List<String> getFailedOperations()
+    private void logOperations(long now)
     {
-        FailedOperations failedOperations = aggregateFailedOperations();
-        String ret = failedOperations.getLogMessage();
-        lastLogTime = ApproximateTime.currentTimeMillis();
-        return ret.isEmpty() ? Collections.emptyList() : Arrays.asList(ret.split("\n"));
+        logSlowOperations(now);
+        logFailedOperations(now);
+
+        lastLogTime = now;
     }
 
     @VisibleForTesting
-    void logFailedOperations(long now)
+    boolean logFailedOperations(long now)
     {
-        FailedOperations failedOperations = aggregateFailedOperations();
+        AggregatedOperations failedOperations = failedOperationsQueue.popOperations();
         if (!failedOperations.isEmpty())
         {
             long elapsed = now - lastLogTime;
-            logger.warn("{} operations timed out in the last {} msecs, operation list available at debug log level",
-                        failedOperations.num(),
-                        elapsed);
+            noSpamLogger.warn("Some operations timed out, details available at debug level (debug.log)");
 
             if (logger.isDebugEnabled())
                 logger.debug("{} operations timed out in the last {} msecs:{}{}",
@@ -157,17 +154,109 @@ public class MonitoringTask
                             elapsed,
                             LINE_SEPARATOR,
                             failedOperations.getLogMessage());
+            return true;
         }
 
-        lastLogTime = now;
+        return false;
     }
 
-    private static final class FailedOperations
+    @VisibleForTesting
+    boolean logSlowOperations(long now)
     {
-        public final Map<String, FailedOperation> operations;
-        public final long numDropped;
+        AggregatedOperations slowOperations = slowOperationsQueue.popOperations();
+        if (!slowOperations.isEmpty())
+        {
+            long elapsed = now - lastLogTime;
+            noSpamLogger.info("Some operations were slow, details available at debug level (debug.log)");
 
-        FailedOperations(Map<String, FailedOperation> operations, long numDropped)
+            if (logger.isDebugEnabled())
+                logger.debug("{} operations were slow in the last {} msecs:{}{}",
+                             slowOperations.num(),
+                             elapsed,
+                             LINE_SEPARATOR,
+                             slowOperations.getLogMessage());
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * A wrapper for a queue that can be either bounded, in which case
+     * we increment a counter if we exceed the queue size, or unbounded.
+     */
+    private static final class OperationsQueue
+    {
+        /** The max operations on the queue. If this value is zero then logging is disabled
+         * and the queue will always be empty. If this value is negative then the queue is unbounded.
+         */
+        private final int maxOperations;
+
+        /**
+         * The operations queue, it can be either bounded or unbounded depending on the value of maxOperations.
+         */
+        private final BlockingQueue<Operation> queue;
+
+        /**
+         * If we fail to add an operation to the queue then we increment this value. We reset this value
+         * when the queue is emptied.
+         */
+        private final AtomicLong numDroppedOperations;
+
+        OperationsQueue(int maxOperations)
+        {
+            this.maxOperations = maxOperations;
+            this.queue = maxOperations > 0 ? new ArrayBlockingQueue<>(maxOperations) : new LinkedBlockingQueue<>();
+            this.numDroppedOperations = new AtomicLong();
+        }
+
+        /**
+         * Add an operation to the queue, if possible, or increment the dropped counter.
+         *
+         * @param operation - the operation to add
+         */
+        private void offer(Operation operation)
+        {
+            if (maxOperations == 0)
+                return; // logging of operations is disabled
+
+            if (!queue.offer(operation))
+                numDroppedOperations.incrementAndGet();
+        }
+
+
+        /**
+         * Return all operations in the queue, aggregated by name, and reset
+         * the counter for dropped operations.
+         *
+         * @return - the aggregated operations
+         */
+        private AggregatedOperations popOperations()
+        {
+            Map<String, Operation> operations = new HashMap<>();
+
+            Operation operation;
+            while((operation = queue.poll()) != null)
+            {
+                Operation existing = operations.get(operation.name());
+                if (existing != null)
+                    existing.add(operation);
+                else
+                    operations.put(operation.name(), operation);
+            }
+            return new AggregatedOperations(operations, numDroppedOperations.getAndSet(0L));
+        }
+    }
+
+    /**
+     * Convert a map of aggregated operations into a log message that
+     * includes the information of whether some operations were dropped.
+     */
+    private static final class AggregatedOperations
+    {
+        private final Map<String, Operation> operations;
+        private final long numDropped;
+
+        AggregatedOperations(Map<String, Operation> operations, long numDropped)
         {
             this.operations = operations;
             this.numDropped = numDropped;
@@ -183,7 +272,7 @@ public class MonitoringTask
             return operations.size() + numDropped;
         }
 
-        public String getLogMessage()
+        String getLogMessage()
         {
             if (isEmpty())
                 return "";
@@ -200,7 +289,7 @@ public class MonitoringTask
             return ret.toString();
         }
 
-        private static void addOperation(StringBuilder ret, FailedOperation operation)
+        private static void addOperation(StringBuilder ret, Operation operation)
         {
             if (ret.length() > 0)
                 ret.append(LINE_SEPARATOR);
@@ -209,19 +298,38 @@ public class MonitoringTask
         }
     }
 
-    private final static class FailedOperation
+    /**
+     * A wrapper class for an operation that either failed (timed-out) or
+     * was reported as slow. Because the same operation (query) may execute
+     * multiple times, we aggregate the number of times an operation with the
+     * same name (CQL query text) is reported and store the average, min and max
+     * times.
+     */
+    protected abstract static class Operation
     {
-        public final Monitorable operation;
-        public int numTimeouts;
-        public long totalTime;
-        public long maxTime;
-        public long minTime;
+        /** The operation that was reported as slow or timed out */
+        final Monitorable operation;
+
+        /** The number of times the operation was reported */
+        int numTimesReported;
+
+        /** The total time spent by this operation */
+        long totalTime;
+
+        /** The maximum time spent by this operation */
+        long maxTime;
+
+        /** The minimum time spent by this operation */
+        long minTime;
+
+        /** The name of the operation, i.e. the SELECT query CQL,
+         * this is set lazily as it takes time to build the query CQL */
         private String name;
 
-        FailedOperation(Monitorable operation, long failedAt)
+        Operation(Monitorable operation, long failedAt)
         {
             this.operation = operation;
-            numTimeouts = 1;
+            numTimesReported = 1;
             totalTime = failedAt - operation.constructionTime().timestamp;
             minTime = totalTime;
             maxTime = totalTime;
@@ -234,31 +342,74 @@ public class MonitoringTask
             return name;
         }
 
-        void addTimeout(FailedOperation operation)
+        void add(Operation operation)
         {
-            numTimeouts++;
+            numTimesReported++;
             totalTime += operation.totalTime;
             maxTime = Math.max(maxTime, operation.maxTime);
             minTime = Math.min(minTime, operation.minTime);
         }
 
+        public abstract String getLogMessage();
+    }
+
+    /**
+     * An operation (query) that timed out.
+     */
+    private final static class FailedOperation extends Operation
+    {
+        FailedOperation(Monitorable operation, long failedAt)
+        {
+            super(operation, failedAt);
+        }
+
         public String getLogMessage()
         {
-            if (numTimeouts == 1)
-                return String.format("%s: total time %d msec - timeout %d %s",
+            if (numTimesReported == 1)
+                return String.format("<%s>, total time %d msec, timeout %d %s",
                                      name(),
                                      totalTime,
                                      operation.timeout(),
                                      operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
             else
-                return String.format("%s (timed out %d times): total time avg/min/max %d/%d/%d msec - timeout %d %s",
+                return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s",
                                      name(),
-                                     numTimeouts,
-                                     totalTime / numTimeouts,
+                                     numTimesReported,
+                                     totalTime / numTimesReported,
                                      minTime,
                                      maxTime,
                                      operation.timeout(),
                                      operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
         }
     }
+
+    /**
+     * An operation (query) that was reported as slow.
+     */
+    private final static class SlowOperation extends Operation
+    {
+        SlowOperation(Monitorable operation, long failedAt)
+        {
+            super(operation, failedAt);
+        }
+
+        public String getLogMessage()
+        {
+            if (numTimesReported == 1)
+                return String.format("<%s>, time %d msec - slow timeout %d %s",
+                                     name(),
+                                     totalTime,
+                                     operation.slowTimeout(),
+                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+            else
+                return String.format("<%s>, was slow %d times: avg/min/max %d/%d/%d msec - slow timeout %d %s",
+                                     name(),
+                                     numTimesReported,
+                                     totalTime / numTimesReported,
+                                     minTime,
+                                     maxTime,
+                                     operation.slowTimeout(),
+                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index df1b4e1..23b2995 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -157,6 +157,11 @@ public class MessageIn<T>
         return DatabaseDescriptor.getTimeout(verb);
     }
 
+    public long getSlowQueryTimeout()
+    {
+        return DatabaseDescriptor.getSlowQueryTimeout();
+    }
+
     public String toString()
     {
         StringBuilder sbuf = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9bf90dc..9cfbd68 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1841,7 +1841,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             try
             {
-                command.setMonitoringTime(new ConstructionTime(constructionTime), timeout);
+                command.setMonitoringTime(new ConstructionTime(constructionTime), timeout, DatabaseDescriptor.getSlowQueryTimeout());
 
                 ReadResponse response;
                 try (ReadExecutionController executionController = command.executionController();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
index 4490519..14659e3 100644
--- a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -39,10 +40,12 @@ import static org.junit.Assert.fail;
 public class MonitoringTaskTest
 {
     private static final long timeout = 100;
+    private static final long slowTimeout = 10;
+
     private static final long MAX_SPIN_TIME_NANOS = TimeUnit.SECONDS.toNanos(5);
 
-    public static final int REPORT_INTERVAL_MS = 600000; // long enough so that it won't check unless told to do so
-    public static final int MAX_TIMEDOUT_OPERATIONS = -1; // unlimited
+    private static final int REPORT_INTERVAL_MS = 600000; // long enough so that it won't check unless told to do so
+    private static final int MAX_TIMEDOUT_OPERATIONS = -1; // unlimited
 
     @BeforeClass
     public static void setup()
@@ -50,14 +53,22 @@ public class MonitoringTaskTest
         MonitoringTask.instance = MonitoringTask.make(REPORT_INTERVAL_MS, MAX_TIMEDOUT_OPERATIONS);
     }
 
+    @After
+    public void cleanUp()
+    {
+        // these clear the queues of the monitoring task
+        MonitoringTask.instance.getSlowOperations();
+        MonitoringTask.instance.getFailedOperations();
+    }
+
     private static final class TestMonitor extends MonitorableImpl
     {
         private final String name;
 
-        TestMonitor(String name, ConstructionTime constructionTime, long timeout)
+        TestMonitor(String name, ConstructionTime constructionTime, long timeout, long slow)
         {
             this.name = name;
-            setMonitoringTime(constructionTime, timeout);
+            setMonitoringTime(constructionTime, timeout, slow);
         }
 
         public String name()
@@ -88,15 +99,32 @@ public class MonitoringTaskTest
             long numInProgress = operations.stream().filter(Monitorable::isInProgress).count();
             if (numInProgress == 0)
                 return;
+        }
+    }
+
+    private static void waitForOperationsToBeReportedAsSlow(Monitorable... operations) throws InterruptedException
+    {
+        waitForOperationsToBeReportedAsSlow(Arrays.asList(operations));
+    }
+
+    private static void waitForOperationsToBeReportedAsSlow(List<Monitorable> operations) throws InterruptedException
+    {
+        long timeout = operations.stream().map(Monitorable::slowTimeout).reduce(0L, Long::max);
+        Thread.sleep(timeout * 2 + ApproximateTime.precision());
 
-            Thread.yield();
+        long start = System.nanoTime();
+        while(System.nanoTime() - start <= MAX_SPIN_TIME_NANOS)
+        {
+            long numSlow = operations.stream().filter(Monitorable::isSlow).count();
+            if (numSlow == operations.size())
+                return;
         }
     }
 
     @Test
     public void testAbort() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout);
+        Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
         assertTrue(operation.isAborted());
@@ -107,7 +135,7 @@ public class MonitoringTaskTest
     @Test
     public void testAbortIdemPotent() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout);
+        Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
         assertTrue(operation.abort());
@@ -120,7 +148,7 @@ public class MonitoringTaskTest
     @Test
     public void testAbortCrossNode() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test for cross node", new ConstructionTime(System.currentTimeMillis(), true), timeout);
+        Monitorable operation = new TestMonitor("Test for cross node", new ConstructionTime(System.currentTimeMillis(), true), timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
         assertTrue(operation.isAborted());
@@ -131,7 +159,7 @@ public class MonitoringTaskTest
     @Test
     public void testComplete() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout);
+        Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
         operation.complete();
         waitForOperationsToComplete(operation);
 
@@ -143,7 +171,7 @@ public class MonitoringTaskTest
     @Test
     public void testCompleteIdemPotent() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout);
+        Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
         operation.complete();
         waitForOperationsToComplete(operation);
 
@@ -155,14 +183,47 @@ public class MonitoringTaskTest
     }
 
     @Test
+    public void testReportSlow() throws InterruptedException
+    {
+        Monitorable operation = new TestMonitor("Test report slow", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+        waitForOperationsToBeReportedAsSlow(operation);
+
+        assertTrue(operation.isSlow());
+        operation.complete();
+        assertFalse(operation.isAborted());
+        assertTrue(operation.isCompleted());
+        assertEquals(1, MonitoringTask.instance.getSlowOperations().size());
+    }
+
+    @Test
+    public void testNoReportSlowIfZeroSlowTimeout() throws InterruptedException
+    {
+        // when the slow timeout is set to zero then operation won't be reported as slow
+        Monitorable operation = new TestMonitor("Test report slow disabled", new ConstructionTime(System.currentTimeMillis()), timeout, 0);
+        waitForOperationsToBeReportedAsSlow(operation);
+
+        assertTrue(operation.isSlow());
+        operation.complete();
+        assertFalse(operation.isAborted());
+        assertTrue(operation.isCompleted());
+        assertEquals(0, MonitoringTask.instance.getSlowOperations().size());
+    }
+
+    @Test
     public void testReport() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout);
+        Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
+        assertTrue(operation.isSlow());
         assertTrue(operation.isAborted());
         assertFalse(operation.isCompleted());
-        MonitoringTask.instance.logFailedOperations(ApproximateTime.currentTimeMillis());
+
+        // aborted operations are not logged as slow
+        assertFalse(MonitoringTask.instance.logSlowOperations(ApproximateTime.currentTimeMillis()));
+        assertEquals(0, MonitoringTask.instance.getSlowOperations().size());
+
+        assertTrue(MonitoringTask.instance.logFailedOperations(ApproximateTime.currentTimeMillis()));
         assertEquals(0, MonitoringTask.instance.getFailedOperations().size());
     }
 
@@ -172,14 +233,22 @@ public class MonitoringTaskTest
         MonitoringTask.instance = MonitoringTask.make(10, -1);
         try
         {
-            Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout);
-            waitForOperationsToComplete(operation);
+            Monitorable operation1 = new TestMonitor("Test report 1", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+            waitForOperationsToComplete(operation1);
+
+            assertTrue(operation1.isAborted());
+            assertFalse(operation1.isCompleted());
+
+            Monitorable operation2 = new TestMonitor("Test report 2", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+            waitForOperationsToBeReportedAsSlow(operation2);
 
-            assertTrue(operation.isAborted());
-            assertFalse(operation.isCompleted());
+            operation2.complete();
+            assertFalse(operation2.isAborted());
+            assertTrue(operation2.isCompleted());
 
             Thread.sleep(ApproximateTime.precision() + 500);
             assertEquals(0, MonitoringTask.instance.getFailedOperations().size());
+            assertEquals(0, MonitoringTask.instance.getSlowOperations().size());
         }
         finally
         {
@@ -197,7 +266,7 @@ public class MonitoringTaskTest
         for (int i = 0; i < opCount; i++)
         {
             executorService.submit(() ->
-                operations.add(new TestMonitor(UUID.randomUUID().toString(), new ConstructionTime(), timeout))
+                operations.add(new TestMonitor(UUID.randomUUID().toString(), new ConstructionTime(), timeout, slowTimeout))
             );
         }
 
@@ -207,6 +276,7 @@ public class MonitoringTaskTest
 
         waitForOperationsToComplete(operations);
         assertEquals(opCount, MonitoringTask.instance.getFailedOperations().size());
+        assertEquals(0, MonitoringTask.instance.getSlowOperations().size());
     }
 
     @Test
@@ -228,11 +298,10 @@ public class MonitoringTaskTest
         MonitoringTask.instance = MonitoringTask.make(REPORT_INTERVAL_MS, maxTimedoutOperations);
         try
         {
-            final int threadCount = numThreads;
-            ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
-            final CountDownLatch finished = new CountDownLatch(threadCount);
+            ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+            final CountDownLatch finished = new CountDownLatch(numThreads);
 
-            for (int i = 0; i < threadCount; i++)
+            for (int i = 0; i < numThreads; i++)
             {
                 final String operationName = "Operation " + Integer.toString(i+1);
                 final int numTimes = i + 1;
@@ -241,10 +310,16 @@ public class MonitoringTaskTest
                     {
                         for (int j = 0; j < numTimes; j++)
                         {
-                            Monitorable operation = new TestMonitor(operationName,
+                            Monitorable operation1 = new TestMonitor(operationName,
                                                                     new ConstructionTime(System.currentTimeMillis()),
-                                                                    timeout);
-                            waitForOperationsToComplete(operation);
+                                                                    timeout, slowTimeout);
+                            waitForOperationsToComplete(operation1);
+
+                            Monitorable operation2 = new TestMonitor(operationName,
+                                                                     new ConstructionTime(System.currentTimeMillis()),
+                                                                     timeout, slowTimeout);
+                            waitForOperationsToBeReportedAsSlow(operation2);
+                            operation2.complete();
                         }
                     }
                     catch (InterruptedException e)
@@ -274,7 +349,7 @@ public class MonitoringTaskTest
     }
 
     @Test
-    public void testMultipleThreadsSameName() throws InterruptedException
+    public void testMultipleThreadsSameNameFailed() throws InterruptedException
     {
         final int threadCount = 50;
         final List<Monitorable> operations = new ArrayList<>(threadCount);
@@ -286,9 +361,9 @@ public class MonitoringTaskTest
             executorService.submit(() -> {
                 try
                 {
-                    Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName",
+                    Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName failed",
                                                             new ConstructionTime(System.currentTimeMillis()),
-                                                            timeout);
+                                                            timeout, slowTimeout);
                     operations.add(operation);
                 }
                 finally
@@ -302,11 +377,44 @@ public class MonitoringTaskTest
         assertEquals(0, executorService.shutdownNow().size());
 
         waitForOperationsToComplete(operations);
-        //MonitoringTask.instance.checkFailedOperations(ApproximateTime.currentTimeMillis());
         assertEquals(1, MonitoringTask.instance.getFailedOperations().size());
     }
 
     @Test
+    public void testMultipleThreadsSameNameSlow() throws InterruptedException
+    {
+        final int threadCount = 50;
+        final List<Monitorable> operations = new ArrayList<>(threadCount);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch finished = new CountDownLatch(threadCount);
+
+        for (int i = 0; i < threadCount; i++)
+        {
+            executorService.submit(() -> {
+                try
+                {
+                    Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName slow",
+                                                            new ConstructionTime(System.currentTimeMillis()),
+                                                            timeout, slowTimeout);
+                    operations.add(operation);
+                }
+                finally
+                {
+                    finished.countDown();
+                }
+            });
+        }
+
+        finished.await();
+        assertEquals(0, executorService.shutdownNow().size());
+
+        waitForOperationsToBeReportedAsSlow(operations);
+        operations.forEach(o -> o.complete());
+
+        assertEquals(1, MonitoringTask.instance.getSlowOperations().size());
+    }
+
+    @Test
     public void testMultipleThreadsNoFailedOps() throws InterruptedException
     {
         final int threadCount = 50;
@@ -321,7 +429,7 @@ public class MonitoringTaskTest
                 {
                     Monitorable operation = new TestMonitor("Test thread " + Thread.currentThread().getName(),
                                                             new ConstructionTime(System.currentTimeMillis()),
-                                                            timeout);
+                                                            timeout, slowTimeout);
                     operations.add(operation);
                     operation.complete();
                 }


Mime
View raw message