Author: gtully
Date: Tue Jul 7 15:54:32 2009
New Revision: 791881
URL: http://svn.apache.org/viewvc?rev=791881&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-1112 - fix issues with inflight count
when messages expire on the consumer/client and on consumer close
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Jul 7 15:54:32 2009
@@ -644,6 +644,9 @@
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
+ } else {
+ ack = pendingAck;
+ pendingAck = null;
}
}
} else if (pendingAck != null && pendingAck.isStandardAck()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Tue Jul 7 15:54:32 2009
@@ -69,7 +69,7 @@
}
public String toString() {
- return "Message " + message.getMessageId() + " dropped=" + dropped + " locked=" +
(lockOwner != null);
+ return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" +
acked + " locked=" + (lockOwner != null);
}
public void incrementRedeliveryCounter() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Jul 7 15:54:32 2009
@@ -303,9 +303,16 @@
int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();
index++) {
final MessageReference node = iter.next();
- if( node.isExpired() ) {
- node.getRegionDestination().messageExpired(context, this, node);
+ if (hasNotAlreadyExpired(node)) {
+ if (node.isExpired()) {
+ node.getRegionDestination().messageExpired(context, this, node);
+ dispatched.remove(node);
+ node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ }
+ } else {
+ // already expired
dispatched.remove(node);
+ node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension, index + 1);
@@ -411,6 +418,16 @@
}
}
+ private boolean hasNotAlreadyExpired(MessageReference node) {
+ boolean hasNotExpired = true;
+ try {
+ hasNotExpired = node.getMessage().getProperty(RegionBroker.ORIGINAL_EXPIRATION)
== null;
+ } catch (IOException e) {
+ LOG.warn("failed to determine value message property " + RegionBroker.ORIGINAL_EXPIRATION
+ " for " + node, e);
+ }
+ return hasNotExpired;
+ }
+
/**
* Checks an ack versus the contents of the dispatched list.
*
@@ -545,6 +562,11 @@
List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
+ // Here is a potential problem concerning Inflight stat:
+ // Messages not already committed or rolled back may not be removed from dispatched
list at the moment
+ // Except if each commit or rollback callback action comes before remove of subscriber.
+ rc.addAll(pending.remove(context, destination));
+
// Synchronized to DispatchLock
synchronized(dispatchLock) {
for (MessageReference r : dispatched) {
@@ -552,12 +574,10 @@
rc.add((QueueMessageReference)r);
}
}
- }
- // TODO Dispatched messages should be decremented from Inflight stat
- // Here is a potential problem concerning Inflight stat:
- // Messages not already committed or rolled back may not be removed from dispatched
list at the moment
- // Except if each commit or rollback callback action comes before remove of subscriber.
- rc.addAll(pending.remove(context, destination));
+ destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());
+ destination.getDestinationStatistics().getInflight().subtract(dispatched.size());
+ dispatched.clear();
+ }
}
return rc;
}
@@ -661,12 +681,15 @@
if (node.getRegionDestination() != null) {
if (node != QueueMessageReference.NULL_MESSAGE) {
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
- node.getRegionDestination().getDestinationStatistics().getInflight().increment();
+ node.getRegionDestination().getDestinationStatistics().getInflight().increment();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " +
message.getMessageId()
+ + ", dispatched: " + node.getRegionDestination().getDestinationStatistics().getDispatched().getCount()
+ + ", inflight: " + node.getRegionDestination().getDestinationStatistics().getInflight().getCount());
+ }
}
}
- if (LOG.isTraceEnabled()) {
- LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId());
- }
+
if (info.isDispatchAsync()) {
try {
dispatchPending();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul 7 15:54:32 2009
@@ -205,7 +205,7 @@
// Message could have expired while it was being
// loaded..
if (broker.isExpired(message)) {
- messageExpired(createConnectionContext(), null, message, false);
+ messageExpired(createConnectionContext(), message);
return true;
}
if (hasSpace()) {
@@ -343,6 +343,12 @@
// while removing up a subscription.
dispatchLock.lock();
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId
+ + ", dequeues: " + getDestinationStatistics().getDequeues().getCount()
+ + ", dispatched: " + getDestinationStatistics().getDispatched().getCount()
+ + ", inflight: " + getDestinationStatistics().getInflight().getCount());
+ }
synchronized (consumers) {
removeFromConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
@@ -552,10 +558,12 @@
}
private void expireMessages() {
- LOG.info("expiring messages...");
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expiring messages ..");
+ }
+
// just track the insertion count
- List<Message> l = new AbstractList<Message>() {
+ List<Message> browsedMessages = new AbstractList<Message>() {
int size = 0;
@Override
@@ -573,7 +581,7 @@
return null;
}
};
- doBrowse(true, l, getMaxBrowsePageSize());
+ doBrowse(true, browsedMessages, this.getMaxExpirePageSize());
}
public void gc(){
@@ -750,7 +758,7 @@
addAll(pagedInPendingDispatch, l, max, toExpire);
for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref);
- messageExpired(connectionContext, ref, false);
+ messageExpired(connectionContext, ref);
}
}
toExpire.clear();
@@ -758,7 +766,7 @@
addAll(pagedInMessages.values(), l, max, toExpire);
}
for (MessageReference ref : toExpire) {
- messageExpired(connectionContext, ref, false);
+ messageExpired(connectionContext, ref);
}
if (l.size() < getMaxBrowsePageSize()) {
@@ -771,7 +779,7 @@
if (node != null) {
if (broker.isExpired(node)) {
messageExpired(connectionContext,
- createMessageReference(node.getMessage()),
false);
+ createMessageReference(node.getMessage()));
} else if (l.contains(node.getMessage()) == false) {
l.add(node.getMessage());
}
@@ -1249,21 +1257,17 @@
}
}
- public void messageExpired(ConnectionContext context,MessageReference reference, boolean
dispatched) {
- messageExpired(context,null,reference, dispatched);
+ public void messageExpired(ConnectionContext context,MessageReference reference) {
+ messageExpired(context,null,reference);
}
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference
reference) {
- messageExpired(context, subs, reference, true);
- }
-
- public void messageExpired(ConnectionContext context,Subscription subs, MessageReference
reference, boolean dispatched) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("message expired: " + reference);
+ }
broker.messageExpired(context, reference);
destinationStatistics.getDequeues().increment();
destinationStatistics.getExpired().increment();
- if (dispatched) {
- destinationStatistics.getInflight().decrement();
- }
try {
removeMessage(context,subs,(QueueMessageReference)reference);
} catch (IOException e) {
@@ -1349,7 +1353,7 @@
result.add(ref);
count++;
} else {
- messageExpired(createConnectionContext(), ref, false);
+ messageExpired(createConnectionContext(), ref);
}
}
} finally {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Jul 7 15:54:32 2009
@@ -69,6 +69,7 @@
* @version $Revision$
*/
public class RegionBroker extends EmptyBroker {
+ public static final String ORIGINAL_EXPIRATION = "originalExpiration";
private static final Log LOG = LogFactory.getLog(RegionBroker.class);
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
@@ -689,7 +690,7 @@
}
long expiration=message.getExpiration();
message.setExpiration(0);
- message.setProperty("originalExpiration",new Long(
+ message.setProperty(ORIGINAL_EXPIRATION,new Long(
expiration));
if(!message.isPersistent()){
message.setPersistent(true);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Tue Jul 7 15:54:32 2009
@@ -277,7 +277,7 @@
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() >
numMessages);
- assertTrue(exceptions.isEmpty());
+ assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
}
public void testConsumerRecover() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java Tue Jul 7 15:54:32 2009
@@ -354,12 +354,7 @@
mapMessage.onSend();
mapMessage.setContent(mapMessage.getContent());
- try {
- mapMessage.getString("String");
- fail("Should throw a Null pointer");
- }catch(NullPointerException e){
-
- }
+ assertNull(mapMessage.getString("String"));
mapMessage.clearBody();
mapMessage.setString("String", "String");
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Tue Jul 7 15:54:32 2009
@@ -32,14 +32,13 @@
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
public class ExpiredMessagesTest extends CombinationTestSupport {
private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);
@@ -60,15 +59,22 @@
}
protected void setUp() throws Exception {
- broker = new BrokerService();
- broker.setBrokerName("localhost");
- broker.setDataDirectory("data/");
- broker.setUseJmx(true);
- broker.deleteAllMessages();
- broker.addConnector("tcp://localhost:61616");
- broker.start();
- broker.waitUntilStarted();
- }
+ broker = new BrokerService();
+ broker.setBrokerName("localhost");
+ broker.setDataDirectory("data/");
+ broker.setUseJmx(true);
+ broker.deleteAllMessages();
+
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ defaultPolicy.setExpireMessagesPeriod(100);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(defaultPolicy);
+ broker.setDestinationPolicy(policyMap);
+
+ broker.addConnector("tcp://localhost:61616");
+ broker.start();
+ broker.waitUntilStarted();
+ }
public void testExpiredMessages() throws Exception {
@@ -93,6 +99,7 @@
Thread.sleep(100);
end = System.currentTimeMillis();
}
+ consumer.close();
} catch (Throwable ex) {
ex.printStackTrace();
}
@@ -109,6 +116,7 @@
while (i++ < 30000) {
producer.send(session.createTextMessage("test"));
}
+ producer.close();
} catch (Throwable ex) {
ex.printStackTrace();
}
@@ -119,14 +127,23 @@
consumerThread.join();
producingThread.join();
+ session.close();
+ Thread.sleep(5000);
DestinationViewMBean view = createView(destination);
LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount()
+ ", dequeues: " + view.getDequeueCount()
+ ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount()
+ ", expiries: " + view.getExpiredCount());
assertEquals("got what did not expire", received.get(), view.getDequeueCount() -
view.getExpiredCount());
- //assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount()
- view.getDequeueCount(), view.getInFlightCount());
+
+ long expiry = System.currentTimeMillis() + 30000;
+ while (view.getInFlightCount() > 0 && System.currentTimeMillis() <
expiry) {
+ Thread.sleep(500);
+ }
+ LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount()
+ ", dequeues: " + view.getDequeueCount()
+ + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount()
+ ", expiries: " + view.getExpiredCount());
+ assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception
{
@@ -146,7 +163,4 @@
broker.stop();
broker.waitUntilStopped();
}
-
-
-
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Tue Jul 7 15:54:32 2009
@@ -16,7 +16,13 @@
*/
package org.apache.activemq.usecases;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
@@ -57,31 +63,45 @@
junit.textui.TestRunner.run(suite());
}
- protected void setUp() throws Exception {
- broker = new BrokerService();
- broker.setBrokerName("localhost");
- broker.setDataDirectory("data/");
- broker.setUseJmx(true);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.addConnector("tcp://localhost:61616");
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setExpireMessagesPeriod(expiryPeriod);
- defaultEntry.setMaxExpirePageSize(200);
- // so memory is not consumed by DLQ turn if off
- defaultEntry.setDeadLetterStrategy(null);
- defaultEntry.setMemoryLimit(200*1000);
- policyMap.setDefaultEntry(defaultEntry);
+ protected void createBrokerWithMemoryLimit() throws Exception {
+ doCreateBroker(true);
+ }
+
+ protected void createBroker() throws Exception {
+ doCreateBroker(false);
+ }
+
+ private void doCreateBroker(boolean memoryLimit) throws Exception {
+ broker = new BrokerService();
+ broker.setBrokerName("localhost");
+ broker.setDataDirectory("data/");
+ broker.setUseJmx(true);
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.addConnector("tcp://localhost:61616");
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setExpireMessagesPeriod(expiryPeriod);
+ defaultEntry.setMaxExpirePageSize(200);
+
+ if (memoryLimit) {
+ // so memory is not consumed by DLQ turn if off
+ defaultEntry.setDeadLetterStrategy(null);
+ defaultEntry.setMemoryLimit(200 * 1000);
+ }
+
+ policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
+
+ broker.start();
+
+ broker.waitUntilStarted();
+ }
- broker.start();
-
- broker.waitUntilStarted();
- }
-
- public void testExpiredMessages() throws Exception {
+ public void testExpiredMessagesWithNoConsumer() throws Exception {
+ createBrokerWithMemoryLimit();
+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -121,7 +141,89 @@
DestinationViewMBean view = createView(destination);
assertEquals("All sent have expired ", sendCount, view.getExpiredCount());
}
+
+
+ public void testExpiredMessagesWitVerySlowConsumer() throws Exception {
+ createBroker();
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ connection = factory.createConnection();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ producer = session.createProducer(destination);
+ final int ttl = 4000;
+ producer.setTimeToLive(ttl);
+
+ final long sendCount = 1001;
+ final CountDownLatch receivedOneCondition = new CountDownLatch(1);
+ final CountDownLatch waitCondition = new CountDownLatch(1);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+
+ public void onMessage(Message message) {
+ try {
+ LOG.info("Got my message: " + message);
+ receivedOneCondition.countDown();
+ waitCondition.await(60, TimeUnit.SECONDS);
+ message.acknowledge();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.toString());
+ }
+ }
+ });
+
+ connection.start();
+
+
+ Thread producingThread = new Thread("Producing Thread") {
+ public void run() {
+ try {
+ int i = 0;
+ long tStamp = System.currentTimeMillis();
+ while (i++ < sendCount) {
+ producer.send(session.createTextMessage("test"));
+ if (i%100 == 0) {
+ LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis()
- tStamp) / 100) + "m/ms");
+ tStamp = System.currentTimeMillis() ;
+ }
+ }
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ }
+ }
+ };
+
+ producingThread.start();
+
+ final long expiry = System.currentTimeMillis() + 20*1000;
+ while (producingThread.isAlive() && expiry > System.currentTimeMillis())
{
+ producingThread.join(1000);
+ }
+
+ assertTrue("got one message", receivedOneCondition.await(10, TimeUnit.SECONDS));
+ assertTrue("producer completed within time ", !producingThread.isAlive());
+
+ Thread.sleep(2 * Math.max(ttl, expiryPeriod));
+ DestinationViewMBean view = createView(destination);
+
+ assertEquals("all dispatched up to default prefetch ", 1000, view.getDispatchCount());
+ assertEquals("All sent save one have expired ", sendCount, view.getExpiredCount());
+
+
+ // let the ack happen
+ waitCondition.countDown();
+
+ Thread.sleep(Math.max(ttl, expiryPeriod));
+
+ assertEquals("all sent save one have expired ", sendCount, view.getExpiredCount());
+
+ assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount());
+
+ consumer.close();
+ LOG.info("done: " + getName());
+ }
+
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception
{
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
String domain = "org.apache.activemq";
|