activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-589 https://issues.apache.org/jira/browse/AMQCPP-588
Date Mon, 30 Nov 2015 19:14:03 GMT
Repository: activemq-cpp
Updated Branches:
  refs/heads/3.9.x c9b9ef120 -> f0ecb22d8


https://issues.apache.org/jira/browse/AMQCPP-589
https://issues.apache.org/jira/browse/AMQCPP-588

Fix NPE error.
Update onMessage processing to always incluide the error cause exception
so that the poison cause in DLQ contains the original error message.
(cherry picked from commit ce523a140112d4d654e8ae660893e09bf9ffd7c3)


Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/f0ecb22d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/f0ecb22d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/f0ecb22d

Branch: refs/heads/3.9.x
Commit: f0ecb22d8972f4849819a588e2a707192b39e8eb
Parents: c9b9ef1
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Nov 23 12:15:55 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Nov 30 14:09:02 2015 -0500

----------------------------------------------------------------------
 .../core/kernels/ActiveMQConsumerKernel.cpp     |  32 +-
 activemq-cpp/src/test-integration/Makefile.am   |   2 +
 .../src/test-integration/TestRegistry.cpp       |   2 +
 .../OpenWireMessageListenerRedeliveryTest.cpp   | 436 +++++++++++++++++++
 .../OpenWireMessageListenerRedeliveryTest.h     |  56 +++
 5 files changed, 525 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f0ecb22d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index 25a08c5..c54cac0 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -482,6 +482,24 @@ namespace {
     };
 
     /**
+     * ActiveMQAckHandler used to support Managed Acknowledge modes.
+     */
+    class NoOpAckHandler : public ActiveMQAckHandler {
+    private:
+
+        NoOpAckHandler(const NoOpAckHandler&);
+        NoOpAckHandler& operator=(const NoOpAckHandler&);
+
+    public:
+
+        NoOpAckHandler() {
+        }
+
+        void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED) {
+        }
+    };
+
+    /**
      * ActiveMQAckHandler used to support Client Acknowledge mode.
      */
     class ClientAckHandler : public ActiveMQAckHandler {
@@ -1520,9 +1538,14 @@ void ActiveMQConsumerKernel::rollback() {
                 Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
                                         this->internal->deliveredMessages.size()));
                 ack->setFirstMessageId(firstMsgId);
+
                 std::string message = "Exceeded RedeliveryPolicy max redelivery limit: "
+
-                                       Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries())
+
-                                       " cause: " + lastMsg->getRollbackCause().getMessage();
+                                       Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
+                if (!lastMsg->getRollbackCause().getMessage().empty()) {
+                    message.append(" cause: Exception -> ");
+                    message.append(lastMsg->getRollbackCause().getMessage());
+                }
+
                 ack->setPoisonCause(internal->createBrokerError(message));
                 session->sendAck(ack, true);
                 // Adjust the window size.
@@ -1622,9 +1645,9 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>&
dispatch)
                                 }
                                 afterMessageIsConsumed(dispatch, expired);
                             } catch (RuntimeException& e) {
+                                dispatch->setRollbackCause(e);
                                 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() ||
session->isIndividualAcknowledge()) {
                                     // Schedule redelivery and possible DLQ processing
-                                    dispatch->setRollbackCause(e);
                                     rollback();
                                 } else {
                                     // Transacted or Client ack: Deliver the next message.
@@ -1711,6 +1734,9 @@ Pointer<cms::Message> ActiveMQConsumerKernel::createCMSMessage(Pointer<MessageDi
         } else if (session->isIndividualAcknowledge()) {
             Pointer<ActiveMQAckHandler> ackHandler(new IndividualAckHandler(this, dispatch));
             message->setAckHandler(ackHandler);
+        } else {
+            Pointer<ActiveMQAckHandler> ackHandler(new NoOpAckHandler());
+            message->setAckHandler(ackHandler);
         }
 
         return message.dynamicCast<cms::Message>();

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f0ecb22d/activemq-cpp/src/test-integration/Makefile.am
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/Makefile.am b/activemq-cpp/src/test-integration/Makefile.am
index 273c4d4..38e31bf 100644
--- a/activemq-cpp/src/test-integration/Makefile.am
+++ b/activemq-cpp/src/test-integration/Makefile.am
@@ -35,6 +35,7 @@ cc_sources = \
     activemq/test/TransactionTest.cpp \
     activemq/test/VirtualTopicTest.cpp \
     activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp \
+    activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp \
     activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp \
     activemq/test/openwire/OpenwireAdvisorysTest.cpp \
     activemq/test/openwire/OpenwireAsyncSenderTest.cpp \
@@ -98,6 +99,7 @@ h_sources = \
     activemq/test/TransactionTest.h \
     activemq/test/VirtualTopicTest.h \
     activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h \
+    activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h \
     activemq/test/openwire/OpenWireRedeliveryPolicyTest.h \
     activemq/test/openwire/OpenwireAdvisorysTest.h \
     activemq/test/openwire/OpenwireAsyncSenderTest.h \

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f0ecb22d/activemq-cpp/src/test-integration/TestRegistry.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp
index deaf816..5ac2c56 100644
--- a/activemq-cpp/src/test-integration/TestRegistry.cpp
+++ b/activemq-cpp/src/test-integration/TestRegistry.cpp
@@ -28,6 +28,7 @@
 #include "activemq/test/openwire/OpenwireJmsMessageGroupsTest.h"
 #include "activemq/test/openwire/OpenwireJmsRecoverTest.h"
 #include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
+#include "activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h"
 #include "activemq/test/openwire/OpenwireMessagePriorityTest.h"
 #include "activemq/test/openwire/OpenwireMapMessageTest.h"
 #include "activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h"
@@ -68,6 +69,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsMessageGro
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsRecoverTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessageCompressionTest
);
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireMessageListenerRedeliveryTest
);
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriorityTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest
);
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f0ecb22d/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp
b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp
new file mode 100644
index 0000000..8a77c48
--- /dev/null
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireMessageListenerRedeliveryTest.h"
+
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+
+#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
+
+#include <decaf/lang/Pointer.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/ArrayList.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+#include <memory>
+
+using namespace cms;
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    static std::string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
+
+    cms::Connection* createConnection(const std::string& brokerUri) {
+
+        ActiveMQConnectionFactory factory(brokerUri);
+
+        factory.getRedeliveryPolicy()->setInitialRedeliveryDelay(0);
+        factory.getRedeliveryPolicy()->setRedeliveryDelay(1000);
+        factory.getRedeliveryPolicy()->setMaximumRedeliveries(3);
+        factory.getRedeliveryPolicy()->setBackOffMultiplier((short) 2);
+        factory.getRedeliveryPolicy()->setUseExponentialBackOff(true);
+
+        return factory.createConnection();
+    }
+
+    class TestMessageListener : public cms::MessageListener {
+    private:
+
+        int counter;
+        cms::Session* session;
+
+    public:
+
+        TestMessageListener(cms::Session* session) : counter(0), session(session) {}
+
+        int getCounter() {
+            return counter;
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+
+            try {
+                counter++;
+
+                if (counter <= 4) {
+                    session->rollback();
+                } else {
+                    message->acknowledge();
+                    session->commit();
+                }
+            } catch (CMSException& e) {
+            }
+        }
+    };
+
+    class ExceptionMessageListener : public cms::MessageListener {
+    private:
+
+        CountDownLatch doneLatch;
+        ArrayList<std::string> received;
+        std::string testName;
+        int maxDeliveries;
+        int count;
+
+    public:
+
+        ExceptionMessageListener(const std::string& testName, int messageCount, int maxDeliveries)
:
+            doneLatch(messageCount),
+            received(),
+            testName(testName),
+            maxDeliveries(maxDeliveries),
+            count(0) {
+        }
+
+        int getCount() {
+            return count;
+        }
+
+        ArrayList<std::string>& getReceived() {
+            return received;
+        }
+
+        bool await(long long timeout, const TimeUnit& unit) {
+            return doneLatch.await(timeout, unit);
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            try {
+                const TextMessage* textMessage = dynamic_cast<const cms::TextMessage*>(message);
+                received.add(textMessage->getText());
+            } catch (cms::CMSException& e) {
+                e.printStackTrace();
+            }
+
+            if (++count < maxDeliveries) {
+                throw decaf::lang::exceptions::RuntimeException(
+                    __FILE__, __LINE__, testName.append(" forced a redelivery").c_str());
+            }
+
+            // new blood
+            count = 0;
+            doneLatch.countDown();
+        }
+    };
+
+    class TrackingMessageListener : public cms::MessageListener {
+    private:
+
+        CountDownLatch doneLatch;
+        ArrayList< Pointer<cms::Message> > received;
+        std::string testName;
+        int count;
+
+    public:
+
+        TrackingMessageListener(const std::string& testName) :
+            doneLatch(1),
+            received(),
+            testName(testName),
+            count(0) {
+        }
+
+        TrackingMessageListener(const std::string& testName, int expected) :
+            doneLatch(expected),
+            received(),
+            testName(testName),
+            count(0) {
+        }
+
+        int getCount() {
+            return received.size();
+        }
+
+        ArrayList< Pointer<cms::Message> >& getReceived() {
+            return received;
+        }
+
+        bool await(long long timeout, const TimeUnit& unit) {
+            return doneLatch.await(timeout, unit);
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            try {
+                Pointer<cms::Message> copy(message->clone());
+                received.add(copy);
+                doneLatch.countDown();
+            } catch (cms::CMSException& e) {
+                e.printStackTrace();
+            }
+        }
+    };
+
+    class FailingMessageListener : public cms::MessageListener {
+    private:
+
+        CountDownLatch doneLatch;
+        cms::Session* session;
+        std::string testName;
+
+    public:
+
+        FailingMessageListener(cms::Session* session, const std::string& testName, int
expected) :
+            doneLatch(expected),
+            session(session),
+            testName(testName) {
+        }
+
+        bool await(long long timeout, const TimeUnit& unit) {
+            return doneLatch.await(timeout, unit);
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            try {
+                doneLatch.countDown();
+                if (session->isTransacted()) {
+                    session->rollback();
+                }
+            } catch (cms::CMSException& e) {
+                e.printStackTrace();
+            }
+
+            throw decaf::lang::exceptions::RuntimeException(
+                __FILE__, __LINE__, (testName + " forced a redelivery").c_str());
+        }
+    };
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireMessageListenerRedeliveryTest::OpenWireMessageListenerRedeliveryTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireMessageListenerRedeliveryTest::~OpenWireMessageListenerRedeliveryTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireMessageListenerRedeliveryTest::testQueueRollbackConsumerListener() {
+
+    std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
+    connection->start();
+
+    std::auto_ptr<Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+    std::auto_ptr<Queue> queue(session->createQueue("testQueueRollbackConsumerListener"));
+    std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
+    std::auto_ptr<Message> message(session->createTextMessage("Hello"));
+
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
+    amqConnection->destroyDestination(queue.get());
+
+    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
+    producer->send(message.get());
+    session->commit();
+
+    std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+
+    TestMessageListener listener(session.get());
+    consumer->setMessageListener(&listener);
+
+    TimeUnit::MILLISECONDS.sleep(500);
+
+    // first try.. should get 2 since there is no delay on the first redelivery.
+    CPPUNIT_ASSERT_EQUAL(2, listener.getCounter());
+
+    TimeUnit::MILLISECONDS.sleep(1000);
+
+    // 2nd redeliver (redelivery after 1 sec)
+    CPPUNIT_ASSERT_EQUAL(3, listener.getCounter());
+
+    TimeUnit::MILLISECONDS.sleep(2000);
+
+    // 3rd redeliver (redelivery after 2 seconds) - it should give up after that
+    CPPUNIT_ASSERT_EQUAL(4, listener.getCounter());
+
+    // create new message
+    std::auto_ptr<Message> secondMessage(session->createTextMessage("Hello 2"));
+    producer->send(secondMessage.get());
+    session->commit();
+
+    TimeUnit::MILLISECONDS.sleep(500);
+
+    // it should be committed, so no redelivery
+    CPPUNIT_ASSERT_EQUAL(5, listener.getCounter());
+
+    TimeUnit::MILLISECONDS.sleep(1500);
+
+    // no redelivery, counter should still be 4
+    CPPUNIT_ASSERT_EQUAL(5, listener.getCounter());
+
+    connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireMessageListenerRedeliveryTest::testQueueSessionListenerExceptionRetry() {
+
+    std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
+    connection->start();
+
+    std::auto_ptr<Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
+    std::auto_ptr<Queue> queue(session->createQueue("testQueueSessionListenerExceptionRetry"));
+    std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
+    std::auto_ptr<Message> message1(session->createTextMessage("1"));
+    std::auto_ptr<Message> message2(session->createTextMessage("2"));
+
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
+    amqConnection->destroyDestination(queue.get());
+
+    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
+    producer->send(message1.get());
+    producer->send(message2.get());
+
+    std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+
+    int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries();
+
+    ExceptionMessageListener listener("testQueueSessionListenerExceptionRetry", 2, maxDeliveries);
+    consumer->setMessageListener(&listener);
+
+    CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(30, TimeUnit::SECONDS));
+
+    for (int i = 0; i < maxDeliveries; i++) {
+        CPPUNIT_ASSERT_EQUAL_MESSAGE("got first redelivered: " + Integer::toString(i),
+            std::string("1"), listener.getReceived().get(i));
+    }
+
+    for (int i = maxDeliveries; i < maxDeliveries * 2; i++) {
+        CPPUNIT_ASSERT_EQUAL_MESSAGE("got first redelivered: " + Integer::toString(i),
+            std::string("2"), listener.getReceived().get(i));
+    }
+
+    connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireMessageListenerRedeliveryTest::testQueueSessionListenerExceptionDlq() {
+
+    const std::string TEST_NAME = "testQueueSessionListenerExceptionDlq";
+
+    std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
+    connection->start();
+
+    std::auto_ptr<Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
+    std::auto_ptr<Queue> queue(session->createQueue(TEST_NAME));
+    std::auto_ptr<Queue> dlq(session->createQueue("ActiveMQ.DLQ"));
+
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
+    amqConnection->destroyDestination(queue.get());
+    amqConnection->destroyDestination(dlq.get());
+
+    std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
+    std::auto_ptr<Message> message(session->createTextMessage("1"));
+    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
+    producer->send(message.get());
+
+    // Track messages going to DLQ
+    TrackingMessageListener dlqListener(TEST_NAME);
+    std::auto_ptr<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
+    dlqConsumer->setMessageListener(&dlqListener);
+
+    // Receive and throw
+    int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries();
+    FailingMessageListener listener(session.get(), TEST_NAME, maxDeliveries);
+    std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+    consumer->setMessageListener(&listener);
+
+    CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(20, TimeUnit::SECONDS));
+
+    // check DLQ
+    CPPUNIT_ASSERT_MESSAGE("got dlq message", dlqListener.await(30, TimeUnit::SECONDS));
+
+    // check DLQ message cause is captured
+    Pointer<cms::Message> dlqMessage = dlqListener.getReceived().get(0);
+    CPPUNIT_ASSERT_MESSAGE("dlq message captured", dlqMessage != NULL);
+    String cause = dlqMessage->getStringProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+
+    CPPUNIT_ASSERT_MESSAGE("cause 'cause' exception is remembered", cause.contains("Exception"));
+    CPPUNIT_ASSERT_MESSAGE("is correct exception", cause.contains(TEST_NAME));
+    CPPUNIT_ASSERT_MESSAGE("cause exception is remembered", cause.contains("JMSException"));
+    CPPUNIT_ASSERT_MESSAGE("cause policy is remembered", cause.contains("RedeliveryPolicy"));
+
+    connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireMessageListenerRedeliveryTest::testTransactedQueueSessionListenerExceptionDlq()
{
+
+    const std::string TEST_NAME = "testTransactedQueueSessionListenerExceptionDlq";
+
+    std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
+    connection->start();
+
+    std::auto_ptr<Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+    std::auto_ptr<Queue> queue(session->createQueue(TEST_NAME));
+    std::auto_ptr<Queue> dlq(session->createQueue("ActiveMQ.DLQ"));
+
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
+    amqConnection->destroyDestination(queue.get());
+    amqConnection->destroyDestination(dlq.get());
+
+    std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
+    std::auto_ptr<Message> message(session->createTextMessage("1"));
+    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
+    producer->send(message.get());
+    session->commit();
+
+    // Track messages going to DLQ
+    TrackingMessageListener dlqListener(TEST_NAME);
+    std::auto_ptr<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
+    dlqConsumer->setMessageListener(&dlqListener);
+
+    // Receive and throw
+    int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries();
+    FailingMessageListener listener(session.get(), TEST_NAME, maxDeliveries);
+    std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+    consumer->setMessageListener(&listener);
+
+    CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(20, TimeUnit::SECONDS));
+
+    // check DLQ
+    CPPUNIT_ASSERT_MESSAGE("got dlq message", dlqListener.await(30, TimeUnit::SECONDS));
+
+    // check DLQ message cause is captured
+    Pointer<cms::Message> dlqMessage = dlqListener.getReceived().get(0);
+    CPPUNIT_ASSERT_MESSAGE("dlq message captured", dlqMessage != NULL);
+    String cause = dlqMessage->getStringProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+
+    CPPUNIT_ASSERT_MESSAGE("cause 'cause' exception is remembered", cause.contains("Exception"));
+    CPPUNIT_ASSERT_MESSAGE("is correct exception", cause.contains(TEST_NAME));
+    CPPUNIT_ASSERT_MESSAGE("cause exception is remembered", cause.contains("JMSException"));
+    CPPUNIT_ASSERT_MESSAGE("cause policy is remembered", cause.contains("RedeliveryPolicy"));
+
+    connection->close();
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f0ecb22d/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h
b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h
new file mode 100644
index 0000000..52aa83f
--- /dev/null
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGELISTENERREDELIVERYTEST_H_
+#define ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGELISTENERREDELIVERYTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    class OpenWireMessageListenerRedeliveryTest : public CMSTestFixture {
+    private:
+
+        CPPUNIT_TEST_SUITE( OpenWireMessageListenerRedeliveryTest );
+        CPPUNIT_TEST( testQueueRollbackConsumerListener );
+        CPPUNIT_TEST( testQueueSessionListenerExceptionRetry );
+        CPPUNIT_TEST( testQueueSessionListenerExceptionDlq );
+        CPPUNIT_TEST( testTransactedQueueSessionListenerExceptionDlq );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenWireMessageListenerRedeliveryTest();
+        virtual ~OpenWireMessageListenerRedeliveryTest();
+
+        virtual std::string getBrokerURL() const {
+            return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+        }
+
+        void testQueueRollbackConsumerListener();
+        void testQueueSessionListenerExceptionRetry();
+        void testQueueSessionListenerExceptionDlq();
+        void testTransactedQueueSessionListenerExceptionDlq();
+
+    };
+
+}}}
+
+#endif /* ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGELISTENERREDELIVERYTEST_H_ */


Mime
View raw message