Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F225218C4C for ; Tue, 8 Mar 2016 18:38:53 +0000 (UTC) Received: (qmail 37021 invoked by uid 500); 8 Mar 2016 18:38:53 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 36978 invoked by uid 500); 8 Mar 2016 18:38:53 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 36969 invoked by uid 99); 8 Mar 2016 18:38:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Mar 2016 18:38:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A5AD8DFB8A; Tue, 8 Mar 2016 18:38:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <97b52fd7d01244899f74af60554e308d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-5785 Date: Tue, 8 Mar 2016 18:38:53 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 078f39f58 -> 8b23e072e https://issues.apache.org/jira/browse/AMQ-5785 Avoid holding the intrinsic lock on the cursor when expiring the messages. 
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b23e072 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b23e072 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b23e072 Branch: refs/heads/master Commit: 8b23e072eeab2beebf62fd267bf8d9f88d05b5c2 Parents: 078f39f Author: Timothy Bish Authored: Tue Mar 8 13:37:58 2016 -0500 Committer: Timothy Bish Committed: Tue Mar 8 13:37:58 2016 -0500 ---------------------------------------------------------------------- .../cursors/FilePendingMessageCursor.java | 40 +++++++++++++------- 1 file changed, 27 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8b23e072/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 9c9a8e7..3c2bd5f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -17,10 +17,13 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; @@ -31,25 +34,26 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.PList; -import org.apache.activemq.store.PListStore; import org.apache.activemq.store.PListEntry; +import org.apache.activemq.store.PListStore; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.activemq.util.ByteSequence; /** * persist pending messages pending message (messages awaiting dispatch to a * consumer) cursor - * - * */ public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { + static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); + private static final AtomicLong NAME_COUNT = new AtomicLong(); + protected Broker broker; private final PListStore store; private final String name; @@ -61,6 +65,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple private boolean flushRequired; private final AtomicBoolean started = new AtomicBoolean(); private final WireFormat wireFormat = new OpenWireFormat(); + /** * @param broker * @param name @@ -374,9 +379,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple @Override 
public synchronized boolean isFull() { - return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); - } @Override @@ -392,11 +395,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple @Override public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { if (newPercentUsage >= getMemoryUsageHighWaterMark()) { + List<MessageReference> expiredMessages = null; synchronized (this) { if (!flushRequired && size() != 0) { flushRequired =true; if (!iterating) { - expireOldMessages(); + expiredMessages = expireOldMessages(); if (!hasSpace()) { flushToDisk(); flushRequired = false; @@ -404,6 +408,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple } } } + + if (expiredMessages != null) { + for (MessageReference node : expiredMessages) { + discardExpiredMessage(node); + } + } } } @@ -412,26 +422,30 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple return true; } - protected synchronized void expireOldMessages() { + private synchronized List<MessageReference> expireOldMessages() { + List<MessageReference> expired = new ArrayList<MessageReference>(); if (!memoryList.isEmpty()) { for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); if (node.isExpired()) { node.decrementReferenceCount(); - discardExpiredMessage(node); + expired.add(node); iterator.remove(); } } } + + return expired; } protected synchronized void flushToDisk() { if (!memoryList.isEmpty() && store != null) { long start = 0; - if (LOG.isTraceEnabled()) { + if (LOG.isTraceEnabled()) { start = System.currentTimeMillis(); - LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); - } + LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[] { name, memoryList.size(), + (systemUsage != null ? 
systemUsage.getMemoryUsage() : "") }); + } for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { MessageReference node = iterator.next(); node.decrementReferenceCount();