activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQCPP-552
Date Fri, 22 Aug 2014 20:48:42 GMT
Repository: activemq-cpp
Updated Branches:
  refs/heads/3.8.x f59a39523 -> b7542f28c


https://issues.apache.org/jira/browse/AMQCPP-552

port some tests from ActiveMQ to show this issue, plus it shows a few
other problems with redelivery.


Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/b7542f28
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/b7542f28
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/b7542f28

Branch: refs/heads/3.8.x
Commit: b7542f28c3597e6252ca0d640a35da73d37d45c2
Parents: f59a395
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Aug 22 16:30:59 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Aug 22 16:31:47 2014 -0400

----------------------------------------------------------------------
 .../core/policies/DefaultRedeliveryPolicy.cpp   |   8 +-
 activemq-cpp/src/test-integration/Makefile.am   |   2 +
 .../src/test-integration/TestRegistry.cpp       |   4 +-
 .../openwire/OpenWireRedeliveryPolicyTest.cpp   | 777 +++++++++++++++++++
 .../openwire/OpenWireRedeliveryPolicyTest.h     |  73 ++
 .../OpenwireNonBlockingRedeliveryTest.h         |   3 +-
 6 files changed, 859 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7542f28/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
index b00735c..a1ef686 100644
--- a/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
+++ b/activemq-cpp/src/main/activemq/core/policies/DefaultRedeliveryPolicy.cpp
@@ -56,14 +56,10 @@ long long DefaultRedeliveryPolicy::getNextRedeliveryDelay(long long previousDela
 
     static Random randomNumberGenerator;
 
-    long long nextDelay;
+    long long nextDelay = redeliveryDelay;
 
-    if (previousDelay == 0) {
-        nextDelay = redeliveryDelay;
-    } else if (useExponentialBackOff && (int) backOffMultiplier > 1) {
+    if (previousDelay > 0 && useExponentialBackOff && (int) backOffMultiplier
> 1) {
         nextDelay = (long long) ((double) previousDelay * backOffMultiplier);
-    } else {
-        nextDelay = previousDelay;
     }
 
     if (useCollisionAvoidance) {

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7542f28/activemq-cpp/src/test-integration/Makefile.am
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/Makefile.am b/activemq-cpp/src/test-integration/Makefile.am
index a6ebf0d..273c4d4 100644
--- a/activemq-cpp/src/test-integration/Makefile.am
+++ b/activemq-cpp/src/test-integration/Makefile.am
@@ -35,6 +35,7 @@ cc_sources = \
     activemq/test/TransactionTest.cpp \
     activemq/test/VirtualTopicTest.cpp \
     activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp \
+    activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp \
     activemq/test/openwire/OpenwireAdvisorysTest.cpp \
     activemq/test/openwire/OpenwireAsyncSenderTest.cpp \
     activemq/test/openwire/OpenwireClientAckTest.cpp \
@@ -97,6 +98,7 @@ h_sources = \
     activemq/test/TransactionTest.h \
     activemq/test/VirtualTopicTest.h \
     activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h \
+    activemq/test/openwire/OpenWireRedeliveryPolicyTest.h \
     activemq/test/openwire/OpenwireAdvisorysTest.h \
     activemq/test/openwire/OpenwireAsyncSenderTest.h \
     activemq/test/openwire/OpenwireClientAckTest.h \

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7542f28/activemq-cpp/src/test-integration/TestRegistry.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp
index e7292f5..3efcd49 100644
--- a/activemq-cpp/src/test-integration/TestRegistry.cpp
+++ b/activemq-cpp/src/test-integration/TestRegistry.cpp
@@ -33,6 +33,7 @@
 #include "activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h"
 #include "activemq/test/openwire/OpenwireOptimizedAckTest.h"
 #include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
+#include "activemq/test/openwire/OpenWireRedeliveryPolicyTest.h"
 #include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
 #include "activemq/test/openwire/OpenwireSimpleTest.h"
 #include "activemq/test/openwire/OpenwireTransactionTest.h"
@@ -70,7 +71,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriori
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest
);
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );
-CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireRedeliveryPolicyTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTransactionTest );

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7542f28/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
new file mode 100644
index 0000000..79b1af6
--- /dev/null
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireRedeliveryPolicyTest.h"
+
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageConsumer.h>
+
+#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/commands/ActiveMQTextMessage.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/lang/Long.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+using namespace cms;
+using namespace activemq;
+using namespace activemq::commands;
+using namespace activemq::core;
+using namespace activemq::core::policies;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireRedeliveryPolicyTest::OpenWireRedeliveryPolicyTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireRedeliveryPolicyTest::~OpenWireRedeliveryPolicyTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string OpenWireRedeliveryPolicyTest::getBrokerURL() const {
+    return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testGetNext() {
+
+    DefaultRedeliveryPolicy policy;
+    policy.setInitialRedeliveryDelay(0);
+    policy.setRedeliveryDelay(500);
+    policy.setBackOffMultiplier((short) 2);
+    policy.setUseExponentialBackOff(true);
+
+    long delay = policy.getNextRedeliveryDelay(0);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 1", 500L, delay);
+    delay = policy.getNextRedeliveryDelay(delay);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 2", 500L*2L, delay);
+    delay = policy.getNextRedeliveryDelay(delay);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 3", 500L*4L, delay);
+
+    policy.setUseExponentialBackOff(false);
+    delay = policy.getNextRedeliveryDelay(delay);
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Incorrect delay for cycle 4", 500L, delay);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
{
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(0);
+    policy->setRedeliveryDelay(500);
+    policy->setBackOffMultiplier((short) 2);
+    policy->setUseExponentialBackOff(true);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(1000));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT(textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // No delay on first rollback..
+    received.reset(consumer->receive(250));
+    CPPUNIT_ASSERT(received != NULL);
+    session->rollback();
+
+    // Show subsequent re-delivery delay is incrementing.
+    received.reset(consumer->receive(250));
+    CPPUNIT_ASSERT(received == NULL);
+
+    received.reset(consumer->receive(750));
+    CPPUNIT_ASSERT(received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // Show re-delivery delay is incrementing exponentially
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT(received == NULL);
+    received.reset(consumer->receive(500));
+    CPPUNIT_ASSERT(received == NULL);
+    received.reset(consumer->receive(800));
+    CPPUNIT_ASSERT(received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testNornalRedeliveryPolicyDelaysDeliveryOnRollback() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(0);
+    policy->setRedeliveryDelay(500);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(1000));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT(textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // No delay on first rollback..
+    received.reset(consumer->receive(250));
+    CPPUNIT_ASSERT(received != NULL);
+    session->rollback();
+
+    // Show subsequent re-delivery delay is incrementing.
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT(received == NULL);
+    received.reset(consumer->receive(700));
+    CPPUNIT_ASSERT(received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // The message gets redelivered after 500 ms every time since
+    // we are not using exponential backoff.
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT(received == NULL);
+    received.reset(consumer->receive(700));
+    CPPUNIT_ASSERT(received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testDLQHandling() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(100);
+    policy->setUseExponentialBackOff(false);
+    policy->setMaximumRedeliveries(2);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+    Pointer<Queue> dlq(session->createQueue("ActiveMQ.DLQ"));
+    Pointer<MessageConsumer> dlqConsumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(1000));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get second delivery", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(2000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get third delivery", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // The last rollback should cause the 1st message to get sent to the DLQ
+    received.reset(consumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery of msg 2", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
+    session->commit();
+
+    // We should be able to get the message off the DLQ now.
+    received.reset(dlqConsumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get DLQ'd message", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->commit();
+
+    if (textMessage->propertyExists("dlqDeliveryFailureCause")) {
+        std::string cause = textMessage->getStringProperty("dlqDeliveryFailureCause");
+        CPPUNIT_ASSERT_MESSAGE("cause exception has no policy ref",
+                               cause.find("RedeliveryPolicy") != std::string::npos);
+    }
+    session->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testInfiniteMaximumNumberOfRedeliveries() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(100);
+    policy->setUseExponentialBackOff(false);
+    // let's set the maximum redeliveries to no maximum (ie. infinite)
+    policy->setMaximumRedeliveries(-1);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(1000));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // we should be able to get the 1st message redelivered until a session.commit is called
+    received.reset(consumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get second delivery", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(2000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get third delivery", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(2000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get fourth delivery", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(2000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get fifth delivery", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(2000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get sixth delivery", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->commit();
+
+    received.reset(consumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message two", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
+    session->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testMaximumRedeliveryDelay() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(10);
+    policy->setUseExponentialBackOff(true);
+    policy->setMaximumRedeliveries(-1);
+    policy->setRedeliveryDelay(50);
+    // TODO - policy->setMaximumRedeliveryDelay(1000);
+    policy->setBackOffMultiplier((short) 2);
+    policy->setUseExponentialBackOff(true);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received;
+
+    for(int i = 0; i < 10; ++i) {
+        // we should be able to get the 1st message redelivered until a session.commit is
called
+        received.reset(consumer->receive(2000));
+        Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+        CPPUNIT_ASSERT_MESSAGE("Failed to get message", textMessage != NULL);
+        CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+        session->rollback();
+    }
+
+    received.reset(consumer->receive(2000));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message one last time", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->commit();
+
+    received.reset(consumer->receive(2000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message two", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
+    session->commit();
+
+    CPPUNIT_ASSERT_MESSAGE("Max delay should be 1 second.",
+                           policy->getNextRedeliveryDelay(Long::MAX_VALUE) == 1000);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testZeroMaximumNumberOfRedeliveries() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(100);
+    policy->setUseExponentialBackOff(false);
+    // let's set the maximum redeliveries to 0
+    policy->setMaximumRedeliveries(0);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(1000));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // the 1st  message should not be redelivered since maximumRedeliveries is set to 0
+    received.reset(consumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message two", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
+    session->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryReceiveNoCommit() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    connection->start();
+    Pointer<Session> dlqSession(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    Pointer<Queue> destination(dlqSession->createQueue("testRepeatedRedeliveryReceiveNoCommit"));
+    Pointer<Queue> dlq(dlqSession->createQueue("ActiveMQ.DLQ"));
+    amqConnection->destroyDestination(destination.get());
+    amqConnection->destroyDestination(dlq.get());
+    Pointer<MessageProducer> producer(dlqSession->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(dlqSession->createConsumer(dlq.get()));
+
+    Pointer<TextMessage> message1(dlqSession->createTextMessage("1st"));
+    producer->send(message1.get());
+
+    const int MAX_REDELIVERIES = 4;
+    for (int i = 0; i <= MAX_REDELIVERIES + 1; i++) {
+
+        Pointer<Connection> loopConnection(connectionFactory->createConnection());
+        Pointer<ActiveMQConnection> amqConnection = loopConnection.dynamicCast<ActiveMQConnection>();
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+        policy->setInitialRedeliveryDelay(0);
+        policy->setUseExponentialBackOff(false);
+        policy->setMaximumRedeliveries(MAX_REDELIVERIES);
+
+        loopConnection->start();
+        Pointer<Session> session(loopConnection->createSession(Session::SESSION_TRANSACTED));
+        Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+        Pointer<cms::Message> received(consumer->receive(1000));
+        Pointer<ActiveMQTextMessage> textMessage = received.dynamicCast<ActiveMQTextMessage>();
+
+        if (i <= MAX_REDELIVERIES) {
+            CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+            CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+            CPPUNIT_ASSERT_EQUAL(i, textMessage->getRedeliveryCounter());
+        } else {
+            CPPUNIT_ASSERT_MESSAGE("null on exceeding redelivery count", textMessage == NULL);
+        }
+        loopConnection->close();
+    }
+
+    // We should be able to get the message off the DLQ now.
+    Pointer<cms::Message> received(consumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get from DLQ", received != NULL);
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+
+    if (textMessage->propertyExists("dlqDeliveryFailureCause")) {
+        std::string cause = textMessage->getStringProperty("dlqDeliveryFailureCause");
+        CPPUNIT_ASSERT_MESSAGE("cause exception has no policy ref",
+                               cause.find("RedeliveryPolicy") != std::string::npos);
+    } else {
+        //CPPUNIT_FAIL("Message did not have a rollback cause");
+    }
+
+    dlqSession->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class AsyncListener : public cms::MessageListener {
+    private:
+
+        AtomicInteger* receivedCount;
+        CountDownLatch* done;
+
+    public:
+
+        AsyncListener(AtomicInteger* receivedCount, CountDownLatch* done) {
+            this->receivedCount = receivedCount;
+            this->done = done;
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            try {
+                const ActiveMQTextMessage* textMessage = dynamic_cast<const ActiveMQTextMessage*>(message);
+                CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+                CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+                CPPUNIT_ASSERT_EQUAL(receivedCount->get(), textMessage->getRedeliveryCounter());
+                receivedCount->incrementAndGet();
+                done->countDown();
+            } catch (Exception& ignored) {
+                ignored.printStackTrace();
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testRepeatedRedeliveryOnMessageNoCommit() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    connection->start();
+    Pointer<Session> dlqSession(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    Pointer<Queue> destination(dlqSession->createQueue("testRepeatedRedeliveryOnMessageNoCommit"));
+    Pointer<Queue> dlq(dlqSession->createQueue("ActiveMQ.DLQ"));
+    amqConnection->destroyDestination(destination.get());
+    amqConnection->destroyDestination(dlq.get());
+    Pointer<MessageProducer> producer(dlqSession->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(dlqSession->createConsumer(dlq.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(dlqSession->createTextMessage("1st"));
+    producer->send(message1.get());
+
+    const int MAX_REDELIVERIES = 4;
+    AtomicInteger receivedCount(0);
+
+    for (int i = 0; i <= MAX_REDELIVERIES + 1; i++) {
+
+        Pointer<Connection> loopConnection(connectionFactory->createConnection());
+        Pointer<ActiveMQConnection> amqConnection = loopConnection.dynamicCast<ActiveMQConnection>();
+
+        // Receive a message with the JMS API
+        RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+        policy->setInitialRedeliveryDelay(0);
+        policy->setUseExponentialBackOff(false);
+        policy->setMaximumRedeliveries(MAX_REDELIVERIES);
+
+        loopConnection->start();
+        Pointer<Session> session(loopConnection->createSession(Session::SESSION_TRANSACTED));
+        Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+        CountDownLatch done(1);
+
+        AsyncListener listener(&receivedCount, &done);
+        consumer->setMessageListener(&listener);
+
+        if (i <= MAX_REDELIVERIES) {
+            CPPUNIT_ASSERT_MESSAGE("listener didn't get a message", done.await(5, TimeUnit::SECONDS));
+        } else {
+            // final redlivery gets poisoned before dispatch
+            CPPUNIT_ASSERT_MESSAGE("listener got unexpected message", !done.await(2, TimeUnit::SECONDS));
+        }
+
+        loopConnection->close();
+    }
+
+    // We should be able to get the message off the DLQ now.
+    Pointer<cms::Message> received(consumer->receive(1000));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get from DLQ", received != NULL);
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+
+    if (textMessage->propertyExists("dlqDeliveryFailureCause")) {
+        std::string cause = textMessage->getStringProperty("dlqDeliveryFailureCause");
+        CPPUNIT_ASSERT_MESSAGE("cause exception has no policy ref",
+                               cause.find("RedeliveryPolicy") != std::string::npos);
+    } else {
+        //CPPUNIT_FAIL("Message did not have a rollback cause");
+    }
+
+    dlqSession->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testInitialRedeliveryDelayZero() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(0);
+    policy->setUseExponentialBackOff(false);
+    policy->setMaximumRedeliveries(1);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(100));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    // Both should be able for consumption.
+    received.reset(consumer->receive(100));
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message one again", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message two", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
+    session->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testInitialRedeliveryDelayOne() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(1000);
+    policy->setUseExponentialBackOff(false);
+    policy->setMaximumRedeliveries(1);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(100));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT(received == NULL);
+
+    received.reset(consumer->receive(2000));
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message one again", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message two", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
+    session->commit();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireRedeliveryPolicyTest::testRedeliveryDelayOne() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
+
+    // Receive a message with the JMS API
+    RedeliveryPolicy* policy = amqConnection->getRedeliveryPolicy();
+    policy->setInitialRedeliveryDelay(0);
+    policy->setRedeliveryDelay(1000);
+    policy->setUseExponentialBackOff(false);
+    policy->setMaximumRedeliveries(2);
+
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Queue> destination(session->createTemporaryQueue());
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Send the messages
+    Pointer<TextMessage> message1(session->createTextMessage("1st"));
+    Pointer<TextMessage> message2(session->createTextMessage("2nd"));
+
+    producer->send(message1.get());
+    producer->send(message2.get());
+    session->commit();
+
+    Pointer<cms::Message> received(consumer->receive(100));
+    Pointer<TextMessage> textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get first delivery", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(100));
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("first redelivery was not immediate.", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+    session->rollback();
+
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT_MESSAGE("seconds redelivery should be delayed.", received == NULL);
+
+    received.reset(consumer->receive(2000));
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message one again", textMessage != NULL);
+    CPPUNIT_ASSERT_EQUAL(std::string("1st"), textMessage->getText());
+
+    received.reset(consumer->receive(100));
+    CPPUNIT_ASSERT_MESSAGE("Failed to get message two", received != NULL);
+    textMessage = received.dynamicCast<TextMessage>();
+    CPPUNIT_ASSERT_EQUAL(std::string("2nd"), textMessage->getText());
+    session->commit();
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7542f28/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
new file mode 100644
index 0000000..207adc8
--- /dev/null
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireRedeliveryPolicyTest.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREREDELIVERYPOLICYTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREREDELIVERYPOLICYTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    class OpenWireRedeliveryPolicyTest : public CMSTestFixture {
+
+        CPPUNIT_TEST_SUITE( OpenWireRedeliveryPolicyTest );
+        CPPUNIT_TEST( testGetNext );
+        CPPUNIT_TEST( testExponentialRedeliveryPolicyDelaysDeliveryOnRollback );
+        CPPUNIT_TEST( testNornalRedeliveryPolicyDelaysDeliveryOnRollback );
+        // TODO CPPUNIT_TEST( testDLQHandling );
+        CPPUNIT_TEST( testInfiniteMaximumNumberOfRedeliveries );
+        CPPUNIT_TEST( testZeroMaximumNumberOfRedeliveries );
+        // TODO CPPUNIT_TEST( testRepeatedRedeliveryReceiveNoCommit );
+        // TODO CPPUNIT_TEST( testRepeatedRedeliveryOnMessageNoCommit );
+        CPPUNIT_TEST( testInitialRedeliveryDelayZero );
+        CPPUNIT_TEST( testInitialRedeliveryDelayOne );
+        CPPUNIT_TEST( testRedeliveryDelayOne );
+        // TODO - We don't currently support this property.
+        // CPPUNIT_TEST( testMaximumRedeliveryDelay );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenWireRedeliveryPolicyTest();
+        virtual ~OpenWireRedeliveryPolicyTest();
+
+        virtual void setUp() {}
+        virtual void tearDown() {}
+
+        virtual std::string getBrokerURL() const;
+
+        void testGetNext();
+        void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback();
+        void testNornalRedeliveryPolicyDelaysDeliveryOnRollback();
+        void testDLQHandling();
+        void testInfiniteMaximumNumberOfRedeliveries();
+        void testMaximumRedeliveryDelay();
+        void testZeroMaximumNumberOfRedeliveries();
+        void testRepeatedRedeliveryReceiveNoCommit();
+        void testRepeatedRedeliveryOnMessageNoCommit();
+        void testInitialRedeliveryDelayZero();
+        void testInitialRedeliveryDelayOne();
+        void testRedeliveryDelayOne();
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREREDELIVERYPOLICYTEST_H_ */

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b7542f28/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
index 542094c..3f3cb3e 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
@@ -27,13 +27,14 @@ namespace openwire {
     class OpenwireNonBlockingRedeliveryTest : public MessagePriorityTest {
 
         CPPUNIT_TEST_SUITE( OpenwireNonBlockingRedeliveryTest );
+        // TODO - Improve the tests.
 //        CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
 //        CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
 //        CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
 //        CPPUNIT_TEST( testMessageDeleiveryDoesntStop );
 //        CPPUNIT_TEST( testNonBlockingMessageDeleiveryIsDelayed );
 //        CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithRollbacks );
-        CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithAllRolledBack );
+//        CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithAllRolledBack );
         CPPUNIT_TEST_SUITE_END();
 
     public:


Mime
View raw message