activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r646340 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
Date Wed, 09 Apr 2008 13:38:49 GMT
Author: rajdavies
Date: Wed Apr  9 06:38:47 2008
New Revision: 646340

URL: http://svn.apache.org/viewvc?rev=646340&view=rev
Log:
Applied patch from https://issues.apache.org/activemq/browse/AMQ-1640

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
  (with props)

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java?rev=646340&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java Wed Apr  9 06:38:47 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class MessageListenerDeadLetterTest extends DeadLetterTestSupport {
+    private static final Log LOG = LogFactory
+            .getLog(MessageListenerDeadLetterTest.class);
+
+    private int rollbackCount;
+
+    private Session dlqSession;
+
+    private final Error[] error = new Error[1];
+
+    protected void doTest() throws Exception {
+        messageCount = 200;
+        connection.start();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        makeDlqConsumer();
+
+        sendMessages();
+
+        // now lets receive and rollback N times
+        int maxRollbacks = messageCount * rollbackCount;
+        consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, rollbackCount));
+
+        for (int i = 0; i < messageCount; i++) {
+            Message msg = dlqConsumer.receive(4000);
+            if (error[0] != null) {
+                // error from message listener
+                throw error[0];
+            }
+            assertMessage(msg, i);
+            assertNotNull("Should be a DLQ message for loop: " + i, msg);
+        }
+        if (error[0] != null) {
+            throw error[0];
+        }
+    }
+
+    protected void makeDlqConsumer() throws JMSException {
+        dlqDestination = createDlqDestination();
+
+        LOG.info("Consuming from dead letter on: " + dlqDestination);
+        dlqConsumer = dlqSession.createConsumer(dlqDestination);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        transactedMode = true;
+        super.setUp();
+        dlqSession = connection.createSession(transactedMode, acknowledgeMode);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        dlqConsumer.close();
+        dlqSession.close();
+        session.close();
+        super.tearDown();
+    };
+
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory answer = super.createConnectionFactory();
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setMaximumRedeliveries(3);
+        policy.setBackOffMultiplier((short) 1);
+        policy.setInitialRedeliveryDelay(0);
+        policy.setUseExponentialBackOff(false);
+        answer.setRedeliveryPolicy(policy);
+        return answer;
+    }
+
+    protected Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ");
+    }
+
+    class RollbackMessageListener implements MessageListener {
+
+        final int maxRollbacks;
+
+        final int deliveryCount;
+
+        AtomicInteger rollbacks = new AtomicInteger();
+
+        RollbackMessageListener(int c, int delvery) {
+            maxRollbacks = c;
+            deliveryCount = delvery;
+        }
+
+        public void onMessage(Message message) {
+            try {
+                int expectedMessageId = rollbacks.get() / deliveryCount;
+                LOG.info("expecting messageId: " + expectedMessageId);
+                assertMessage(message, expectedMessageId);
+                if (rollbacks.incrementAndGet() > maxRollbacks) {
+                    fail("received too many messages, already done too many rollbacks: "
+                            + rollbacks);
+                }
+                session.rollback();
+
+            } catch (Throwable e) {
+                LOG.error("unexpected exception:" + e, e);
+                // propagating assertError to execution task will cause a hang
+                // at shutdown
+                if (e instanceof Error) {
+                    error[0] = (Error) e;
+                } else {
+                    fail("unexpected exception: " + e);
+                }
+
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message