Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 55110442B for ; Thu, 23 Jun 2011 17:11:11 +0000 (UTC) Received: (qmail 63136 invoked by uid 500); 23 Jun 2011 17:11:11 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 63114 invoked by uid 500); 23 Jun 2011 17:11:11 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 63106 invoked by uid 99); 23 Jun 2011 17:11:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Jun 2011 17:11:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Jun 2011 17:11:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 21FD2238896F for ; Thu, 23 Jun 2011 17:10:49 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1138996 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/tools/ src/java/org/apache/cassandra/utils/ Date: Thu, 23 Jun 2011 17:10:48 -0000 To: commits@cassandra.apache.org From: slebresne@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110623171049.21FD2238896F@eris.apache.org> Author: slebresne Date: Thu Jun 23 17:10:48 2011 New Revision: 1138996 URL: http://svn.apache.org/viewvc?rev=1138996&view=rev Log: Expose number of threads blocked on submitting a 
memtable for flush patch by slebresne; reviewed by jbellis for CASSANDRA-2817 Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1138996&r1=1138995&r2=1138996&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Jun 23 17:10:48 2011 @@ -20,6 +20,8 @@ supercolumn tombstone during replica resolution (CASSANDRA-2590) * use threadsafe collections for StreamInSession and StreamOutSession (CASSANDRA-2766, CASSANDRA-2792) + * Expose number of threads blocked on submitting memtable to flush + (CASSANDRA-2817) 0.7.6 Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1138996&r1=1138995&r2=1138996&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original) +++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jun 23 17:10:48 2011 @@ -66,15 +66,22 @@ public class DebuggableThreadPoolExecuto { public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { + ((DebuggableThreadPoolExecutor)executor).onInitialRejection(task); BlockingQueue queue = executor.getQueue(); while (true) { if (executor.isShutdown()) + { + ((DebuggableThreadPoolExecutor)executor).onFinalRejection(task); throw new RejectedExecutionException("ThreadPoolExecutor has shut down"); + } try { if (queue.offer(task, 1000, TimeUnit.MILLISECONDS)) + { + ((DebuggableThreadPoolExecutor)executor).onFinalAccept(task); break; + } } catch (InterruptedException e) { @@ -85,6 +92,10 @@ public class DebuggableThreadPoolExecuto }); } + protected void onInitialRejection(Runnable task) {} + protected void onFinalAccept(Runnable task) {} + protected void onFinalRejection(Runnable task) {} + @Override public void afterExecute(Runnable r, Throwable t) { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1138996&r1=1138995&r2=1138996&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java Thu Jun 23 17:10:48 2011 @@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent; import java.lang.management.ManagementFactory; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -36,6 +37,9 
@@ public class JMXEnabledThreadPoolExecuto { private final String mbeanName; + private final AtomicInteger totalBlocked = new AtomicInteger(0); + private final AtomicInteger currentBlocked = new AtomicInteger(0); + public JMXEnabledThreadPoolExecutor(String threadPoolName) { this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory(threadPoolName), "internal"); @@ -129,4 +133,34 @@ public class JMXEnabledThreadPoolExecuto { return getTaskCount() - getCompletedTaskCount(); } + + public int getTotalBlockedTasks() + { + return totalBlocked.get(); + } + + public int getCurrentlyBlockedTasks() + { + return currentBlocked.get(); + } + + @Override + protected void onInitialRejection(Runnable task) + { + totalBlocked.incrementAndGet(); + currentBlocked.incrementAndGet(); + } + + @Override + protected void onFinalAccept(Runnable task) + { + currentBlocked.decrementAndGet(); + } + + @Override + protected void onFinalRejection(Runnable task) + { + currentBlocked.decrementAndGet(); + } + } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java?rev=1138996&r1=1138995&r2=1138996&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java Thu Jun 23 17:10:48 2011 @@ -20,4 +20,15 @@ package org.apache.cassandra.concurrent; public interface JMXEnabledThreadPoolExecutorMBean extends IExecutorMBean { -} \ No newline at end of file + /** + * Get the number of tasks that had blocked before being accepted (or + * rejected). 
+ */ + public int getTotalBlockedTasks(); + + /** + * Get the number of tasks currently blocked, waiting to be accepted by + * the executor (because all threads are busy and the backing queue is full). + */ + public int getCurrentlyBlockedTasks(); +} Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1138996&r1=1138995&r2=1138996&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Jun 23 17:10:48 2011 @@ -36,7 +36,7 @@ import org.apache.cassandra.config.Confi import org.apache.commons.cli.*; import org.apache.cassandra.cache.JMXInstrumentedCacheMBean; -import org.apache.cassandra.concurrent.IExecutorMBean; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.CompactionManagerMBean; import org.apache.cassandra.dht.Token; @@ -189,16 +189,21 @@ public class NodeCmd public void printThreadPoolStats(PrintStream outs) { - outs.printf("%-25s%10s%10s%15s%n", "Pool Name", "Active", "Pending", "Completed"); + outs.printf("%-25s%10s%10s%15s%10s%18s%n", "Pool Name", "Active", "Pending", "Completed", "Blocked", "All time blocked"); - Iterator<Map.Entry<String, IExecutorMBean>> threads = probe.getThreadPoolMBeanProxies(); + Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> threads = probe.getThreadPoolMBeanProxies(); while (threads.hasNext()) { - Entry<String, IExecutorMBean> thread = threads.next(); + Entry<String, JMXEnabledThreadPoolExecutorMBean> thread = threads.next(); String poolName = thread.getKey(); - IExecutorMBean threadPoolProxy = thread.getValue(); - outs.printf("%-25s%10s%10s%15s%n", - poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks(), threadPoolProxy.getCompletedTasks()); + 
JMXEnabledThreadPoolExecutorMBean threadPoolProxy = thread.getValue(); + outs.printf("%-25s%10s%10s%15s%10s%18s%n", + poolName, + threadPoolProxy.getActiveCount(), + threadPoolProxy.getPendingTasks(), + threadPoolProxy.getCompletedTasks(), + threadPoolProxy.getCurrentlyBlockedTasks(), + threadPoolProxy.getTotalBlockedTasks()); } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1138996&r1=1138995&r2=1138996&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Jun 23 17:10:48 2011 @@ -40,7 +40,7 @@ import javax.management.remote.JMXServic import com.google.common.collect.Iterables; import org.apache.cassandra.cache.JMXInstrumentedCacheMBean; -import org.apache.cassandra.concurrent.IExecutorMBean; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.ColumnFamilyStoreMBean; import org.apache.cassandra.db.CompactionManager; @@ -372,7 +372,7 @@ public class NodeProbe ssProxy.forceRemoveCompletion(); } - public Iterator<Map.Entry<String, IExecutorMBean>> getThreadPoolMBeanProxies() + public Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> getThreadPoolMBeanProxies() { try { @@ -587,7 +587,7 @@ class ColumnFamilyStoreMBeanIterator imp } } -class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, IExecutorMBean>> +class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, JMXEnabledThreadPoolExecutorMBean>> { private Iterator<ObjectName> resIter; private MBeanServerConnection mbeanServerConn; @@ -606,12 +606,12 @@ class ThreadPoolProxyMBeanIterator imple return resIter.hasNext(); } - public Map.Entry<String, IExecutorMBean> next() + public Map.Entry<String, JMXEnabledThreadPoolExecutorMBean> next() { ObjectName objectName = resIter.next(); String poolName = 
objectName.getKeyProperty("type"); - IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(mbeanServerConn, objectName, IExecutorMBean.class); - return new AbstractMap.SimpleImmutableEntry<String, IExecutorMBean>(poolName, threadPoolProxy); + JMXEnabledThreadPoolExecutorMBean threadPoolProxy = JMX.newMBeanProxy(mbeanServerConn, objectName, JMXEnabledThreadPoolExecutorMBean.class); + return new AbstractMap.SimpleImmutableEntry<String, JMXEnabledThreadPoolExecutorMBean>(poolName, threadPoolProxy); } public void remove() Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java?rev=1138996&r1=1138995&r2=1138996&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java Thu Jun 23 17:10:48 2011 @@ -33,7 +33,7 @@ import com.google.common.collect.Iterabl import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.IExecutorMBean; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.CompactionManager; import org.apache.cassandra.net.MessagingService; @@ -48,7 +48,7 @@ public class StatusLogger MBeanServer server = ManagementFactory.getPlatformMBeanServer(); // everything from o.a.c.concurrent - logger.info(String.format("%-25s%10s%10s", "Pool Name", "Active", "Pending")); + logger.info(String.format("%-25s%10s%10s%10s", "Pool Name", "Active", "Pending", "Blocked")); Set<ObjectName> request, internal; try { @@ -62,9 +62,9 @@ public class StatusLogger for (ObjectName objectName : Iterables.concat(request, internal)) { String poolName = objectName.getKeyProperty("type"); - IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, 
IExecutorMBean.class); - logger.info(String.format("%-25s%10s%10s", - poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks())); + JMXEnabledThreadPoolExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, JMXEnabledThreadPoolExecutorMBean.class); + logger.info(String.format("%-25s%10s%10s%10s", + poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks(), threadPoolProxy.getCurrentlyBlockedTasks())); } // one offs logger.info(String.format("%-25s%10s%10s",