Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 61E961173A for ; Fri, 10 May 2013 17:14:16 +0000 (UTC) Received: (qmail 4292 invoked by uid 500); 10 May 2013 17:14:16 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 4266 invoked by uid 500); 10 May 2013 17:14:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 4259 invoked by uid 99); 10 May 2013 17:14:16 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 May 2013 17:14:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 May 2013 17:14:11 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id CCE35238889B; Fri, 10 May 2013 17:13:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1481106 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/usage/ activemq-client/src/main/java/org/apache/activemq/usage/ activemq-client/src/test/java/org/apache/activemq/usage/ Date: Fri, 10 May 2013 17:13:48 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130510171348.CCE35238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Fri May 10 17:13:48 2013 New Revision: 1481106 URL: http://svn.apache.org/r1481106 Log: fix for: https://issues.apache.org/jira/browse/AMQ-4512 Usage and MemoryUsage sync fixes to keep state consistent during thread contention. Test case added. Added: activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java (with props) Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java activemq/trunk/activemq-client/src/main/java/org/apache/activemq/usage/Usage.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java?rev=1481106&r1=1481105&r2=1481106&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/usage/StoreUsage.java Fri May 10 17:13:48 2013 @@ -22,9 +22,9 @@ import org.apache.activemq.store.Persist * Used to keep track of how much of something is being used so that a * productive working set usage can be controlled. Main use case is manage * memory usage. - * + * * @org.apache.xbean.XBean - * + * */ public class StoreUsage extends Usage { @@ -44,6 +44,7 @@ public class StoreUsage extends Usage= 100 && isStarted()) { - usageMutex.wait(); + usageLock.readLock().lock(); + try { + if (percentUsage >= 100 && isStarted()) { + usageLock.readLock().unlock(); + usageLock.writeLock().lock(); + try { + while (percentUsage >= 100 && isStarted()) { + waitForSpaceCondition.await(); + } + usageLock.readLock().lock(); + } finally { + usageLock.writeLock().unlock(); + } } if (percentUsage >= 100 && !isStarted()) { throw new InterruptedException("waitForSpace stopped during wait."); } + } finally { + usageLock.readLock().unlock(); } } @@ -86,11 +100,24 @@ public class MemoryUsage extends Usage= 100) { - usageMutex.wait(timeout); + usageLock.readLock().unlock(); + usageLock.writeLock().lock(); + try { + while (percentUsage >= 100 ) { + waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS); + } + usageLock.readLock().lock(); + } finally { + usageLock.writeLock().unlock(); + } } + return percentUsage < 100; + } finally { + usageLock.readLock().unlock(); } } @@ -99,8 +126,11 @@ public class MemoryUsage extends Usage= 100; + } finally { + usageLock.readLock().unlock(); } } @@ -125,12 +155,15 @@ public class MemoryUsage extends Usage implements Service { private static final Logger LOG = LoggerFactory.getLogger(Usage.class); - protected final Object usageMutex = new Object(); + + protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock(); + protected final Condition waitForSpaceCondition = usageLock.writeLock().newCondition(); protected int percentUsage; protected T parent; + protected String name; + private UsageCapacity limiter = new DefaultUsageCapacity(); private int percentUsageMinDelta = 1; private final List listeners = new CopyOnWriteArrayList(); private final boolean debug = LOG.isDebugEnabled(); - protected String name; private float usagePortion = 1.0f; private final List children = new CopyOnWriteArrayList(); private final List callbacks = new LinkedList(); private int pollingTime = 100; - private final AtomicBoolean started=new AtomicBoolean(); + private final AtomicBoolean started = new AtomicBoolean(); private ThreadPoolExecutor executor; + public Usage(T parent, String name, float portion) { this.parent = parent; this.usagePortion = portion; if (parent != null) { - this.limiter.setLimit((long)(parent.getLimit() * portion)); + this.limiter.setLimit((long) (parent.getLimit() * portion)); name = parent.name + ":" + name; } this.name = name; @@ -86,15 +91,16 @@ public abstract class Usage= highWaterMark) { long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE; long timeleft = deadline; while (timeleft > 0) { - percentUsage=caclPercentUsage(); + percentUsage = caclPercentUsage(); if (percentUsage >= highWaterMark) { - usageMutex.wait(pollingTime); + waitForSpaceCondition.await(pollingTime, TimeUnit.MILLISECONDS); timeleft = deadline - System.currentTimeMillis(); } else { break; @@ -102,6 +108,8 @@ public abstract class Usage= highWaterMark; + } finally { + usageLock.writeLock().unlock(); } } @@ -128,16 +139,18 @@ public abstract class Usage 0 && parent != null) { - synchronized (usageMutex) { - this.limiter.setLimit((long)(parent.getLimit() * usagePortion)); + usageLock.writeLock().lock(); + try { + this.limiter.setLimit((long) (parent.getLimit() * usagePortion)); + } finally { + usageLock.writeLock().unlock(); } } // Reset the percent currently being used. - int percentUsage; - synchronized (usageMutex) { - percentUsage = caclPercentUsage(); + usageLock.writeLock().lock(); + try { + setPercentUsage(caclPercentUsage()); + } finally { + usageLock.writeLock().unlock(); } - setPercentUsage(percentUsage); // Let the children know that the limit has changed. They may need to - // set - // their limits based on ours. + // set their limits based on ours. for (T child : children) { child.onLimitChange(); } } public float getUsagePortion() { - synchronized (usageMutex) { + usageLock.readLock().lock(); + try { return usagePortion; + } finally { + usageLock.readLock().unlock(); } } public void setUsagePortion(float usagePortion) { - synchronized (usageMutex) { + usageLock.writeLock().lock(); + try { this.usagePortion = usagePortion; + } finally { + usageLock.writeLock().unlock(); } onLimitChange(); } public int getPercentUsage() { - synchronized (usageMutex) { + usageLock.readLock().lock(); + try { return percentUsage; + } finally { + usageLock.readLock().unlock(); } } public int getPercentUsageMinDelta() { - synchronized (usageMutex) { + usageLock.readLock().lock(); + try { return percentUsageMinDelta; + } finally { + usageLock.readLock().unlock(); } } /** - * Sets the minimum number of percentage points the usage has to change - * before a UsageListener event is fired by the manager. + * Sets the minimum number of percentage points the usage has to change before a UsageListener event is fired by the + * manager. * * @param percentUsageMinDelta * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" @@ -209,27 +240,35 @@ public abstract class Usage= 100 && newPercentUsage < 100) { - synchronized (usageMutex) { - usageMutex.notifyAll(); - if (!callbacks.isEmpty()) { - for (Iterator iter = new ArrayList(callbacks).iterator(); iter.hasNext();) { - Runnable callback = iter.next(); - getExecutor().execute(callback); - } - callbacks.clear(); + waitForSpaceCondition.signalAll(); + if (!callbacks.isEmpty()) { + for (Runnable callback : callbacks) { + getExecutor().execute(callback); } + callbacks.clear(); } } if (!listeners.isEmpty()) { @@ -264,9 +300,8 @@ public abstract class Usage iter = listeners.iterator(); iter.hasNext();) { - UsageListener l = iter.next(); - l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); + for (UsageListener listener : listeners) { + listener.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); } } }; @@ -285,24 +320,21 @@ public abstract class Usage parent.getLimit()) { - LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", - new Object[]{getName(), getLimit(), parent.getLimit()}); + if (getLimit() > parent.getLimit()) { + LOG.info("Usage({}) limit={} should be smaller than its parent limit={}", new Object[] { getName(), getLimit(), parent.getLimit() }); } } - for (T t:children) { + for (T t : children) { t.start(); } } @@ -311,21 +343,24 @@ public abstract class Usage iter = new ArrayList(this.callbacks).iterator(); iter.hasNext();) { - Runnable callback = iter.next(); + // clear down any callbacks + usageLock.writeLock().lock(); + try { + waitForSpaceCondition.signalAll(); + for (Runnable callback : this.callbacks) { callback.run(); } this.callbacks.clear(); + } finally { + usageLock.writeLock().unlock(); } - for (T t:children) { + + for (T t : children) { t.stop(); } } @@ -344,8 +379,7 @@ public abstract class Usage= 100) { callbacks.add(callback); } else { callback.run(); } + } finally { + usageLock.writeLock().unlock(); } } }; @@ -366,13 +403,16 @@ public abstract class Usage= 100) { callbacks.add(callback); return true; } else { return false; } + } finally { + usageLock.writeLock().unlock(); } } @@ -384,7 +424,8 @@ public abstract class Usage toAdd; + final BlockingQueue toRemove; + final BlockingQueue removed; + + if (useArrayBlocking) { + toAdd = new ArrayBlockingQueue(operations); + toRemove = new ArrayBlockingQueue(operations); + removed = new ArrayBlockingQueue(operations); + } else { + toAdd = new LinkedBlockingQueue(); + toRemove = new LinkedBlockingQueue(); + removed = new LinkedBlockingQueue(); + } + + final AtomicBoolean running = new AtomicBoolean(true); + final CountDownLatch startLatch = new CountDownLatch(1); + + final MemoryUsage memUsage = new MemoryUsage(); + memUsage.setLimit(1000); + memUsage.start(); + + Thread addThread = new Thread(new Runnable() { + @Override + public void run() { + try { + startLatch.await(); + + while (true) { + Integer add = toAdd.poll(1, TimeUnit.MILLISECONDS); + if (add == null) { + if (!running.get()) { + break; + } + } else { + // add to other queue before removing + toRemove.add(add); + memUsage.increaseUsage(add); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + Thread removeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + startLatch.await(); + + while (true) { + Integer remove = toRemove.poll(1, TimeUnit.MILLISECONDS); + if (remove == null) { + if (!running.get()) { + break; + } + } else { + memUsage.decreaseUsage(remove); + removed.add(remove); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + Thread waitForSpaceThread = new Thread(new Runnable() { + @Override + public void run() { + try { + startLatch.await(); + + while (running.get()) { + memUsage.waitForSpace(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + removeThread.start(); + addThread.start(); + if (useWaitForSpaceThread) { + waitForSpaceThread.start(); + } + + Random r = new Random(seed); + + startLatch.countDown(); + + for (int i = 0; i < operations; i++) { + toAdd.add(r.nextInt(100) + 1); + } + + // we expect the failure percentage to be related to the last operation + List ops = new ArrayList(operations); + for (int i = 0; i < operations; i++) { + Integer op = removed.poll(1000, TimeUnit.MILLISECONDS); + assertNotNull(op); + ops.add(op); + } + + running.set(false); + + if (useWaitForSpaceThread) { + try { + waitForSpaceThread.join(1000); + } catch (InterruptedException e) { + System.out.println("Attempt: " + attempt + " : " + memUsage + " waitForSpace never returned"); + waitForSpaceThread.interrupt(); + waitForSpaceThread.join(); + } + } + + removeThread.join(); + addThread.join(); + + if (memUsage.getPercentUsage() != 0 || memUsage.getUsage() != memUsage.getPercentUsage()) { + System.out.println("Attempt: " + attempt + " : " + memUsage); + System.out.println("Operations: " + ops); + assertEquals(0, memUsage.getPercentUsage()); + } + } +} Propchange: activemq/trunk/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageConcurrencyTest.java ------------------------------------------------------------------------------ svn:eol-style = native