Author: chirino
Date: Thu Nov 16 10:24:14 2006
New Revision: 475848
URL: http://svn.apache.org/viewvc?view=rev&rev=475848
Log:
see: http://issues.apache.org/activemq/browse/AMQ-1056
We now expire messages on the broker.
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Nov 16 10:24:14 2006
@@ -377,6 +377,13 @@
while(pending.hasNext()&&!isFull()){
MessageReference node=pending.next();
pending.remove();
+
+ // Message may have been sitting in the pending list a while
+ // waiting for the consumer to ak the message.
+ if( node.isExpired() ) {
+ continue; // just drop it.
+ }
+
dispatch(node);
}
}finally{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Nov 16 10:24:14 2006
@@ -17,7 +17,14 @@
*/
package org.apache.activemq.broker.region;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -28,7 +35,6 @@
import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
-import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
@@ -51,14 +57,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -122,7 +121,13 @@
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message){
- message.setRegionDestination(Queue.this);
+ // Message could have expired while it was being loaded..
+ if( message.isExpired() ) {
+ // TODO: remove message from store.
+ return;
+ }
+
+ message.setRegionDestination(Queue.this);
synchronized(messages){
try{
messages.addMessageLast(message);
@@ -295,11 +300,23 @@
}
public void send(final ConnectionContext context,final Message message) throws Exception{
+ // There is delay between the client sending it and it arriving at the
+ // destination.. it may have expired.
+ if( message.isExpired() ) {
+ return;
+ }
+
if(context.isProducerFlowControl()){
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
}else{
usageManager.waitForSpace();
+
+ // The usage manager could have delayed us by the time
+ // we unblock the message could have expired..
+ if( message.isExpired() ) {
+ return;
+ }
}
}
message.setRegionDestination(this);
@@ -310,6 +327,14 @@
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
+
+ // It could take while before we receive the commit
+ // operration.. by that time the message could have expired..
+ if( message.isExpired() ) {
+ // TODO: remove message from store.
+ return;
+ }
+
sendMessage(context,message);
}
});
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Nov 16 10:24:14 2006
@@ -232,11 +232,23 @@
public void send(final ConnectionContext context, final Message message) throws Exception
{
+ // There is delay between the client sending it and it arriving at the
+ // destination.. it may have expired.
+ if( message.isExpired() ) {
+ return;
+ }
+
if (context.isProducerFlowControl()) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
} else {
usageManager.waitForSpace();
+
+ // The usage manager could have delayed us by the time
+ // we unblock the message could have expired..
+ if( message.isExpired() ) {
+ return;
+ }
}
}
@@ -251,6 +263,12 @@
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
+ // It could take while before we receive the commit
+ // operration.. by that time the message could have expired..
+ if( message.isExpired() ) {
+ // TODO: remove message from store.
+ return;
+ }
dispatch(context, message);
}
});
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Nov 16 10:24:14 2006
@@ -325,6 +325,14 @@
for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
MessageReference message=(MessageReference) iter.next();
iter.remove();
+
+ // Message may have been sitting in the matched list a while
+ // waiting for the consumer to ak the message.
+ if( message.isExpired() ) {
+ message.decrementReferenceCount();
+ continue; // just drop it.
+ }
+
dispatch(message);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?view=diff&rev=475848&r1=475847&r2=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
Thu Nov 16 10:24:14 2006
@@ -409,7 +409,10 @@
}
public boolean isExpired() {
- // TODO: need to be implemented.
+ long expireTime = getExpiration();
+ if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
+ return true;
+ }
return false;
}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java?view=auto&rev=475848
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
Thu Nov 16 10:24:14 2006
@@ -0,0 +1,285 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class MessageExpirationTest extends BrokerTestSupport {
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+ public int prefetch;
+ public byte destinationType;
+ public boolean durableConsumer;
+
+ protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination,
int deliveryMode, int timeToLive) {
+ Message message = createMessage(producerInfo, destination, deliveryMode);
+ long now = System.currentTimeMillis();
+ message.setTimestamp(now);
+ message.setExpiration(now+timeToLive);
+ return message;
+ }
+
+
+ public void initCombosForTestMessagesWaitingForUssageDecreaseExpire() {
+ addCombinationValues( "deliveryMode", new Object[]{
+ new Integer(DeliveryMode.NON_PERSISTENT),
+ new Integer(DeliveryMode.PERSISTENT)} );
+ addCombinationValues( "destinationType", new Object[]{
+ new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE),
+ new Byte(ActiveMQDestination.QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TOPIC_TYPE),
+ } );
+ }
+
+ public void testMessagesWaitingForUssageDecreaseExpire() throws Exception {
+
+ // Start a producer
+ final StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ final ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+
+ // Start a consumer..
+ final StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+
+ destination = createDestinationInfo(connection2, connectionInfo2, destinationType);
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+ consumerInfo2.setPrefetchSize(1);
+ connection2.send(consumerInfo2);
+
+ // Reduce the limit so that only 1 message can flow through the broker at a time.
+ broker.getMemoryManager().setLimit(1);
+
+ final Message m1 = createMessage(producerInfo, destination, deliveryMode);
+ final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000);
+ final Message m3 = createMessage(producerInfo, destination, deliveryMode);
+ final Message m4 = createMessage(producerInfo, destination, deliveryMode, 1000);
+
+ // Produce in an async thread since the producer will be getting blocked by the usage
manager..
+ new Thread() {
+ public void run() {
+ // m1 and m3 should not expire.. but the others should.
+ try {
+ connection.send(m1);
+ connection.send(m2);
+ connection.send(m3);
+ connection.send(m4);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+
+
+ // Make sure only 1 message was delivered due to prefetch == 1
+ Message m = receiveMessage(connection2);
+ assertNotNull(m);
+ assertEquals(m1.getMessageId(), m.getMessageId());
+ assertNoMessagesLeft(connection);
+
+ // Sleep before we ack so that the messages expire on the usage manager
+ Thread.sleep(1500);
+ connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
+
+ // 2nd message received should be m3.. it should have expired 2nd message sent.
+ m = receiveMessage(connection2);
+ assertNotNull(m);
+ assertEquals(m3.getMessageId(), m.getMessageId());
+
+ // Sleep before we ack so that the messages expire on the usage manager
+ Thread.sleep(1500);
+ connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
+
+ // And there should be no messages left now..
+ assertNoMessagesLeft(connection2);
+
+ connection.send(closeConnectionInfo(connectionInfo));
+ connection.send(closeConnectionInfo(connectionInfo2));
+ }
+
+
+ public void initCombosForTestMessagesInLongTransactionExpire() {
+ addCombinationValues( "deliveryMode", new Object[]{
+ new Integer(DeliveryMode.NON_PERSISTENT),
+ new Integer(DeliveryMode.PERSISTENT)} );
+ addCombinationValues( "destinationType", new Object[]{
+ new Byte(ActiveMQDestination.QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TOPIC_TYPE),
+ new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ } );
+ }
+
+ public void testMessagesInLongTransactionExpire() throws Exception {
+
+ // Start a producer and consumer
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setPrefetchSize(1000);
+ connection.send(consumerInfo);
+
+ // Start the tx..
+ LocalTransactionId txid = createLocalTransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ // m1 and m3 should not expire.. but the others should.
+ Message m1 = createMessage(producerInfo, destination, deliveryMode);
+ m1.setTransactionId(txid);
+ connection.send(m1);
+ Message m = createMessage(producerInfo, destination, deliveryMode, 1000);
+ m.setTransactionId(txid);
+ connection.send(m);
+ Message m3 = createMessage(producerInfo, destination, deliveryMode);
+ m3.setTransactionId(txid);
+ connection.send(m3);
+ m = createMessage(producerInfo, destination, deliveryMode, 1000);
+ m.setTransactionId(txid);
+ connection.send(m);
+
+ // Sleep before we commit so that the messages expire on the commit list..
+ Thread.sleep(1500);
+ connection.send(createCommitTransaction1Phase(connectionInfo, txid));
+
+ m = receiveMessage(connection);
+ assertNotNull(m);
+ assertEquals(m1.getMessageId(), m.getMessageId());
+ connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+
+ // 2nd message received should be m3.. it should have expired 2nd message sent.
+ m = receiveMessage(connection);
+ assertNotNull(m);
+ assertEquals(m3.getMessageId(), m.getMessageId());
+ connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+
+ // And there should be no messages left now..
+ assertNoMessagesLeft(connection);
+
+ connection.send(closeConnectionInfo(connectionInfo));
+ }
+
+
+ public void TestMessagesInSubscriptionPendingListExpire() {
+ addCombinationValues( "deliveryMode", new Object[]{
+ new Integer(DeliveryMode.NON_PERSISTENT),
+ new Integer(DeliveryMode.PERSISTENT)} );
+ addCombinationValues( "destinationType", new Object[]{
+ new Byte(ActiveMQDestination.QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TOPIC_TYPE),
+ new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ } );
+ }
+
+ public void initCombosForTestMessagesInSubscriptionPendingListExpire() {
+ addCombinationValues( "deliveryMode", new Object[]{
+ new Integer(DeliveryMode.NON_PERSISTENT),
+ new Integer(DeliveryMode.PERSISTENT)} );
+ addCombinationValues( "destinationType", new Object[]{
+ new Byte(ActiveMQDestination.QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TOPIC_TYPE),
+ new Byte(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ new Byte(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ } );
+ }
+
+ public void testMessagesInSubscriptionPendingListExpire() throws Exception {
+
+ // Start a producer and consumer
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ destination = createDestinationInfo(connection, connectionInfo, destinationType);
+
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setPrefetchSize(1);
+ connection.send(consumerInfo);
+
+ // m1 and m3 should not expire.. but the others should.
+ Message m1 = createMessage(producerInfo, destination, deliveryMode);
+ connection.send(m1);
+ connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
+ Message m3 = createMessage(producerInfo, destination, deliveryMode);
+ connection.send(m3);
+ connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
+
+ // Make sure only 1 message was delivered due to prefetch == 1
+ Message m = receiveMessage(connection);
+ assertNotNull(m);
+ assertEquals(m1.getMessageId(), m.getMessageId());
+ assertNoMessagesLeft(connection);
+
+ // Sleep before we ack so that the messages expire on the pending list..
+ Thread.sleep(1500);
+ connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+
+ // 2nd message received should be m3.. it should have expired 2nd message sent.
+ m = receiveMessage(connection);
+ assertNotNull(m);
+ assertEquals(m3.getMessageId(), m.getMessageId());
+ connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+
+ // And there should be no messages left now..
+ assertNoMessagesLeft(connection);
+
+ connection.send(closeConnectionInfo(connectionInfo));
+ }
+
+ public static Test suite() {
+ return suite(MessageExpirationTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+}
|