Author: dejanb Date: Mon Feb 28 14:23:30 2011 New Revision: 1075346 URL: http://svn.apache.org/viewvc?rev=1075346&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3193 - consumers don't get messages after JMX remove Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1075346&r1=1075345&r2=1075346&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Feb 28 14:23:30 2011 @@ -1437,6 +1437,23 @@ public class Queue extends BaseDestinati } catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); } + } else { + // if there are already paged messages + // dispatch them + if (pagedInMessages.size() != 0) { + pagedInMessagesLock.writeLock().lock(); + ArrayList paged = new ArrayList(); + try { + paged.addAll(pagedInMessages.values()); + } finally { + pagedInMessagesLock.writeLock().unlock(); + } + try { + doDispatch(paged); + } catch (Exception e) { + LOG.error("Failed to dispatch already paged messages ", e); + } + } } if (pendingBrowserDispatch != null) { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1075346&r1=1075345&r2=1075346&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Mon Feb 28 14:23:30 2011 @@ -48,6 +48,7 @@ import org.apache.activemq.broker.region import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQBlobMessage; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.util.JMXSupport; @@ -161,6 +162,39 @@ public class MBeanTest extends EmbeddedB assertTrue("cache enabled", queueNew.isCacheEnabled()); } + public void testRemoveMessages() throws Exception { + ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); + BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + broker.addQueue(getDestinationString()); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); + + QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + String msg1 = queue.sendTextMessage("message 1"); + String msg2 = queue.sendTextMessage("message 2"); + + assertTrue(queue.removeMessage(msg2)); + + connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination dest = createDestination(); + + MessageConsumer consumer = session.createConsumer(dest); + Message message = consumer.receive(1000); + assertNotNull(message); + assertEquals(msg1, message.getJMSMessageID()); + + String msg3 = queue.sendTextMessage("message 3"); + message = consumer.receive(1000); + assertNotNull(message); + assertEquals(msg3, message.getJMSMessageID()); + + message = consumer.receive(1000); + assertNull(message); + + } + public void testRetryMessages() throws Exception { // lets speed up redelivery ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;