activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1460900 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ test/ test/activemq/core/
Date Mon, 25 Mar 2013 21:18:46 GMT
Author: tabish
Date: Mon Mar 25 21:18:46 2013
New Revision: 1460900

URL: http://svn.apache.org/r1460900
Log:
https://issues.apache.org/jira/browse/AMQCPP-367

Adds a message Audit to use for dup detection. 

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
  (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1460900&r1=1460899&r2=1460900&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Mar 25 21:18:46 2013
@@ -96,6 +96,7 @@ cc_sources = \
     activemq/core/ActiveMQConnectionMetaData.cpp \
     activemq/core/ActiveMQConstants.cpp \
     activemq/core/ActiveMQConsumer.cpp \
+    activemq/core/ActiveMQMessageAudit.cpp \
     activemq/core/ActiveMQProducer.cpp \
     activemq/core/ActiveMQQueueBrowser.cpp \
     activemq/core/ActiveMQSession.cpp \
@@ -732,6 +733,7 @@ h_sources = \
     activemq/core/ActiveMQConnectionMetaData.h \
     activemq/core/ActiveMQConstants.h \
     activemq/core/ActiveMQConsumer.h \
+    activemq/core/ActiveMQMessageAudit.h \
     activemq/core/ActiveMQProducer.h \
     activemq/core/ActiveMQQueueBrowser.h \
     activemq/core/ActiveMQSession.h \

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp?rev=1460900&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQMessageAudit.h"
+
+#include <activemq/util/IdGenerator.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/ProducerId.h>
+
+#include <decaf/util/LRUCache.h>
+#include <decaf/util/BitSet.h>
+#include <decaf/util/concurrent/Mutex.h>
+
+#include <string>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace core {
+
+    /**
+     * Private state behind ActiveMQMessageAudit: the configured limits, the
+     * mutex that callers lock around access to the map, and a cache that maps
+     * a producer key to the BitSet of sequence ids recorded for that producer.
+     */
+    class MessageAuditImpl {
+    private:
+
+        // Declared but not defined: instances are not meant to be copied.
+        MessageAuditImpl(const MessageAuditImpl&);
+        MessageAuditImpl& operator= (const MessageAuditImpl&);
+
+    public:
+
+        // Size passed to each newly created per-producer BitSet.
+        int auditDepth;
+
+        // Upper bound on the number of entries kept in the map.
+        int maximumNumberOfProducersToTrack;
+
+        // Locked by the owning ActiveMQMessageAudit around map access.
+        Mutex mutex;
+
+        LRUCache<std::string, Pointer<BitSet> > map;
+
+        /**
+         * Creates the state with an audit depth of 2048 and a limit of 64
+         * tracked producers.  The limit is handed to the cache here so that
+         * it applies from construction, not only after a later adjustment.
+         */
+        MessageAuditImpl() : auditDepth(2048),
+                             maximumNumberOfProducersToTrack(64),
+                             mutex(),
+                             map(0, 64, 0.75f, true) {
+        }
+
+        /**
+         * Creates the state with caller supplied limits.
+         *
+         * @param auditDepth
+         *      Size used for each newly created per-producer BitSet.
+         * @param maximumNumberOfProducersToTrack
+         *      Maximum number of entries the cache may hold.
+         *      NOTE(review): confirm how LRUCache treats a non-positive value.
+         */
+        MessageAuditImpl(int auditDepth, int maximumNumberOfProducersToTrack) :
+            auditDepth(auditDepth),
+            maximumNumberOfProducersToTrack(maximumNumberOfProducersToTrack),
+            mutex(),
+            map(0, maximumNumberOfProducersToTrack, 0.75f, true) {
+        }
+
+        /**
+         * Changes the maximum number of producers tracked.  This method does
+         * not lock the mutex itself.
+         *
+         * @param value
+         *      The new maximum number of entries for the cache.
+         */
+        void adjustMaxProducersToTrack(int value) {
+            // When value is smaller than current we need to move the entries
+            // to a new cache with that setting so that old ones are pruned
+            // since putAll will access the entries in the right order,
+            // this shouldn't result in wrong cache entries being removed
+            if (value < maximumNumberOfProducersToTrack) {
+                LRUCache<std::string, Pointer<BitSet> > newMap(0, value, 0.75f, true);
+                newMap.putAll(this->map);
+                this->map.clear();
+                this->map.putAll(newMap);
+            }
+            this->map.setMaxCacheSize(value);
+            this->maximumNumberOfProducersToTrack = value;
+        }
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds an audit backed by a default-constructed MessageAuditImpl.
+ActiveMQMessageAudit::ActiveMQMessageAudit() :
+    impl(new MessageAuditImpl()) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds an audit whose implementation object receives the supplied limits.
+ActiveMQMessageAudit::ActiveMQMessageAudit(int auditDepth, int maximumNumberOfProducersToTrack) :
+    impl(new MessageAuditImpl(auditDepth, maximumNumberOfProducersToTrack)) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases the implementation object; AMQ_CATCHALL_NOTHROW closes the try
+// block so that the destructor does not let an exception escape.
+ActiveMQMessageAudit::~ActiveMQMessageAudit() {
+    try {
+        delete impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the audit depth currently stored in the implementation object.
+int ActiveMQMessageAudit::getAuditDepth() const {
+    const int depth = impl->auditDepth;
+    return depth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Stores a new audit depth; it is read when a new per-producer BitSet is
+// created, existing BitSets are not touched here.
+void ActiveMQMessageAudit::setAuditDepth(int value) {
+    impl->auditDepth = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the configured maximum number of producers to track.
+int ActiveMQMessageAudit::getMaximumNumberOfProducersToTrack() const {
+    const int limit = impl->maximumNumberOfProducersToTrack;
+    return limit;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the maximum number of producers to track.  Despite the "get" prefix
+// this overload is the setter; the name is kept so existing callers still
+// compile.  The adjustment may rebuild the producer map, so it runs under the
+// same mutex that guards every other access to that map.
+void ActiveMQMessageAudit::getMaximumNumberOfProducersToTrack(int value) {
+    synchronized(&this->impl->mutex) {
+        this->impl->adjustMaxProducersToTrack(value);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the sequence carried by the given string id was already
+// recorded for its seed, and records it when it was not.  Ids that yield an
+// empty seed or a negative sequence are reported as not duplicated.
+bool ActiveMQMessageAudit::isDuplicate(const std::string& id) const {
+    const std::string producerKey = IdGenerator::getSeedFromId(id);
+    if (producerKey.empty()) {
+        return false;
+    }
+
+    bool seenBefore = false;
+    synchronized(&this->impl->mutex) {
+
+        // Fetch the BitSet for this seed, creating and caching one on a miss.
+        Pointer<BitSet> recorded;
+        try {
+            recorded = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            recorded.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, recorded);
+        }
+
+        const long long sequence = IdGenerator::getSequenceFromId(id);
+        if (sequence >= 0) {
+            // Sequences above Integer::MAX_VALUE are shifted down by that amount.
+            const int bit = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            seenBefore = recorded->get(bit);
+            if (!seenBefore) {
+                recorded->set(bit, true);
+            }
+        }
+    }
+    return seenBefore;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the producer sequence id of the given MessageId was already
+// recorded for its producer, and records it when it was not.  A NULL id, a
+// NULL producer id, an empty producer string or a negative sequence all give
+// false.
+bool ActiveMQMessageAudit::isDuplicate(decaf::lang::Pointer<MessageId> msgId) const {
+    if (msgId == NULL) {
+        return false;
+    }
+
+    Pointer<ProducerId> pid = msgId->getProducerId();
+    if (pid == NULL) {
+        return false;
+    }
+
+    const std::string producerKey = pid->toString();
+    if (producerKey.empty()) {
+        return false;
+    }
+
+    bool seenBefore = false;
+    synchronized(&this->impl->mutex) {
+
+        // Fetch the BitSet for this producer, creating and caching one on a miss.
+        Pointer<BitSet> recorded;
+        try {
+            recorded = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            recorded.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, recorded);
+        }
+
+        const long long sequence = msgId->getProducerSequenceId();
+        if (sequence >= 0) {
+            // Sequences above Integer::MAX_VALUE are shifted down by that amount.
+            const int bit = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            seenBefore = recorded->get(bit);
+            if (!seenBefore) {
+                recorded->set(bit, true);
+            }
+        }
+    }
+    return seenBefore;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Clears the recorded bit for the sequence carried by the given string id, if
+// a BitSet exists for its seed.  No BitSet is created when none exists.
+void ActiveMQMessageAudit::rollback(const std::string& msgId) {
+    const std::string producerKey = IdGenerator::getSeedFromId(msgId);
+    if (producerKey.empty()) {
+        return;
+    }
+
+    synchronized(&this->impl->mutex) {
+
+        Pointer<BitSet> recorded;
+        try {
+            recorded = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            // Nothing recorded for this seed; recorded stays NULL.
+        }
+
+        if (recorded != NULL) {
+            const long long sequence = IdGenerator::getSequenceFromId(msgId);
+            if (sequence >= 0) {
+                // Sequences above Integer::MAX_VALUE are shifted down by that amount.
+                const int bit = (sequence > Integer::MAX_VALUE) ?
+                    (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+                recorded->set(bit, false);
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Clears the recorded bit for the producer sequence id of the given MessageId,
+// if a BitSet exists for its producer.  No BitSet is created when none exists.
+void ActiveMQMessageAudit::rollback(decaf::lang::Pointer<commands::MessageId> msgId) {
+    if (msgId == NULL) {
+        return;
+    }
+
+    Pointer<ProducerId> pid = msgId->getProducerId();
+    if (pid == NULL) {
+        return;
+    }
+
+    const std::string producerKey = pid->toString();
+    if (producerKey.empty()) {
+        return;
+    }
+
+    synchronized(&this->impl->mutex) {
+
+        Pointer<BitSet> recorded;
+        try {
+            recorded = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            // Nothing recorded for this producer; recorded stays NULL.
+        }
+
+        if (recorded != NULL) {
+            const long long sequence = msgId->getProducerSequenceId();
+            if (sequence >= 0) {
+                // Sequences above Integer::MAX_VALUE are shifted down by that amount.
+                const int bit = (sequence > Integer::MAX_VALUE) ?
+                    (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+                recorded->set(bit, false);
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Compares the sequence carried by the given string id with the highest
+// position recorded for its seed (BitSet length minus one).  An empty id, an
+// empty seed or a negative sequence all give true.  A BitSet is created and
+// cached for the seed when none exists yet.
+bool ActiveMQMessageAudit::isInOrder(const std::string& msgId) const {
+    if (msgId.empty()) {
+        return true;
+    }
+
+    const std::string producerKey = IdGenerator::getSeedFromId(msgId);
+    if (producerKey.empty()) {
+        return true;
+    }
+
+    bool inOrder = true;
+    synchronized(&this->impl->mutex) {
+
+        // Fetch the BitSet for this seed, creating and caching one on a miss.
+        Pointer<BitSet> recorded;
+        try {
+            recorded = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            recorded.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, recorded);
+        }
+
+        const long long sequence = IdGenerator::getSequenceFromId(msgId);
+        if (sequence >= 0) {
+            // Sequences above Integer::MAX_VALUE are shifted down by that amount.
+            const int bit = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            inOrder = ((recorded->length() - 1) == bit);
+        }
+    }
+    return inOrder;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Compares the producer sequence id of the given MessageId with the highest
+// position recorded for its producer (BitSet length minus one).  A NULL id, a
+// NULL producer id, an empty producer string or a negative sequence all give
+// false.  A BitSet is created and cached for the producer when none exists.
+bool ActiveMQMessageAudit::isInOrder(decaf::lang::Pointer<commands::MessageId> msgId) const {
+    if (msgId == NULL) {
+        return false;
+    }
+
+    Pointer<ProducerId> pid = msgId->getProducerId();
+    if (pid == NULL) {
+        return false;
+    }
+
+    const std::string producerKey = pid->toString();
+    if (producerKey.empty()) {
+        return false;
+    }
+
+    bool inOrder = false;
+    synchronized(&this->impl->mutex) {
+
+        // Fetch the BitSet for this producer, creating and caching one on a miss.
+        Pointer<BitSet> recorded;
+        try {
+            recorded = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            recorded.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, recorded);
+        }
+
+        const long long sequence = msgId->getProducerSequenceId();
+        if (sequence >= 0) {
+            // Sequences above Integer::MAX_VALUE are shifted down by that amount.
+            const int bit = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            inOrder = ((recorded->length() - 1) == bit);
+        }
+    }
+    return inOrder;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the highest position recorded for the given producer (BitSet length
+// minus one), or -1 when the id is NULL, its string form is empty, or nothing
+// is cached for that producer.  The result variable uses the same type as the
+// declared return type.
+long long ActiveMQMessageAudit::getLastSeqId(decaf::lang::Pointer<commands::ProducerId> id) const {
+    long long result = -1;
+    if (id != NULL) {
+        std::string seed = id->toString();
+        if (!seed.empty()) {
+
+            synchronized(&this->impl->mutex) {
+
+                Pointer<BitSet> bits;
+                try {
+                    bits = this->impl->map.get(seed);
+                } catch (NoSuchElementException& ex) {
+                    // Unknown producer; bits stays NULL and -1 is returned.
+                }
+
+                if (bits != NULL) {
+                    result = bits->length() - 1;
+                }
+            }
+        }
+    }
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Removes every tracked producer.  The map is cleared under the same mutex
+// that guards every other access to it.
+void ActiveMQMessageAudit::clear() {
+    synchronized(&this->impl->mutex) {
+        this->impl->map.clear();
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h?rev=1460900&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDIT_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDIT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/commands/MessageId.h>
+#include <activemq/commands/ProducerId.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace core {
+
+    class MessageAuditImpl;
+
+    /**
+     * Tracks which message sequence ids have been recorded per producer so
+     * that duplicated or out of order messages can be detected.
+     */
+    class AMQCPP_API ActiveMQMessageAudit {
+    private:
+
+        MessageAuditImpl* impl;
+
+    private:
+
+        // The impl pointer is owned and deleted by the destructor, so copying
+        // is disabled (declared, never defined) to prevent a double delete.
+        ActiveMQMessageAudit(const ActiveMQMessageAudit&);
+        ActiveMQMessageAudit& operator= (const ActiveMQMessageAudit&);
+
+    public:
+
+        /**
+         * Default Constructor auditDepth = 2048, maximumNumberOfProducersToTrack = 64
+         */
+        ActiveMQMessageAudit();
+
+        /**
+         * Construct a MessageAudit
+         *
+         * @param auditDepth
+         *      The range of ids to track.
+         * @param maximumNumberOfProducersToTrack
+         *      The number of producers expected in the system
+         */
+        ActiveMQMessageAudit(int auditDepth, int maximumNumberOfProducersToTrack);
+
+        ~ActiveMQMessageAudit();
+
+    public:
+
+        /**
+         * Gets the currently configured Audit Depth
+         *
+         * @returns the current audit depth setting
+         */
+        int getAuditDepth() const;
+
+        /**
+         * Sets a new Audit Depth value.
+         *
+         * @param value
+         *      The range of ids to track.
+         */
+        void setAuditDepth(int value);
+
+        /**
+         * @returns the current number of producers that will be tracked.
+         */
+        int getMaximumNumberOfProducersToTrack() const;
+
+        /**
+         * Sets the number of producers to track.  Note that despite its
+         * "get" prefix this overload is the setter.
+         *
+         * @param value
+         *      The number of producers expected in the system
+         */
+        void getMaximumNumberOfProducersToTrack(int value);
+
+        /**
+         * Checks whether this messageId has been seen before and, if it has
+         * not, records it as seen.
+         *
+         * @param msgId
+         *      The string value Message Id.
+         *
+         * @return true if the message is a duplicate.
+         */
+        bool isDuplicate(const std::string& msgId) const;
+
+        /**
+         * Checks whether this messageId has been seen before and, if it has
+         * not, records it as seen.
+         *
+         * @param msgId
+         *      The target MessageId to check.
+         *
+         * @return true if the message is a duplicate
+         */
+        bool isDuplicate(decaf::lang::Pointer<commands::MessageId> msgId) const;
+
+        /**
+         * Removes the record of this message having been seen, so that it is
+         * no longer reported as a duplicate.
+         *
+         * @param msgId
+         *      The string value Message Id.
+         */
+        void rollback(const std::string& msgId);
+
+        /**
+         * Removes the record of this message having been seen, so that it is
+         * no longer reported as a duplicate.
+         *
+         * @param msgId
+         *      The target MessageId to roll back.
+         */
+        void rollback(decaf::lang::Pointer<commands::MessageId> msgId);
+
+        /**
+         * Check the MessageId is in order
+         *
+         * @param msgId
+         *      The string value Message Id.
+         *
+         * @return true if the MessageId is in order.
+         */
+        bool isInOrder(const std::string& msgId) const;
+
+        /**
+         * Check the MessageId is in order
+         *
+         * @param msgId
+         *      The target MessageId to check.
+         *
+         * @return true if the MessageId is in order.
+         */
+        bool isInOrder(decaf::lang::Pointer<commands::MessageId> msgId) const;
+
+        /**
+         * @returns the last sequence Id that we've audited for the given producer,
+         *          or -1 if nothing has been audited for it.
+         */
+        long long getLastSeqId(decaf::lang::Pointer<commands::ProducerId> id) const;
+
+        /**
+         * Clears this Audit.
+         */
+        void clear();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDIT_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1460900&r1=1460899&r2=1460900&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Mon Mar 25 21:18:46 2013
@@ -36,6 +36,7 @@ cc_sources = \
     activemq/commands/XATransactionIdTest.cpp \
     activemq/core/ActiveMQConnectionFactoryTest.cpp \
     activemq/core/ActiveMQConnectionTest.cpp \
+    activemq/core/ActiveMQMessageAuditTest.cpp \
     activemq/core/ActiveMQSessionTest.cpp \
     activemq/core/FifoMessageDispatchChannelTest.cpp \
     activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
@@ -283,6 +284,7 @@ h_sources = \
     activemq/commands/XATransactionIdTest.h \
     activemq/core/ActiveMQConnectionFactoryTest.h \
     activemq/core/ActiveMQConnectionTest.h \
+    activemq/core/ActiveMQMessageAuditTest.h \
     activemq/core/ActiveMQSessionTest.h \
     activemq/core/FifoMessageDispatchChannelTest.h \
     activemq/core/SimplePriorityMessageDispatchChannelTest.h \

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp?rev=1460900&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQMessageAuditTest.h"
+
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/util/IdGenerator.h>
+
+#include <decaf/util/ArrayList.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::util;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQMessageAuditTest::ActiveMQMessageAuditTest() {
+    // The fixture holds no state of its own.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQMessageAuditTest::~ActiveMQMessageAuditTest() {
+    // Nothing to release.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Feeds 10000 generated string ids through the audit, then replays the last
+// auditDepth + 1 of them and expects each replay to be flagged as a duplicate.
+void ActiveMQMessageAuditTest::testIsDuplicateString() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    IdGenerator generator;
+    ArrayList<std::string> issued;
+
+    // Every freshly generated id must be reported as new.
+    for (int n = 0; n < total; ++n) {
+        std::string id = generator.generateId();
+        issued.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    // Replay the tail of the issued ids.
+    for (int pos = issued.size() - 1 - audit.getAuditDepth(); pos < issued.size(); ++pos) {
+        std::string id = issued.get(pos);
+        CPPUNIT_ASSERT_MESSAGE("duplicate, id:" + id, audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Feeds 10000 MessageIds from a single producer through the audit, then
+// replays the last auditDepth + 1 of them and expects each replay to be
+// flagged as a duplicate.
+void ActiveMQMessageAuditTest::testIsDuplicateMessageId() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    ArrayList<Pointer<MessageId> > issued;
+
+    // All message ids share this one producer.
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    // Every new sequence id must be reported as new.
+    for (int n = 0; n < total; ++n) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(n);
+        issued.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    // Replay the tail of the issued ids.
+    for (int pos = issued.size() - 1 - audit.getAuditDepth(); pos < issued.size(); ++pos) {
+        Pointer<MessageId> id = issued.get(pos);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Records only the first generated string id, then checks that the ids at odd
+// positions above 1 are reported as out of order and as not yet seen.
+void ActiveMQMessageAuditTest::testIsInOrderString() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    IdGenerator generator;
+    ArrayList<std::string> unseen;
+
+    for (int n = 0; n < total; ++n) {
+        std::string id = generator.generateId();
+        if (n == 0) {
+            // The first id is the only one passed to the audit in this loop.
+            CPPUNIT_ASSERT(!audit.isDuplicate(id));
+            CPPUNIT_ASSERT(audit.isInOrder(id));
+        } else if (n > 1 && n % 2 != 0) {
+            unseen.add(id);
+        }
+    }
+
+    for (int n = 0; n < unseen.size(); ++n) {
+        std::string id = unseen.get(n);
+        CPPUNIT_ASSERT_MESSAGE(std::string("Out of order msg: ") + id, !audit.isInOrder(id));
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Records only the first MessageId of a producer, then checks that the ids at
+// odd sequence numbers above 1 are reported as out of order and as not yet
+// seen.
+void ActiveMQMessageAuditTest::testIsInOrderMessageId() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    ArrayList<Pointer<MessageId> > unseen;
+
+    // All message ids share this one producer.
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    for (int n = 0; n < total; ++n) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(n);
+
+        if (n == 0) {
+            // The first id is the only one passed to the audit in this loop.
+            CPPUNIT_ASSERT(!audit.isDuplicate(id));
+            CPPUNIT_ASSERT(audit.isInOrder(id));
+        } else if (n > 1 && n % 2 != 0) {
+            unseen.add(id);
+        }
+    }
+
+    for (int n = 0; n < unseen.size(); ++n) {
+        Pointer<MessageId> mid = unseen.get(n);
+        CPPUNIT_ASSERT_MESSAGE(std::string("Out of order msg: ") + mid->toString(), !audit.isInOrder(mid));
+        CPPUNIT_ASSERT(!audit.isDuplicate(mid));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Audits sequence ids 0..9999 for a single producer, reusing one MessageId
+// object, and checks after each step that getLastSeqId reports the sequence
+// id that was just audited.
+void ActiveMQMessageAuditTest::testGetLastSeqId() {
+
+    int count = 10000;
+    ActiveMQMessageAudit audit;
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+    Pointer<MessageId> id(new MessageId);
+    id->setProducerId(pid);
+
+    for (int i = 0; i < count; i++) {
+        id->setProducerSequenceId(i);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+        CPPUNIT_ASSERT_EQUAL((long long)i, audit.getLastSeqId(pid));
+    }
+
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h?rev=1460900&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDITTEST_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDITTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace core {
+
+    /**
+     * CppUnit fixture exercising ActiveMQMessageAudit: duplicate detection,
+     * ordering checks and last sequence id lookup.
+     */
+    class ActiveMQMessageAuditTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE(ActiveMQMessageAuditTest);
+        CPPUNIT_TEST(testIsDuplicateString);
+        CPPUNIT_TEST(testIsDuplicateMessageId);
+        CPPUNIT_TEST(testIsInOrderString);
+        CPPUNIT_TEST(testIsInOrderMessageId);
+        CPPUNIT_TEST(testGetLastSeqId);
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        ActiveMQMessageAuditTest();
+        virtual ~ActiveMQMessageAuditTest();
+
+        // Duplicate detection, keyed by string id and by MessageId.
+        void testIsDuplicateString();
+        void testIsDuplicateMessageId();
+
+        // Ordering checks, keyed by string id and by MessageId.
+        void testIsInOrderString();
+        void testIsInOrderMessageId();
+
+        // Last audited sequence id for a producer.
+        void testGetLastSeqId();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDITTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1460900&r1=1460899&r2=1460900&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Mon Mar 25 21:18:46
2013
@@ -88,6 +88,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::FifoMessageDispatchChannelTest );
 #include <activemq/core/SimplePriorityMessageDispatchChannelTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::SimplePriorityMessageDispatchChannelTest
);
+#include <activemq/core/ActiveMQMessageAuditTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQMessageAuditTest );
 
 #include <activemq/state/ConnectionStateTrackerTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::state::ConnectionStateTrackerTest );



Mime
View raw message