activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r791881 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/bugs/ test/java/org/apache/activemq/command/ test/java/org/apache/activemq/usecases/
Date Tue, 07 Jul 2009 15:54:33 GMT
Author: gtully
Date: Tue Jul  7 15:54:32 2009
New Revision: 791881

URL: http://svn.apache.org/viewvc?rev=791881&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-1112 - fix issues with inflight count
when messages expire on the consumer/client and on consumer close

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Tue Jul  7 15:54:32 2009
@@ -644,6 +644,9 @@
                     if (ack != null) {
                         deliveredMessages.clear();
                         ackCounter = 0;
+            		} else {
+            		    ack = pendingAck;
+            		    pendingAck = null;
             		}
             	}
             } else if (pendingAck != null && pendingAck.isStandardAck()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
Tue Jul  7 15:54:32 2009
@@ -69,7 +69,7 @@
     }
 
     public String toString() {
-        return "Message " + message.getMessageId() + " dropped=" + dropped + " locked=" + (lockOwner != null);
+        return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
     }
 
     public void incrementRedeliveryCounter() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Jul  7 15:54:32 2009
@@ -303,9 +303,16 @@
                 int index = 0;
                 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                     final MessageReference node = iter.next();
-                    if( node.isExpired() ) {
-                        node.getRegionDestination().messageExpired(context, this, node);
+                    if (hasNotAlreadyExpired(node)) {
+                        if (node.isExpired()) {
+                            node.getRegionDestination().messageExpired(context, this, node);
+                            dispatched.remove(node);
+                            node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                        }
+                    } else {
+                        // already expired
                         dispatched.remove(node);
+                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
   
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
                         prefetchExtension = Math.max(prefetchExtension, index + 1);
@@ -411,6 +418,16 @@
         }
     }
 
+    private boolean hasNotAlreadyExpired(MessageReference node) {
+        boolean hasNotExpired = true;
+        try {
+            hasNotExpired = node.getMessage().getProperty(RegionBroker.ORIGINAL_EXPIRATION) == null;
+        } catch (IOException e) {
+            LOG.warn("failed to determine value message property " + RegionBroker.ORIGINAL_EXPIRATION + " for " + node, e);
+        }
+        return hasNotExpired;
+    }
+
     /**
      * Checks an ack versus the contents of the dispatched list.
      * 
@@ -545,6 +562,11 @@
         List<MessageReference> rc = new ArrayList<MessageReference>();
         synchronized(pendingLock) {
             super.remove(context, destination);
+            // Here is a potential problem concerning Inflight stat:
+            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
+            // Except if each commit or rollback callback action comes before remove of subscriber.
+            rc.addAll(pending.remove(context, destination));
+
             // Synchronized to DispatchLock
             synchronized(dispatchLock) {
 	            for (MessageReference r : dispatched) {
@@ -552,12 +574,10 @@
 	                	rc.add((QueueMessageReference)r);
 	                }
 	            }
-            }
-            // TODO Dispatched messages should be decremented from Inflight stat 
-            // Here is a potential problem concerning Inflight stat:
-            // Messages not already committed or rolled back may not be removed from dispatched list at the moment
-            // Except if each commit or rollback callback action comes before remove of subscriber.
-            rc.addAll(pending.remove(context, destination));
+                destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());
+                destination.getDestinationStatistics().getInflight().subtract(dispatched.size());
+                dispatched.clear();
+            }            
         }
         return rc;
     }
@@ -661,12 +681,15 @@
         if (node.getRegionDestination() != null) {
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
      
+                node.getRegionDestination().getDestinationStatistics().getInflight().increment();
  
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId() 
+                            + ", dispatched: " + node.getRegionDestination().getDestinationStatistics().getDispatched().getCount()
+                            + ", inflight: " + node.getRegionDestination().getDestinationStatistics().getInflight().getCount());
+                }
             }
         }
-        if (LOG.isTraceEnabled()) {
-            LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId());
-        }
+        
         if (info.isDispatchAsync()) {
             try {
                 dispatchPending();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue Jul  7 15:54:32 2009
@@ -205,7 +205,7 @@
                         // Message could have expired while it was being
                         // loaded..
                         if (broker.isExpired(message)) {
-                            messageExpired(createConnectionContext(), null, message, false);
+                            messageExpired(createConnectionContext(), message);
                             return true;
                         }
                         if (hasSpace()) {
@@ -343,6 +343,12 @@
         // while removing up a subscription.
         dispatchLock.lock();
         try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId
+                        + ", dequeues: " + getDestinationStatistics().getDequeues().getCount()
+                        + ", dispatched: " + getDestinationStatistics().getDispatched().getCount()
+                        + ", inflight: " + getDestinationStatistics().getInflight().getCount());
+            }
             synchronized (consumers) {
                 removeFromConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
@@ -552,10 +558,12 @@
     }
     
     private void expireMessages() {
-        LOG.info("expiring messages...");
-
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Expiring messages ..");
+        }
+        
         // just track the insertion count
-        List<Message> l = new AbstractList<Message>() {
+        List<Message> browsedMessages = new AbstractList<Message>() {
             int size = 0;
 
             @Override
@@ -573,7 +581,7 @@
                 return null;
             }
         };
-        doBrowse(true, l, getMaxBrowsePageSize());
+        doBrowse(true, browsedMessages, this.getMaxExpirePageSize());
     }
 
     public void gc(){
@@ -750,7 +758,7 @@
                     addAll(pagedInPendingDispatch, l, max, toExpire);
                     for (MessageReference ref : toExpire) {
                         pagedInPendingDispatch.remove(ref);
-                        messageExpired(connectionContext, ref, false);
+                        messageExpired(connectionContext, ref);
                     }
                 }
                 toExpire.clear();
@@ -758,7 +766,7 @@
                     addAll(pagedInMessages.values(), l, max, toExpire);   
                 }
                 for (MessageReference ref : toExpire) {
-                    messageExpired(connectionContext, ref, false);
+                    messageExpired(connectionContext, ref);
                 }
                 
                 if (l.size() < getMaxBrowsePageSize()) {
@@ -771,7 +779,7 @@
                                 if (node != null) {
                                     if (broker.isExpired(node)) {
                                         messageExpired(connectionContext,
-                                                createMessageReference(node.getMessage()), false);
+                                                createMessageReference(node.getMessage()));
                                     } else if (l.contains(node.getMessage()) == false) {
                                         l.add(node.getMessage());
                                     }
@@ -1249,21 +1257,17 @@
         }
     }
     
-    public void messageExpired(ConnectionContext context,MessageReference reference, boolean dispatched) {
-        messageExpired(context,null,reference, dispatched);
+    public void messageExpired(ConnectionContext context,MessageReference reference) {
+        messageExpired(context,null,reference);
     }
     
     public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
-        messageExpired(context, subs, reference, true);
-    }
-    
-    public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference, boolean dispatched) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("message expired: " + reference);
+        }
         broker.messageExpired(context, reference);
         destinationStatistics.getDequeues().increment();
         destinationStatistics.getExpired().increment();
-        if (dispatched) {
-            destinationStatistics.getInflight().decrement();
-        }
         try {
             removeMessage(context,subs,(QueueMessageReference)reference);
         } catch (IOException e) {
@@ -1349,7 +1353,7 @@
                                 result.add(ref);
                                 count++;
                             } else {
-                                messageExpired(createConnectionContext(), ref, false);
+                                messageExpired(createConnectionContext(), ref);
                             }
                         }
                     } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Tue Jul  7 15:54:32 2009
@@ -69,6 +69,7 @@
  * @version $Revision$
  */
 public class RegionBroker extends EmptyBroker {
+    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
     private static final Log LOG = LogFactory.getLog(RegionBroker.class);
     private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
 
@@ -689,7 +690,7 @@
 						    }
 							long expiration=message.getExpiration();
 							message.setExpiration(0);
-							message.setProperty("originalExpiration",new Long(
+							message.setProperty(ORIGINAL_EXPIRATION,new Long(
 							        expiration));
 							if(!message.isPersistent()){
 							    message.setPersistent(true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Tue Jul  7 15:54:32 2009
@@ -277,7 +277,7 @@
         executor.shutdown();
         executor.awaitTermination(30, TimeUnit.SECONDS);
         assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages);
-        assertTrue(exceptions.isEmpty());
+        assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
     }
     
     public void testConsumerRecover() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
Tue Jul  7 15:54:32 2009
@@ -354,12 +354,7 @@
 
         mapMessage.onSend();
         mapMessage.setContent(mapMessage.getContent());
-        try {
-        mapMessage.getString("String");
-        fail("Should throw a Null pointer");
-        }catch(NullPointerException e){
-            
-        }
+        assertNull(mapMessage.getString("String"));
         mapMessage.clearBody();
         mapMessage.setString("String", "String");
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Tue Jul  7 15:54:32 2009
@@ -32,14 +32,13 @@
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
-
 public class ExpiredMessagesTest extends CombinationTestSupport {
 
     private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);
@@ -60,15 +59,22 @@
     }
 	
 	protected void setUp() throws Exception {
-		broker = new BrokerService();
-		broker.setBrokerName("localhost");
-		broker.setDataDirectory("data/");
-		broker.setUseJmx(true);
-		broker.deleteAllMessages();
-		broker.addConnector("tcp://localhost:61616");
-		broker.start();
-		broker.waitUntilStarted();
-	}
+        broker = new BrokerService();
+        broker.setBrokerName("localhost");
+        broker.setDataDirectory("data/");
+        broker.setUseJmx(true);
+        broker.deleteAllMessages();
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setExpireMessagesPeriod(100);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        broker.waitUntilStarted();
+    }
 	
 	public void testExpiredMessages() throws Exception {
 		
@@ -93,6 +99,7 @@
 						Thread.sleep(100);
 						end = System.currentTimeMillis();
 					}
+					consumer.close();
 				} catch (Throwable ex) {
 					ex.printStackTrace();
 				}
@@ -109,6 +116,7 @@
                 	while (i++ < 30000) {
                 		producer.send(session.createTextMessage("test"));
                 	}
+                	producer.close();
                 } catch (Throwable ex) {
                     ex.printStackTrace();
                 }
@@ -119,14 +127,23 @@
 		
         consumerThread.join();
         producingThread.join();
+        session.close();
         
+        Thread.sleep(5000);
         
         DestinationViewMBean view = createView(destination);
         LOG.info("Stats: received: "  + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
                 + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
         
         assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount());
-        //assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
+        
+        long expiry = System.currentTimeMillis() + 30000;
+        while (view.getInFlightCount() > 0 && System.currentTimeMillis() < expiry) {
+            Thread.sleep(500);
+        }
+        LOG.info("Stats: received: "  + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+                + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
+        assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
 	}
 	
 	protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
@@ -146,7 +163,4 @@
 		broker.stop();
 		broker.waitUntilStopped();
 	}
-
-	
-
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=791881&r1=791880&r2=791881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Tue Jul  7 15:54:32 2009
@@ -16,7 +16,13 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.management.MBeanServer;
@@ -57,31 +63,45 @@
         junit.textui.TestRunner.run(suite());
     }
 	
-	protected void setUp() throws Exception {
-		broker = new BrokerService();
-		broker.setBrokerName("localhost");
-		broker.setDataDirectory("data/");
-		broker.setUseJmx(true);
-		broker.setDeleteAllMessagesOnStartup(true);
-		broker.addConnector("tcp://localhost:61616");
-			
-		PolicyMap policyMap = new PolicyMap();
-		PolicyEntry defaultEntry = new PolicyEntry();
-		defaultEntry.setExpireMessagesPeriod(expiryPeriod);
-		defaultEntry.setMaxExpirePageSize(200);
-		// so memory is not consumed by DLQ turn if off
-		defaultEntry.setDeadLetterStrategy(null);
-		defaultEntry.setMemoryLimit(200*1000);
-		policyMap.setDefaultEntry(defaultEntry);
+    protected void createBrokerWithMemoryLimit() throws Exception {
+        doCreateBroker(true);
+    }
+    
+    protected void createBroker() throws Exception {
+        doCreateBroker(false);
+    }
+    
+    private void doCreateBroker(boolean memoryLimit) throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("localhost");
+        broker.setDataDirectory("data/");
+        broker.setUseJmx(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://localhost:61616");
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(expiryPeriod);
+        defaultEntry.setMaxExpirePageSize(200);
+
+        if (memoryLimit) {
+            // so memory is not consumed by DLQ turn if off
+            defaultEntry.setDeadLetterStrategy(null);
+            defaultEntry.setMemoryLimit(200 * 1000);
+        }
+
+        policyMap.setDefaultEntry(defaultEntry);
         broker.setDestinationPolicy(policyMap);
+
+        broker.start();
+
+        broker.waitUntilStarted();
+    }
 		
-		broker.start();
-		
-		broker.waitUntilStarted();
-	}
-		
-	public void testExpiredMessages() throws Exception {
+	public void testExpiredMessagesWithNoConsumer() throws Exception {
 		
+	    createBrokerWithMemoryLimit();
+	    
 		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
 		connection = factory.createConnection();
 		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -121,7 +141,89 @@
         DestinationViewMBean view = createView(destination);
         assertEquals("All sent have expired ", sendCount, view.getExpiredCount());
 	}
+
 	
+    
+    public void testExpiredMessagesWitVerySlowConsumer() throws Exception {
+        createBroker();  
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        connection = factory.createConnection();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        final int ttl = 4000;
+        producer.setTimeToLive(ttl);
+        
+        final long sendCount = 1001; 
+        final CountDownLatch receivedOneCondition = new CountDownLatch(1);
+        final CountDownLatch waitCondition = new CountDownLatch(1);
+        
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("Got my message: " + message);
+                    receivedOneCondition.countDown();
+                    waitCondition.await(60, TimeUnit.SECONDS);
+                    message.acknowledge();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    fail(e.toString());
+                }  
+            }        
+        });
+        
+        connection.start();
+      
+        
+        Thread producingThread = new Thread("Producing Thread") {
+            public void run() {
+                try {
+                    int i = 0;
+                    long tStamp = System.currentTimeMillis();
+                    while (i++ < sendCount) {
+                        producer.send(session.createTextMessage("test"));
+                        if (i%100 == 0) {
+                            LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - tStamp) / 100)  + "m/ms");
+                            tStamp = System.currentTimeMillis() ;
+                        }
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        
+        producingThread.start();
+        
+        final long expiry = System.currentTimeMillis() + 20*1000;
+        while (producingThread.isAlive() && expiry > System.currentTimeMillis()) {
+            producingThread.join(1000);
+        }
+        
+        assertTrue("got one message", receivedOneCondition.await(10, TimeUnit.SECONDS));
+        assertTrue("producer completed within time ", !producingThread.isAlive());
+        
+        Thread.sleep(2 * Math.max(ttl, expiryPeriod));
+        DestinationViewMBean view = createView(destination);
+            
+        assertEquals("all dispatched up to default prefetch ", 1000, view.getDispatchCount());
+        assertEquals("All sent save one have expired ", sendCount, view.getExpiredCount());
    
+        
+        
+        // let the ack happen
+        waitCondition.countDown();
+     
+        Thread.sleep(Math.max(ttl, expiryPeriod));
+        
+        assertEquals("all sent save one have expired ", sendCount, view.getExpiredCount());
+        
+        assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount());
+        
+        consumer.close();
+        LOG.info("done: " + getName());
+    }
+
 	protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
 		 MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
 		 String domain = "org.apache.activemq";



Mime
View raw message