activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1461355 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ test/ test/activemq/core/
Date Tue, 26 Mar 2013 22:36:23 GMT
Author: tabish
Date: Tue Mar 26 22:36:22 2013
New Revision: 1461355

URL: http://svn.apache.org/r1461355
Log:
https://issues.apache.org/jira/browse/AMQCPP-367

Adds ConnectionAudit for use in dup detection and some more tests. 

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp  
(with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h   (with
props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
  (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Tue Mar 26 22:36:22 2013
@@ -106,6 +106,7 @@ cc_sources = \
     activemq/core/ActiveMQXAConnectionFactory.cpp \
     activemq/core/ActiveMQXASession.cpp \
     activemq/core/AdvisoryConsumer.cpp \
+    activemq/core/ConnectionAudit.cpp \
     activemq/core/DispatchData.cpp \
     activemq/core/Dispatcher.cpp \
     activemq/core/FifoMessageDispatchChannel.cpp \
@@ -743,6 +744,7 @@ h_sources = \
     activemq/core/ActiveMQXAConnectionFactory.h \
     activemq/core/ActiveMQXASession.h \
     activemq/core/AdvisoryConsumer.h \
+    activemq/core/ConnectionAudit.h \
     activemq/core/DispatchData.h \
     activemq/core/Dispatcher.h \
     activemq/core/FifoMessageDispatchChannel.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Tue Mar 26 22:36:22 2013
@@ -23,6 +23,7 @@
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/core/ActiveMQConnectionMetaData.h>
 #include <activemq/core/AdvisoryConsumer.h>
+#include <activemq/core/ConnectionAudit.h>
 #include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <activemq/core/kernels/ActiveMQProducerKernel.h>
 #include <activemq/core/policies/DefaultPrefetchPolicy.h>
@@ -195,6 +196,8 @@ namespace core{
 
         TempDestinationMap activeTempDestinations;
 
+        ConnectionAudit connectionAudit;
+
         ConnectionConfig(const Pointer<transport::Transport> transport,
                          const Pointer<decaf::util::Properties> properties) :
                              properties(properties),
@@ -454,6 +457,8 @@ ActiveMQConnection::ActiveMQConnection(c
     configuration->connectionInfo->setManageable(true);
     configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
 
+    configuration->connectionAudit.setCheckForDuplicates(transport->isFaultTolerant());
+
     this->config = configuration.release();
 }
 

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp Tue
Mar 26 22:36:22 2013
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionAudit.h"
+
+#include <decaf/util/LinkedHashMap.h>
+
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/commands/ActiveMQDestination.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace core {
+
+    class ConnectionAuditImpl {
+    private:
+
+        ConnectionAuditImpl(const ConnectionAuditImpl&);
+        ConnectionAuditImpl& operator= (const ConnectionAuditImpl&);
+
+    public:
+
+        Mutex mutex;
+        LinkedHashMap<Pointer<ActiveMQDestination>, Pointer<ActiveMQMessageAudit>
> destinations;
+        LinkedHashMap<Pointer<Dispatcher>, Pointer<ActiveMQMessageAudit> >
dispatchers;
+
+        ConnectionAuditImpl() : mutex(), destinations(1000), dispatchers(1000) {
+        }
+    };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::ConnectionAudit() : impl(new ConnectionAuditImpl),
+                                     checkForDuplicates(true),
+                                     auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+                                     auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT)
{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::ConnectionAudit(int auditDepth, int maxProducers) :
+    impl(new ConnectionAuditImpl),
+    checkForDuplicates(true),
+    auditDepth(auditDepth),
+    auditMaximumProducerNumber(maxProducers) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAudit::~ConnectionAudit() {
+    try {
+        delete this->impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAudit::removeDispatcher(Pointer<Dispatcher> dispatcher) {
+    synchronized(&this->impl->mutex) {
+        this->impl->dispatchers.remove(dispatcher);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ConnectionAudit::isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message>
message) {
+
+    if (checkForDuplicates && message != NULL) {
+        Pointer<ActiveMQDestination> destination = message->getDestination();
+        if (destination != NULL) {
+            if (destination->isQueue()) {
+                Pointer<ActiveMQMessageAudit> audit;
+                try {
+                    audit = this->impl->destinations.get(destination);
+                } catch (NoSuchElementException& ex) {
+                    audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+                    this->impl->destinations.put(destination, audit);
+                }
+                bool result = audit->isDuplicate(message->getMessageId());
+                return result;
+            }
+            Pointer<ActiveMQMessageAudit> audit;
+            try {
+                audit = this->impl->dispatchers.get(dispatcher);
+            } catch (NoSuchElementException& ex) {
+                audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+                this->impl->dispatchers.put(dispatcher, audit);
+            }
+            bool result = audit->isDuplicate(message->getMessageId());
+            return result;
+        }
+    }
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAudit::rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message>
message) {
+    if (checkForDuplicates && message != NULL) {
+        Pointer<ActiveMQDestination> destination = message->getDestination();
+        if (destination != NULL) {
+            if (destination->isQueue()) {
+                try {
+                    Pointer<ActiveMQMessageAudit> audit = this->impl->destinations.get(destination);
+                    audit->rollback(message->getMessageId());
+                } catch (NoSuchElementException& ex) {}
+            } else {
+                try {
+                    Pointer<ActiveMQMessageAudit> audit = this->impl->dispatchers.get(dispatcher);
+                    audit->rollback(message->getMessageId());
+                } catch (NoSuchElementException& ex) {}
+            }
+        }
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h Tue
Mar 26 22:36:22 2013
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+#define _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/commands/Message.h>
+#include <activemq/core/Dispatcher.h>
+
+namespace activemq {
+namespace core {
+
+    class ConnectionAuditImpl;
+
+    /**
+     * Provides the Auditing functionality used by Connections to attempt to
+     * filter out duplicate Messages.
+     *
+     * @since 3.7.0
+     */
+    class AMQCPP_API ConnectionAudit {
+    private:
+
+        ConnectionAudit(const ConnectionAudit&);
+        ConnectionAudit& operator= (const ConnectionAudit&);
+
+    private:
+
+        ConnectionAuditImpl* impl;
+
+        bool checkForDuplicates;
+        int auditDepth;
+        int auditMaximumProducerNumber;
+
+    public:
+
+        ConnectionAudit();
+
+        ConnectionAudit(int auditDepth, int maxProducers);
+
+        ~ConnectionAudit();
+
+    public:
+
+        void removeDispatcher(Pointer<Dispatcher> dispatcher);
+
+        bool isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message>
message);
+
+        void rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message>
message);
+
+    public:
+
+        bool isCheckForDuplicates() const {
+            return this->checkForDuplicates;
+        }
+
+        void setCheckForDuplicates(bool checkForDuplicates) {
+            this->checkForDuplicates = checkForDuplicates;
+        }
+
+        int getAuditDepth() {
+            return auditDepth;
+        }
+
+        void setAuditDepth(int auditDepth) {
+            this->auditDepth = auditDepth;
+        }
+
+        int getAuditMaximumProducerNumber() {
+            return auditMaximumProducerNumber;
+        }
+
+        void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+            this->auditMaximumProducerNumber = auditMaximumProducerNumber;
+        }
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Tue Mar 26 22:36:22 2013
@@ -38,6 +38,7 @@ cc_sources = \
     activemq/core/ActiveMQConnectionTest.cpp \
     activemq/core/ActiveMQMessageAuditTest.cpp \
     activemq/core/ActiveMQSessionTest.cpp \
+    activemq/core/ConnectionAuditTest.cpp \
     activemq/core/FifoMessageDispatchChannelTest.cpp \
     activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
     activemq/exceptions/ActiveMQExceptionTest.cpp \
@@ -286,6 +287,7 @@ h_sources = \
     activemq/core/ActiveMQConnectionTest.h \
     activemq/core/ActiveMQMessageAuditTest.h \
     activemq/core/ActiveMQSessionTest.h \
+    activemq/core/ConnectionAuditTest.h \
     activemq/core/FifoMessageDispatchChannelTest.h \
     activemq/core/SimplePriorityMessageDispatchChannelTest.h \
     activemq/exceptions/ActiveMQExceptionTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
Tue Mar 26 22:36:22 2013
@@ -88,6 +88,58 @@ void ActiveMQMessageAuditTest::testIsDup
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQMessageAuditTest::testRollbackString() {
+
+    int count = 10000;
+    ActiveMQMessageAudit audit;
+    IdGenerator idGen;
+
+    ArrayList<std::string> list;
+    for (int i = 0; i < count; i++) {
+        std::string id = idGen.generateId();
+        list.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        std::string id = list.get(index);
+        CPPUNIT_ASSERT_MESSAGE("duplicate, id:" + id, audit.isDuplicate(id));
+        audit.rollback(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id, !audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQMessageAuditTest::testRollbackMessageId() {
+
+    int count = 10000;
+    ActiveMQMessageAudit audit;
+    ArrayList<Pointer<MessageId> > list;
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    for (int i = 0; i < count; i++) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(i);
+        list.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        Pointer<MessageId> id = list.get(index);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), audit.isDuplicate(id));
+        audit.rollback(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), !audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQMessageAuditTest::testIsInOrderString() {
 
     int count = 10000;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
Tue Mar 26 22:36:22 2013
@@ -31,6 +31,8 @@ namespace core {
         CPPUNIT_TEST( testIsDuplicateMessageId );
         CPPUNIT_TEST( testIsInOrderString );
         CPPUNIT_TEST( testIsInOrderMessageId );
+        CPPUNIT_TEST( testRollbackString );
+        CPPUNIT_TEST( testRollbackMessageId );
         CPPUNIT_TEST( testGetLastSeqId );
         CPPUNIT_TEST_SUITE_END();
 
@@ -43,6 +45,8 @@ namespace core {
         void testIsDuplicateMessageId();
         void testIsInOrderString();
         void testIsInOrderMessageId();
+        void testRollbackString();
+        void testRollbackMessageId();
         void testGetLastSeqId();
 
     };

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
Tue Mar 26 22:36:22 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionAuditTest.h"
+
+#include <activemq/core/ConnectionAudit.h>
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/util/IdGenerator.h>
+#include <activemq/commands/Message.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/commands/ActiveMQQueue.h>
+
+#include <decaf/util/ArrayList.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::util;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class MyDispatcher : public Dispatcher {
+    public:
+
+        virtual ~MyDispatcher() {}
+
+        virtual void dispatch(const Pointer<commands::MessageDispatch>& message)
{
+
+        }
+
+        virtual int getHashCode() const {
+            return 1;
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAuditTest::ConnectionAuditTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionAuditTest::~ConnectionAuditTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAuditTest::testConstructor1() {
+
+    ConnectionAudit audit;
+    CPPUNIT_ASSERT(audit.isCheckForDuplicates());
+    CPPUNIT_ASSERT(audit.getAuditDepth() == ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE);
+    CPPUNIT_ASSERT(audit.getAuditMaximumProducerNumber() == ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAuditTest::testConstructor2() {
+
+    ConnectionAudit audit(100, 200);
+    CPPUNIT_ASSERT(audit.isCheckForDuplicates());
+    CPPUNIT_ASSERT(audit.getAuditDepth() == 100);
+    CPPUNIT_ASSERT(audit.getAuditMaximumProducerNumber() == 200);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAuditTest::testIsDuplicate() {
+
+    int count = 10000;
+    ConnectionAudit audit;
+    ArrayList<Pointer<MessageId> > list;
+    Pointer<MyDispatcher> dispatcher(new MyDispatcher);
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    Pointer<ActiveMQDestination> destination(new ActiveMQQueue("TEST.QUEUE"));
+    Pointer<Message> message(new Message());
+    message->setDestination(destination);
+
+    for (int i = 0; i < count; i++) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(i);
+        list.add(id);
+
+        message->setMessageId(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+    }
+
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        Pointer<MessageId> id = list.get(index);
+        message->setMessageId(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
+                               audit.isDuplicate(dispatcher, message));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionAuditTest::testRollbackDuplicate() {
+
+    int count = 10000;
+    ConnectionAudit audit;
+    ArrayList<Pointer<MessageId> > list;
+    Pointer<MyDispatcher> dispatcher(new MyDispatcher);
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    Pointer<ActiveMQDestination> destination(new ActiveMQQueue("TEST.QUEUE"));
+    Pointer<Message> message(new Message());
+    message->setDestination(destination);
+
+    for (int i = 0; i < count; i++) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(i);
+        list.add(id);
+
+        message->setMessageId(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+    }
+
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        Pointer<MessageId> id = list.get(index);
+        message->setMessageId(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
+                               audit.isDuplicate(dispatcher, message));
+        audit.rollbackDuplicate(dispatcher, message);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
+                               !audit.isDuplicate(dispatcher, message));
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h?rev=1461355&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
Tue Mar 26 22:36:22 2013
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_
+#define _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace core {
+
+    class ConnectionAuditTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( ConnectionAuditTest );
+        CPPUNIT_TEST( testConstructor1 );
+        CPPUNIT_TEST( testConstructor2 );
+        CPPUNIT_TEST( testIsDuplicate );
+        CPPUNIT_TEST( testRollbackDuplicate );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        ConnectionAuditTest();
+        virtual ~ConnectionAuditTest();
+
+        void testConstructor1();
+        void testConstructor2();
+        void testIsDuplicate();
+        void testRollbackDuplicate();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Tue Mar 26 22:36:22
2013
@@ -90,6 +90,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::SimplePriorityMessageDispatchChannelTest
);
 #include <activemq/core/ActiveMQMessageAuditTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQMessageAuditTest );
+#include <activemq/core/ConnectionAuditTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ConnectionAuditTest );
 
 #include <activemq/state/ConnectionStateTrackerTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::state::ConnectionStateTrackerTest );



Mime
View raw message