From commits-return-12228-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Nov 10 17:27:02 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 13631 invoked from network); 10 Nov 2009 17:27:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 10 Nov 2009 17:27:02 -0000 Received: (qmail 41083 invoked by uid 500); 10 Nov 2009 17:27:02 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 41026 invoked by uid 500); 10 Nov 2009 17:27:02 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 41017 invoked by uid 99); 10 Nov 2009 17:27:02 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2009 17:27:02 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2009 17:26:59 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 42C9C238889D; Tue, 10 Nov 2009 17:26:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r834557 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/ft/ test/java/org/apache/activemq/broker/region/ test/java... Date: Tue, 10 Nov 2009 17:26:39 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091110172639.42C9C238889D@eris.apache.org> Author: gtully Date: Tue Nov 10 17:26:38 2009 New Revision: 834557 URL: http://svn.apache.org/viewvc?rev=834557&view=rev Log: merge -c 834543 - resolve https://issues.apache.org/activemq/browse/AMQ-2481 - no need to force a page in but sync between expiry from browse and from pageIn needed some tweaks, expired messages need to be removed from the cursor in the event of expiry from browse. Also resolve unit test failures from https://issues.apache.org/activemq/browse/AMQ-2481 Added: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java - copied unchanged from r834543, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=834557&r1=834556&r2=834557&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Nov 10 17:26:38 2009 @@ -25,7 +25,6 @@ import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatchNotification; Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=834557&r1=834556&r2=834557&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Nov 10 17:26:38 2009 @@ -16,6 +16,28 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -54,26 +76,6 @@ import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; /** @@ -198,10 +200,12 @@ public boolean recoverMessage(Message message) { // Message could have expired while it was being // loaded.. - if (broker.isExpired(message)) { - messageExpired(createConnectionContext(), createMessageReference(message)); - // drop message will decrement so counter balance here - destinationStatistics.getMessages().increment(); + if (message.isExpired()) { + if (broker.isExpired(message)) { + messageExpired(createConnectionContext(), createMessageReference(message)); + // drop message will decrement so counter balance here + destinationStatistics.getMessages().increment(); + } return true; } if (hasSpace()) { @@ -439,6 +443,7 @@ // While waiting for space to free up... the // message may have expired. if (message.isExpired()) { + LOG.error("expired waiting for space.."); broker.messageExpired(context, message); destinationStatistics.getExpired().increment(); } else { @@ -585,7 +590,7 @@ return null; } }; - doBrowse(true, browsedMessages, this.getMaxExpirePageSize()); + doBrowse(browsedMessages, this.getMaxExpirePageSize()); } public void gc(){ @@ -749,14 +754,15 @@ public Message[] browse() { List l = new ArrayList(); - doBrowse(false, l, getMaxBrowsePageSize()); + doBrowse(l, getMaxBrowsePageSize()); return l.toArray(new Message[l.size()]); } - public void doBrowse(boolean forcePageIn, List l, int max) { + + public void doBrowse(List l, int max) { final ConnectionContext connectionContext = createConnectionContext(); try { - pageInMessages(forcePageIn); + pageInMessages(false); List toExpire = new ArrayList(); synchronized(dispatchMutex) { synchronized (pagedInPendingDispatch) { @@ -770,7 +776,7 @@ } toExpire.clear(); synchronized (pagedInMessages) { - addAll(pagedInMessages.values(), l, max, toExpire); + addAll(pagedInMessages.values(), l, max, toExpire); } for (MessageReference ref : toExpire) { if (broker.isExpired(ref)) { @@ -787,13 +793,16 @@ try { messages.reset(); while (messages.hasNext() && l.size() < max) { - MessageReference node = messages.next(); - messages.rollback(node.getMessageId()); - if (node != null) { + MessageReference node = messages.next(); + if (node.isExpired()) { if (broker.isExpired(node)) { messageExpired(connectionContext, createMessageReference(node.getMessage())); - } else if (l.contains(node.getMessage()) == false) { + } + messages.remove(); + } else { + messages.rollback(node.getMessageId()); + if (l.contains(node.getMessage()) == false) { l.add(node.getMessage()); } } @@ -806,7 +815,7 @@ } } catch (Exception e) { LOG.error("Problem retrieving message for browse", e); - } + } } private void addAll(Collection refs, @@ -1278,7 +1287,7 @@ } public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) { - if (LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("message expired: " + reference); } broker.messageExpired(context, reference); @@ -1371,12 +1380,14 @@ node.incrementReferenceCount(); messages.remove(); QueueMessageReference ref = createMessageReference(node.getMessage()); - if (!broker.isExpired(node)) { + if (ref.isExpired()) { + if (broker.isExpired(ref)) { + messageExpired(createConnectionContext(), ref); + } + } else { result.add(ref); count++; - } else { - messageExpired(createConnectionContext(), ref); - } + } } } finally { messages.release(); Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java?rev=834557&r1=834556&r2=834557&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java Tue Nov 10 17:26:38 2009 @@ -16,11 +16,14 @@ */ package org.apache.activemq.broker.ft; +import java.io.File; +import java.io.IOException; import java.net.URI; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTest { @@ -33,17 +36,25 @@ protected void createMaster() throws Exception { master = new BrokerService(); - master.setBrokerName("shared"); + master.setBrokerName("shared-master"); + configureSharedPersistenceAdapter(master); master.addConnector(brokerUrl); master.start(); } + private void configureSharedPersistenceAdapter(BrokerService broker) throws Exception { + AMQPersistenceAdapter adapter = new AMQPersistenceAdapter(); + adapter.setDirectory(new File("shared")); + broker.setPersistenceAdapter(adapter); + } + protected void createSlave() throws Exception { new Thread(new Runnable() { public void run() { try { BrokerService broker = new BrokerService(); - broker.setBrokerName("shared"); + broker.setBrokerName("shared-slave"); + configureSharedPersistenceAdapter(broker); // add transport as a service so that it is bound on start, after store started final TransportConnector tConnector = new TransportConnector(); tConnector.setUri(new URI(brokerUrl)); Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java?rev=834557&r1=834556&r2=834557&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java Tue Nov 10 17:26:38 2009 @@ -98,6 +98,23 @@ proxy.getQueueSize()); } + public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { + applyBrokerSpoolingPolicy(); + final int exprityPeriod = 1000; + applyExpiryDuration(exprityPeriod); + createProducerAndSendMessages(90000); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem"); + Thread.sleep(10000); + assertEquals("Queue size is has not changed " + proxy.getQueueSize(), 90000, + proxy.getQueueSize()); + } + + + private void applyExpiryDuration(int i) { + broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i); + } + private void applyBrokerSpoolingPolicy() { PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=834557&r1=834556&r2=834557&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java Tue Nov 10 17:26:38 2009 @@ -98,7 +98,8 @@ context.checking(new Expectations(){{ allowing (managementContext).getJmxDomainName(); will (returnValue("Test")); allowing (managementContext).start(); - allowing (managementContext).stop(); + allowing (managementContext).stop(); + allowing (managementContext).isConnectorStarted(); // expected MBeans allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=834557&r1=834556&r2=834557&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Tue Nov 10 17:26:38 2009 @@ -152,6 +152,8 @@ assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { + LOG.info("Stats: received: " + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount() + + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); return view.getQueueSize() == 0; } })); @@ -282,7 +284,7 @@ broker.waitUntilStarted(); return broker; } - + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { String domain = "org.apache.activemq"; ObjectName name; Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=834557&r1=834556&r2=834557&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Tue Nov 10 17:26:38 2009 @@ -16,6 +16,19 @@ */ package org.apache.activemq.usecases; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; + +import junit.framework.Test; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; @@ -27,16 +40,6 @@ import org.apache.activemq.util.Wait; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.ObjectName; -import junit.framework.Test; public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport { @@ -140,12 +143,16 @@ final DestinationViewMBean view = createView(destination); Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { + LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + + ", size= " + view.getQueueSize()); return sendCount == view.getExpiredCount(); } }); LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount() + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount() + ", size= " + view.getQueueSize()); + assertEquals("All sent have expired", sendCount, view.getExpiredCount()); }