activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r360108 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/broker/policy/
Date Fri, 30 Dec 2005 15:36:26 GMT
Author: jstrachan
Date: Fri Dec 30 07:36:19 2005
New Revision: 360108

URL: http://svn.apache.org/viewcvs?rev=360108&view=rev
Log:
added test cases to demonstrate shared and individual DLQ strategies; which highlight a bug
in the rollback logic in the client

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=360108&r1=360107&r2=360108&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Dec 30 07:36:19 2005
@@ -175,13 +175,14 @@
                         Message message = node.getMessage();
                         if( message !=null ) {
                             
-                            // TODO is this meant to be == null?
-                            if( message.getOriginalDestination()!=null )
+                            // TODO is this meant to be == null - it was != ?
+                            if( message.getOriginalDestination()==null )
                                 message.setOriginalDestination(message.getDestination());
                             
                             ActiveMQDestination originalDestination = message.getOriginalDestination();
                             DeadLetterStrategy deadLetterStrategy = node.getRegionDestination().getDeadLetterStrategy();
-                            message.setDestination(deadLetterStrategy.getDeadLetterQueueFor(originalDestination));
+                            ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(originalDestination);
+                            message.setDestination(deadLetterDestination);
                             
                             if( message.getOriginalTransactionId()!=null )
                                 message.setOriginalTransactionId(message.getTransactionId());

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=360108&r1=360107&r2=360108&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Fri Dec 30 07:36:19 2005
@@ -41,11 +41,17 @@
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
         }
+        if (deadLetterStrategy != null) {
+            queue.setDeadLetterStrategy(deadLetterStrategy);
+        }
     }
 
     public void configure(Topic topic) {
         if (dispatchPolicy != null) {
             topic.setDispatchPolicy(dispatchPolicy);
+        }
+        if (deadLetterStrategy != null) {
+            topic.setDeadLetterStrategy(deadLetterStrategy);
         }
         if (subscriptionRecoveryPolicy != null) {
             topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java?rev=360108&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
Fri Dec 30 07:36:19 2005
@@ -0,0 +1,91 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.RedeliveryPolicy;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class DeadLetterTest extends DeadLetterTestSupport {
+
+    private int rollbackCount;
+
+    protected void doTest() throws Exception {
+        connection.start();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        System.out.println("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        makeDlqConsumer();
+
+        sendMessages();
+
+        // now lets receive and rollback N times
+        for (int i = 0; i < messageCount; i++) {
+            consumeAndRollback(i);
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            Message msg = dlqConsumer.receive(1000);
+            assertMessage(msg, i);
+            assertNotNull("Should be a DLQ message for loop: " + i, msg);
+        }
+    }
+
+    protected void consumeAndRollback(int messageCounter) throws Exception {
+        for (int i = 0; i < rollbackCount; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull("No message received for message: " + messageCounter + " and rollback
loop: " + i, message);
+            assertMessage(message, messageCounter);
+
+            session.rollback();
+        }
+        System.out.println("Rolled back: " + rollbackCount + " times");
+    }
+
+    protected void setUp() throws Exception {
+        transactedMode = true;
+        super.setUp();
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory answer = super.createConnectionFactory();
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setMaximumRedeliveries(3);
+        policy.setBackOffMultiplier((short) 1);
+        policy.setInitialRedeliveryDelay(10);
+        policy.setUseExponentialBackOff(false);
+        answer.setRedeliveryPolicy(policy);
+        return answer;
+    }
+
+    protected Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ");
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java?rev=360108&r1=360107&r2=360108&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
Fri Dec 30 07:36:19 2005
@@ -18,9 +18,6 @@
 
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -30,6 +27,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 /**
@@ -49,6 +47,8 @@
     protected Destination dlqDestination;
     protected MessageConsumer dlqConsumer;
     protected BrokerService broker;
+    protected boolean transactedMode = false;
+    protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
 
     protected void setUp() throws Exception {
         super.setUp();
@@ -57,7 +57,7 @@
         connection = createConnection();
         connection.setClientID(toString());
 
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session = connection.createSession(transactedMode, acknowledgeMode);
         connection.start();
     }
 
@@ -80,6 +80,7 @@
 
     protected void makeConsumer() throws JMSException {
         Destination destination = getDestination();
+        System.out.println("Consuming from: " + destination);
         if (durableSubscriber) {
             consumer = session.createDurableSubscriber((Topic) destination, destination.toString());
         }
@@ -96,17 +97,34 @@
     }
 
     protected void sendMessages() throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         producer = session.createProducer(getDestination());
         producer.setDeliveryMode(deliveryMode);
         producer.setTimeToLive(timeToLive);
 
         System.out.println("Sending " + messageCount + " messages to: " + getDestination());
         for (int i = 0; i < messageCount; i++) {
-            Message message = session.createTextMessage("msg: " + i);
+            Message message = createMessage(session, i);
             producer.send(message);
         }
     }
 
+    protected TextMessage createMessage(Session session, int i) throws JMSException {
+        return session.createTextMessage(getMessageText(i));
+    }
+
+    protected String getMessageText(int i) {
+        return "message: " + i;
+    }
+
+    protected void assertMessage(Message message, int i) throws Exception {
+        System.out.println("Received message: " + message);
+        assertNotNull("No message received for index: " + i, message);
+        assertTrue("Should be a TextMessage not: " + message, message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) message;
+        assertEquals("text of message: " + i, getMessageText(i), textMessage .getText());
+    }
+    
     protected abstract Destination createDlqDestination();
 
     public void testTransientTopicMessage() throws Exception {
@@ -143,5 +161,4 @@
         }
         return destination;
     }
-
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java?rev=360108&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
Fri Dec 30 07:36:19 2005
@@ -0,0 +1,52 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import javax.jms.Destination;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class IndividualDeadLetterTest extends DeadLetterTest {
+
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setDeadLetterStrategy(new IndividualDeadLetterStrategy());
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    protected Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ.Queue." + getClass().getName() + "." + getName());
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message