activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r475848 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/broker/
Date Thu, 16 Nov 2006 18:24:15 GMT
Author: chirino
Date: Thu Nov 16 10:24:14 2006
New Revision: 475848

URL: http://svn.apache.org/viewvc?view=rev&rev=475848
Log:
see: http://issues.apache.org/activemq/browse/AMQ-1056
We now expire messages on the broker.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Nov 16 10:24:14 2006
@@ -377,6 +377,13 @@
                     while(pending.hasNext()&&!isFull()){
                         MessageReference node=pending.next();
                         pending.remove();
+                        
+                        // Message may have been sitting in the pending list a while
+                        // waiting for the consumer to ack the message.
+                		if( node.isExpired() ) {
+                			continue; // just drop it.
+                		}
+
                         dispatch(node);
                     }
                 }finally{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Nov 16 10:24:14 2006
@@ -17,7 +17,14 @@
  */
 package org.apache.activemq.broker.region;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -28,7 +35,6 @@
 import org.apache.activemq.broker.region.group.MessageGroupSet;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
-import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -51,14 +57,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -122,7 +121,13 @@
                 store.recover(new MessageRecoveryListener(){
 
                     public void recoverMessage(Message message){
-                        message.setRegionDestination(Queue.this);
+                    	// Message could have expired while it was being loaded..
+                    	if( message.isExpired() ) {
+                    		// TODO: remove message from store.
+                    		return;
+                    	}
+
+                    	message.setRegionDestination(Queue.this);
                         synchronized(messages){
                             try{
                                 messages.addMessageLast(message);
@@ -295,11 +300,23 @@
     }
 
     public void send(final ConnectionContext context,final Message message) throws Exception{
+    	// There is delay between the client sending it and it arriving at the
+    	// destination.. it may have expired.
+    	if( message.isExpired() ) {
+    		return;
+    	}
+    		
         if(context.isProducerFlowControl()){
             if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
             }else{
                 usageManager.waitForSpace();
+                
+                // The usage manager could have delayed us by the time
+                // we unblock the message could have expired..
+            	if( message.isExpired() ) {
+            		return;
+            	}
             }
         }
         message.setRegionDestination(this);
@@ -310,6 +327,14 @@
             context.getTransaction().addSynchronization(new Synchronization(){
 
                 public void afterCommit() throws Exception{
+                	
+                	// It could take a while before we receive the commit
+                	// operation.. by that time the message could have expired..
+                	if( message.isExpired() ) {
+                		// TODO: remove message from store.
+                		return;
+                	}
+
                     sendMessage(context,message);
                 }
             });

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Nov 16 10:24:14 2006
@@ -232,11 +232,23 @@
 
     public void send(final ConnectionContext context, final Message message) throws Exception {
 
+    	// There is delay between the client sending it and it arriving at the
+    	// destination.. it may have expired.
+    	if( message.isExpired() ) {
+    		return;
+    	}
+
         if (context.isProducerFlowControl()) {
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
             } else {
                 usageManager.waitForSpace();
+                
+                // The usage manager could have delayed us by the time
+                // we unblock the message could have expired..
+            	if( message.isExpired() ) {
+            		return;
+            	}
             }    
         }
 
@@ -251,6 +263,12 @@
             if (context.isInTransaction()) {
                 context.getTransaction().addSynchronization(new Synchronization() {
                     public void afterCommit() throws Exception {
+                    	// It could take a while before we receive the commit
+                    	// operation.. by that time the message could have expired..
+                    	if( message.isExpired() ) {
+                    		// TODO: remove message from store.
+                    		return;
+                    	}
                         dispatch(context, message);
                     }
                 });

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Thu Nov 16 10:24:14 2006
@@ -325,6 +325,14 @@
             for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
                 MessageReference message=(MessageReference) iter.next();
                 iter.remove();
+                
+                // Message may have been sitting in the matched list a while
+                // waiting for the consumer to ack the message.
+        		if( message.isExpired() ) {
+        			message.decrementReferenceCount();
+        			continue; // just drop it.
+        		}
+
                 dispatch(message);
             }
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Thu Nov 16 10:24:14 2006
@@ -409,7 +409,10 @@
     }
 
     public boolean isExpired() {
-        // TODO: need to be implemented. 
+        long expireTime = getExpiration();
+        if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
+            return true;
+        }
         return false;
     }
     

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java?view=auto&rev=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java Thu Nov 16 10:24:14 2006
@@ -0,0 +1,285 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class MessageExpirationTest extends BrokerTestSupport {
+    
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public int prefetch;
+    public byte destinationType;
+    public boolean durableConsumer;
+    
+    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode, int timeToLive) {
+    	Message message = createMessage(producerInfo, destination, deliveryMode);
+    	long now = System.currentTimeMillis();
+    	message.setTimestamp(now);
+    	message.setExpiration(now+timeToLive);
+        return message;
+    }
+    
+    
+    public void initCombosForTestMessagesWaitingForUssageDecreaseExpire() {    
+        addCombinationValues( "deliveryMode", new Object[]{ 
+                new Integer(DeliveryMode.NON_PERSISTENT), 
+                new Integer(DeliveryMode.PERSISTENT)} );        
+        addCombinationValues( "destinationType", new Object[]{ 
+                new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE),
+                new Byte(ActiveMQDestination.QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TOPIC_TYPE), 
+                } );
+    }
+
+    public void testMessagesWaitingForUssageDecreaseExpire() throws Exception {
+        
+        // Start a producer
+        final StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        final ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        
+        // Start a consumer..
+        final StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        
+        destination = createDestinationInfo(connection2, connectionInfo2, destinationType);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        consumerInfo2.setPrefetchSize(1);
+        connection2.send(consumerInfo2);
+        
+        // Reduce the limit so that only 1 message can flow through the broker at a time.
+        broker.getMemoryManager().setLimit(1);
+        
+        final Message m1 = createMessage(producerInfo, destination, deliveryMode);
+        final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000);
+        final Message m3 = createMessage(producerInfo, destination, deliveryMode);
+        final Message m4 = createMessage(producerInfo, destination, deliveryMode, 1000);
+        
+        // Produce in an async thread since the producer will be getting blocked by the usage manager..
+        new Thread() {
+        	public void run() {
+                // m1 and m3 should not expire.. but the others should.
+                try {
+					connection.send(m1);
+					connection.send(m2);
+					connection.send(m3);  
+					connection.send(m4);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}        		
+        	}
+        }.start();
+        
+                
+        // Make sure only 1 message was delivered due to prefetch == 1
+        Message m = receiveMessage(connection2);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId(), m.getMessageId());
+        assertNoMessagesLeft(connection);
+        
+        // Sleep before we ack so that the messages expire on the usage manager
+        Thread.sleep(1500);
+        connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        
+        // 2nd message received should be m3.. it should have expired 2nd message sent.
+        m = receiveMessage(connection2);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId(), m.getMessageId());
+        
+        // Sleep before we ack so that the messages expire on the usage manager
+        Thread.sleep(1500);
+        connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        
+        // And there should be no messages left now..
+        assertNoMessagesLeft(connection2);
+        
+        connection.send(closeConnectionInfo(connectionInfo));
+        connection.send(closeConnectionInfo(connectionInfo2));
+    }
+    
+    
+    public void initCombosForTestMessagesInLongTransactionExpire() {    
+        addCombinationValues( "deliveryMode", new Object[]{ 
+                new Integer(DeliveryMode.NON_PERSISTENT), 
+                new Integer(DeliveryMode.PERSISTENT)} );        
+        addCombinationValues( "destinationType", new Object[]{ 
+                new Byte(ActiveMQDestination.QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TOPIC_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
+                } );
+    }
+    
+    public void testMessagesInLongTransactionExpire() throws Exception {
+        
+        // Start a producer and consumer
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        destination = createDestinationInfo(connection, connectionInfo, destinationType);
+        
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setPrefetchSize(1000);
+        connection.send(consumerInfo);
+
+        // Start the tx..
+        LocalTransactionId txid = createLocalTransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        
+        // m1 and m3 should not expire.. but the others should.
+        Message m1 = createMessage(producerInfo, destination, deliveryMode);
+        m1.setTransactionId(txid);
+        connection.send(m1);
+        Message m = createMessage(producerInfo, destination, deliveryMode, 1000);
+        m.setTransactionId(txid);
+        connection.send(m);
+        Message m3 = createMessage(producerInfo, destination, deliveryMode);
+        m3.setTransactionId(txid);
+        connection.send(m3);  
+        m = createMessage(producerInfo, destination, deliveryMode, 1000);
+        m.setTransactionId(txid);
+        connection.send(m);
+        
+        // Sleep before we commit so that the messages expire on the commit list..
+        Thread.sleep(1500);        
+        connection.send(createCommitTransaction1Phase(connectionInfo, txid));
+                
+        m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId(), m.getMessageId());
+        connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        
+        // 2nd message received should be m3.. it should have expired 2nd message sent.
+        m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId(), m.getMessageId());
+        connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        
+        // And there should be no messages left now..
+        assertNoMessagesLeft(connection);
+        
+        connection.send(closeConnectionInfo(connectionInfo));
+    }
+
+
+    public void TestMessagesInSubscriptionPendingListExpire() {    
+        addCombinationValues( "deliveryMode", new Object[]{ 
+                new Integer(DeliveryMode.NON_PERSISTENT), 
+                new Integer(DeliveryMode.PERSISTENT)} );        
+        addCombinationValues( "destinationType", new Object[]{ 
+                new Byte(ActiveMQDestination.QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TOPIC_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
+                } );
+    }
+    
+    public void initCombosForTestMessagesInSubscriptionPendingListExpire() {    
+        addCombinationValues( "deliveryMode", new Object[]{ 
+                new Integer(DeliveryMode.NON_PERSISTENT), 
+                new Integer(DeliveryMode.PERSISTENT)} );        
+        addCombinationValues( "destinationType", new Object[]{ 
+                new Byte(ActiveMQDestination.QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TOPIC_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE), 
+                new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
+                } );
+    }
+
+    public void testMessagesInSubscriptionPendingListExpire() throws Exception {
+        
+        // Start a producer and consumer
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        destination = createDestinationInfo(connection, connectionInfo, destinationType);
+        
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setPrefetchSize(1);
+        connection.send(consumerInfo);
+        
+        // m1 and m3 should not expire.. but the others should.
+        Message m1 = createMessage(producerInfo, destination, deliveryMode);
+        connection.send(m1);
+        connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
+        Message m3 = createMessage(producerInfo, destination, deliveryMode);
+        connection.send(m3);  
+        connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
+                
+        // Make sure only 1 message was delivered due to prefetch == 1
+        Message m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId(), m.getMessageId());
+        assertNoMessagesLeft(connection);
+        
+        // Sleep before we ack so that the messages expire on the pending list..
+        Thread.sleep(1500);        
+        connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        
+        // 2nd message received should be m3.. it should have expired 2nd message sent.
+        m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId(), m.getMessageId());
+        connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        
+        // And there should be no messages left now..
+        assertNoMessagesLeft(connection);
+        
+        connection.send(closeConnectionInfo(connectionInfo));
+    }
+    
+    public static Test suite() {
+        return suite(MessageExpirationTest.class);
+    }
+    
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}



Mime
View raw message