activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1447894 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/examples: ./ stress-test/
Date Tue, 19 Feb 2013 20:12:30 GMT
Author: tabish
Date: Tue Feb 19 20:12:30 2013
New Revision: 1447894

URL: http://svn.apache.org/r1447894
Log:
Adds another example app.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am?rev=1447894&r1=1447893&r2=1447894&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am Tue Feb 19 20:12:30 2013
@@ -105,3 +105,17 @@ noinst_PROGRAMS += cmstemplate_stress
 cmstemplate_stress_SOURCES = $(cmstemplate_stress_sources)
 cmstemplate_stress_LDADD= $(AMQ_TEST_LIBS)
 cmstemplate_stress_CXXFLAGS = $(AMQ_TEST_CXXFLAGS) -I$(srcdir)/../main
+
+## CMS Template Stress Test Example
+stress_stress_sources = stress-test/TestSenderAndReceiver.cpp \
+                        stress-test/Sender.cpp \
+                        stress-test/Receiver.cpp \
+                        stress-test/MessagingTask.cpp \
+                        stress-test/ConnectionFactoryMgr.cpp \
+                        stress-test/CmsStress.cpp \
+                        stress-test/BrokerMonitor.cpp \
+                        stress-test/CmsMessageCreator.cpp
+noinst_PROGRAMS += stress_test
+stress_test_SOURCES = $(stress_stress_sources)
+stress_test_LDADD= $(AMQ_TEST_LIBS)
+stress_test_CXXFLAGS = $(AMQ_TEST_CXXFLAGS) -I$(srcdir)/../main

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BrokerMonitor.h"
+#include "ConnectionFactoryMgr.h"
+#include "TestSenderAndReceiver.h"
+
+#include <cms/Session.h>
+#include <cms/Message.h>
+#include <cms/ConnectionFactory.h>
+
+#include <activemq/cmsutil/MessageCreator.h>
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <decaf/util/Iterator.h>
+
+#include <stdio.h>
+
+extern bool VERBOSE;
+
+using namespace cms;
+using namespace cms::stress;
+using namespace activemq::cmsutil;
+using namespace decaf::lang::exceptions;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+BrokerMonitor::BrokerMonitor(const std::string& url, int interval, CountDownLatch* quit) :
+    closing(false), brokerOk(false), url(url), interval(interval), brokerMonitorThread(), quit(quit) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BrokerMonitor::~BrokerMonitor() {
+    try {
+        close();
+    } catch (...) {
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BrokerMonitor::close() {
+    closing = true;
+    if (brokerMonitorThread) {
+        brokerMonitorThread->join();
+        delete brokerMonitorThread;
+        brokerMonitorThread = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BrokerMonitor::start() {
+    if (!brokerMonitorThread) {
+        brokerMonitorThread = new Thread(this, "Message Broker Monitor Thread");
+        brokerMonitorThread->start();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BrokerMonitor::run() {
+    ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
+    CmsTemplate* cmsTemplate = createCmsTemplate(connectionFactory);
+
+    while (!closing) {
+        try {
+            cmsTemplate->send(this);
+            Message* message = cmsTemplate->receive();
+
+            if (message) {
+                delete message;
+                if (VERBOSE) {
+                    printf("%c", SYM_MON_GOOD);
+                }
+                brokerOk = true;
+            } else {
+                if (VERBOSE) {
+                    printf("%c", SYM_MON_BAD);
+                }
+                brokerOk = false;
+            }
+        } catch (cms::CMSException& ex) {
+            if (VERBOSE) {
+                printf("%c", SYM_MON_CMS);
+            }
+            brokerOk = false;
+        } catch (...) {
+            if (VERBOSE) {
+                printf("%c", SYM_MON_EXC);
+            }
+            brokerOk = false;
+        }
+
+        if (quit->await(interval)) {
+            closing = true;
+        }
+    }
+
+    delete cmsTemplate;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Message* BrokerMonitor::createMessage(cms::Session* session) {
+    Message* message = NULL;
+    if (session) {
+        message = session->createTextMessage("Heart Beat");
+    }
+    return message;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsTemplate* BrokerMonitor::createCmsTemplate(ConnectionFactory* connectionFactory) {
+
+    CmsTemplate* cmsTemplate = new CmsTemplate(connectionFactory);
+    cmsTemplate->setDefaultDestinationName("cpp.CmsMessageHandler.BrokerMonitor.HeartBeatingChannel");
+    cmsTemplate->setTimeToLive(1000);
+    cmsTemplate->setReceiveTimeout(1000);
+
+    return cmsTemplate;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool BrokerMonitor::isBrokerOk() {
+    return brokerOk;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_BROKERMONITOR_H_
+#define _CMS_STRESS_BROKERMONITOR_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+
+namespace cms {
+    class Session;
+namespace stress {
+
+    class Client;
+
+    class BrokerMonitor: public decaf::lang::Runnable, activemq::cmsutil::MessageCreator {
+    private:
+
+        bool closing;
+        bool brokerOk;
+        std::string url;
+        int interval;
+        decaf::lang::Thread* brokerMonitorThread;
+        decaf::util::concurrent::CountDownLatch* quit;
+
+    private:
+
+        activemq::cmsutil::CmsTemplate* createCmsTemplate(cms::ConnectionFactory* connectionFactory);
+
+    public:
+
+        BrokerMonitor(const std::string& url, int interval,
+                      decaf::util::concurrent::CountDownLatch* quit);
+
+        virtual ~BrokerMonitor();
+
+        virtual cms::Message* createMessage(cms::Session* session);
+
+        virtual void run();
+
+        void start();
+
+        void close();
+
+        bool isBrokerOk();
+
+    };
+
+}}
+
+#endif /** _CMS_STRESS_BROKERMONITOR_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CmsMessageCreator.h"
+
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+
+////////////////////////////////////////////////////////////////////////////////
+CmsMessageCreator::CmsMessageCreator(const std::string& text, const std::string& name, const std::string& value) :
+    text(text), headerName(name), headerValue(value) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsMessageCreator::~CmsMessageCreator() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* CmsMessageCreator::createMessage(cms::Session* session) {
+
+    cms::Message* message = NULL;
+    if (session) {
+        message = session->createTextMessage(text);
+        if (headerName != "") {
+            message->setStringProperty(headerName, headerValue);
+        }
+    }
+    return message;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_CMSMESSAGECREATOR_H_
+#define _CMS_STRESS_CMSMESSAGECREATOR_H_
+
+#include <decaf/util/Config.h>
+
+#include <cms/Session.h>
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+
+namespace cms {
+namespace stress {
+
+    class CmsMessageCreator: public activemq::cmsutil::MessageCreator {
+    private:
+
+        std::string text;
+        std::string headerName;
+        std::string headerValue;
+
+    public:
+
+        CmsMessageCreator(const std::string& txt,
+                          const std::string& name = "",
+                          const std::string& value = "");
+
+        virtual ~CmsMessageCreator();
+
+        virtual cms::Message* createMessage(cms::Session* session);
+    };
+
+}}
+
+#endif /** _CMS_STRESS_CMSMESSAGECREATOR_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_CMSMESSAGEHANDLERDEFINITIONS_H_
+#define _CMS_STRESS_CMSMESSAGEHANDLERDEFINITIONS_H_
+
+#include <decaf/util/Config.h>
+
+namespace cms {
+namespace stress {
+
+    #define SYM_GOOD_SEND 32
+    #define SYM_BAD_SEND 33
+    #define SYM_BIG_DIFF 63
+    #define SYM_GOOD_SEQ 2
+    #define SYM_BAD_MSG 105
+    #define SYM_MON_GOOD 71
+    #define SYM_MON_BAD 66
+    #define SYM_MON_CMS 67
+    #define SYM_MON_EXC 69
+
+    enum ErrorCode {         // NOTE: When added an entry to ErrorCode you must add the corresponding error string to ErrorDescription
+       CMS_SUCCESS = 0,
+       CMS_ERROR_UNABLE_TO_PARSE_XML,
+       CMS_ERROR_MESSAGE_HAS_BEEN_DEFINED_ALREADY,
+       CMS_ERROR_HEADER_HAS_BEEN_DEFINED_ALREADY,
+       CMS_ERROR_CLIENT_HAS_BEEN_DEFINED_ALREADY,
+       CMS_ERROR_DESTINATION_HAS_BEEN_DEFINED_ALREADY,
+       CMS_ERROR_INVALID_CLIENT,
+       CMS_ERROR_INVALID_DESTINATION,
+       CMS_ERROR_INVALID_MESSAGE,
+       CMS_ERROR_INVALID_HEADERS,
+       CMS_ERROR_INVALID_MESSAGELISTENER,
+       CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY,
+       CMS_ERROR_RECEIVER_TIMEDOUT,
+       CMS_ERROR_DESTINATION_NOT_CONFIGURED_FOR_SENDING_MESSAGES,
+       CMS_ERROR_DESTINATION_NOT_CONFIGURED_FOR_RECEIVING_MESSAGES,
+       CMS_ERROR_CAUGHT_CMS_EXCEPTION,
+       CMS_ERROR_CAUGHT_TBGENOBJ_ERROR,
+       CMS_ERROR_MESSAGE_BROKER_ERROR,
+       CMS_ERROR_BROKER_MONITOR_NOT_FOUND,
+       CMS_ERROR_BROKER_MONITORING_NOT_TURNED_ON,
+       CMS_ERROR_INVALID_BROKERSTATUSLISTENER,
+       CMS_ERROR_A_BROKERSTATUSLISTENER_HAS_BEEN_REGISTERED_ALREADY,
+       CMS_ERROR_CAUGHT_EXCEPTION_IN_INIT,
+       CMS_ERROR_CAUGHT_EXCEPTION_IN_UNINIT,
+       CMS_LAST                             // Put all error enums BEFORE this one. This one must be listed last.
+    };
+
+    const char ErrorDescription[][100] = {
+        "Success",
+        "Unable to parse xml",
+        "Message has been defined already",
+        "Header has been defined already",
+        "Client has been defined already",
+        "Destination has been defined already",
+        "Invalid client",
+        "Invalid destination",
+        "Invalid message",
+        "Invalid headers",
+        "Invalid messagelistener",
+        "A messagelistener has been registered already with the destination",
+        "Receiver timed out",
+        "Destination not configured for sending messages",
+        "Destination not configured for receiving messages",
+        "Caught CMS exception",
+        "Caught TBGenObj error",
+        "Message broker appears to be offline",
+        "Can not find broker monitor",
+        "The broker monitoring functinaliy has not been turned on",
+        "Invalid brokerStatuslistener",
+        "This brokerStatuslistener has been registered already with the broker",
+        "Caught an exception when initializing CmsMessageHandler",
+        "Caught an exception when uninitializing CmsMessageHandler",
+        "CMS_LAST - ErrorCodeToString macro index out of range",
+    };
+
+    #define ErrorCodeToString(i) (((i >= CMS_SUCCESS) && (i <= CMS_LAST)) ? ErrorDescription[i] : ErrorDescription[CMS_LAST])
+    #define IsError(i) (i != CMS_SUCCESS)
+
+}}
+
+#endif /** _CMS_STRESS_CMSMESSAGEHANDLERDEFINITIONS_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/library/ActiveMQCPP.h>
+
+#include "TestSenderAndReceiver.h"
+#include "ConnectionFactoryMgr.h"
+#include "BrokerMonitor.h"
+#include "MessagingTask.h"
+
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+#include <iostream>
+#include <stdlib.h>
+#include <stdio.h>
+
+using namespace cms;
+using namespace cms::stress;
+using namespace activemq::library;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+bool VERBOSE = false;
+static bool bPause = false;
+
+decaf::util::concurrent::CountDownLatch* quit;
+
+TESTINFO TestResults;
+int InitialPrivateMemorySize;
+
+////////////////////////////////////////////////////////////////////////////////
+void DisplayResults() {
+    long long mills = TestResults.endTime - TestResults.startTime;
+    int secs = (int) (mills / 1000);
+    int mins = secs / 60;
+    int hrs = mins / 60;
+    int days = hrs / 24;
+
+    printf("\nT E S T   S U M M A R Y\n");
+
+    if (days > 0) {
+        printf("Elapsed time = %d:%02.2d:%02.2d:%02.2d.%03.3d\n", days, hrs % 24, mins % 60, secs % 60, mills % 1000);
+    } else {
+        printf("Elapsed time = %02.2d:%02.2d:%02.2d.%03.3d\n", hrs % 24, mins % 60, secs % 60, mills % 1000);
+    }
+
+    printf("Threads used = %d\n", TestResults.threadCount);
+
+    printf("Messages sent = %d\n", TestResults.sent.get());
+    printf("Messages received = %d\n", TestResults.received.get());
+    if (TestResults.invalidMessages.get()) {
+        printf("Invalid Messages = %d\n", TestResults.invalidMessages.get());
+    }
+    if (TestResults.badSequenceMessages.get()) {
+        printf("Sequence Errors = %d\n", TestResults.badSequenceMessages.get());
+    }
+    if (TestResults.sendErrors.get()) {
+        printf("Send Errors = %d\n", TestResults.sendErrors.get());
+    }
+
+    if (TestResults.sent.get() > 0) {
+        printf("Reliability = %0.3f%%\n", (double) (((double) (TestResults.received.get()) * 100) / (double) TestResults.sent.get()));
+        printf("Sequenced = %0.3f%%\n", (double) (((double) (TestResults.received.get() - TestResults.badSequenceMessages.get()) * 100) / (double) TestResults.sent.get()));
+        printf("Output performance = %0.3f ms/msg\n", (double) ((double) mills / (double) TestResults.sent.get()));
+        if ((TestResults.received.get() + TestResults.invalidMessages.get()) > 0) {
+            printf("Input performance = %0.3f ms/msg\n", (double) ((double) mills / (double) (TestResults.received.get() + TestResults.invalidMessages.get())));
+        }
+    }
+
+    if (bPause) {
+        printf("Press any key to continue...\n");
+        std::cin.get();
+        printf("\n");
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int main(int argc, char** argv) {
+
+    BrokerMonitor* monitor = NULL;
+    std::string url = "tcp://";
+    int cnt = 25;
+    int done = 3600;
+    int monint = 5;
+    int inter = -1;
+    int seed = 0;
+    int thrdmax = 100;
+    int thrdmin = 3;
+    char *ptr;
+    bool bOK = true;
+    bool bReport = false;
+    bool bPool = true;
+    const char* pName = NULL;
+    const char* pHeader = NULL;
+
+    printf("\n");
+
+    for (int i = 1; bOK && (i < argc); i++) {
+        if (argv[i][0] != '-') {
+            bOK = false;
+        } else {
+            switch (argv[i][1]) {
+            case 'a':
+                bPool = false;
+                break;
+            case 'b':
+                monint = Integer::parseInt(argv[i + 1]);
+                i++;
+                break;
+            case 'f':
+                thrdmin = Integer::parseInt(argv[i + 1]);
+                i++;
+                break;
+            case 'g':
+                thrdmax = Integer::parseInt(argv[i + 1]);
+                i++;
+                break;
+            case 'i':
+                inter = Integer::parseInt(argv[i + 1]);
+                i++;
+                break;
+            case 'n':
+                pName = argv[i + 1];
+                i++;
+                break;
+            case 'p':
+                bPause = true;
+                break;
+            case 'r':
+                bReport = true;
+                break;
+            case 's':
+                done = Integer::parseInt(argv[i + 1]);
+                i++;
+                break;
+            case 't':
+                cnt = Integer::parseInt(argv[i + 1]);
+                i++;
+                break;
+            case 'u':
+                pHeader = argv[i + 1];
+                i++;
+                break;
+            case 'v':
+                VERBOSE = true;
+                break;
+            case 'x':
+                seed = Integer::parseInt(argv[i + 1]);
+                i++;
+                break;
+            default:
+                bOK = false;
+                break;
+            }
+        }
+    }
+
+    if (!bOK) {
+        printf("Usage: %s [-b #] [-i #] [-m] [-n name] [-p] [-r] [-s #] [-t #] [-u header] [-v] [-x # ]\n", argv[0]);
+        printf(" -a   : Do not use thread pools when dispatching incoming messages (default is to use pools)\n");
+        printf(" -b # : Number of seconds between heartbeat messages (0 disables) (default is 5)\n");
+        printf(" -f # : Minimum number of threads to have available when use thread pools (default is 3)\n");
+        printf(" -g # : Maximum number of threads that can be created in each thread pool (default is 100)\n");
+        printf(" -i # : Number of milliseconds to wait between message sends (-1 is random delay between 0 and 1000ms) (default is -1)\n");
+        printf(" -n s : Specify the name of the topic to use (default is 'topic')\n");
+        printf(" -p   : Pause for keyboard input (default is false)\n");
+        printf(" -r   : Display test summary (default is false)\n");
+        printf(" -s # : Number of seconds to run test for (0 is until CTRL+C is entered) (default is 3600 - 1hr)\n");
+        printf(" -t # : Number of threads to use (default is 25)\n");
+        printf(" -u s : Use the specified header name for ID and utilize selectors\n");
+        printf(" -v   : Display verbose progress where the following symbols have the specified meaning:\n");
+        printf("        (%c) = Good send of a message\n", SYM_GOOD_SEND);
+        printf("        (%c) = Error encountered sending a message\n", SYM_BAD_SEND);
+        printf("        (%c) = Good sequenced message received\n", SYM_GOOD_SEQ);
+        printf("        (%c) = Invalid message received\n", SYM_BAD_MSG);
+        printf("        (%c) = Received message sequence is off by 10 or more\n", SYM_BIG_DIFF);
+        printf("        (#) = Number indicating how far off received sequence number is\n");
+        printf("        (%c) = Good heartbeat sent and received\n", SYM_MON_GOOD);
+        printf("        (%c) = Failed to send and receive heartbeat\n", SYM_MON_BAD);
+        printf("        (%c) = CMS exception received while send/receiving heartbeat\n", SYM_MON_CMS);
+        printf("        (%c) = Exception received while send/receiving heartbeat\n", SYM_MON_EXC);
+        printf(" -x # : Seed the random number generator with the specified seed (0 uses time) (default is 0)\n");
+        printf("\n");
+        return -1;
+    }
+
+    ptr = getenv("MessageBrokerIP");
+    if (ptr != NULL) {
+        url += ptr;
+    } else {
+        url += "127.0.0.1";
+    }
+    url += ":";
+    ptr = getenv("MessageBrokerPort");
+    if (ptr != NULL) {
+        url += ptr;
+    } else {
+        url += "61616";
+    }
+    url += "?connection.sendTimeout=1000";
+
+    ActiveMQCPP::initializeLibrary();
+    ConnectionFactoryMgr::initialize();
+    MessagingTask::initializeThreads(thrdmin, thrdmax);
+    quit = new CountDownLatch(1);
+
+    TestSenderAndReceiver** sar = new TestSenderAndReceiver*[cnt];
+    TestResults.threadCount = cnt;
+    TestResults.lastSequence = new AtomicInteger[cnt];
+
+    if (pName == NULL) {
+        pName = "topic";
+    }
+    if (pHeader == NULL) {
+        pHeader = "";
+    }
+
+#ifdef _UNICODE
+    wstring wstr(pName);
+    string nstr(wstr.begin(), wstr.end());
+    wstring whdr(pHeader);
+    string nhdr(whdr.begin(), whdr.end());
+#else
+    string nstr(pName);
+    string nhdr(pHeader);
+#endif
+
+    TestResults.startTime = System::currentTimeMillis();
+
+    if (monint > 0) {
+        monitor = new BrokerMonitor(url, monint * 1000, quit);
+        monitor->start();
+    }
+
+    for (int i = 0; i < cnt; i++) {
+        string topic(nstr);
+
+        // If not using selectors send to different topic names
+        if (nhdr == "") {
+            stringstream str;
+            str << i;
+            topic += str.str();
+        }
+        sar[i] = new TestSenderAndReceiver(url, topic.c_str(), nhdr.c_str(), true, false, monitor, quit, 50, 1000, i, bPool, inter, seed);
+        sar[i]->init();
+    }
+
+    if (done != 0) {
+        quit->await(done * 1000);
+    } else {
+        quit->await();
+    }
+
+    quit->countDown();
+
+    for (int i = 0; i < cnt; i++) {
+        sar[i]->close();
+        delete sar[i];
+    }
+    delete sar;
+
+    if (monitor != NULL) {
+        monitor->close();
+        delete monitor;
+    }
+
+    TestResults.endTime = System::currentTimeMillis();
+    printf("\n");
+
+    delete[] TestResults.lastSequence;
+    MessagingTask::terminateThreads();
+    ConnectionFactoryMgr::unInitialize();
+    delete quit;
+    ActiveMQCPP::shutdownLibrary();
+
+    if (bReport) {
+        DisplayResults();
+    }
+
+    printf("\nTest Completed!\n");
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionFactoryMgr.h"
+#include <activemq/core/ActiveMQConnectionFactory.h>
+
+using namespace decaf::lang::exceptions;
+using namespace activemq::core;
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+
+StlMap<std::string, ConnectionFactory*> * ConnectionFactoryMgr::connectionFactories;
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionFactoryMgr::ConnectionFactoryMgr() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionFactoryMgr::~ConnectionFactoryMgr() {
+    unInitialize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionFactoryMgr::initialize() {
+    connectionFactories = new StlMap<std::string, ConnectionFactory*>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionFactoryMgr::unInitialize() {
+    connectionFactories->lock();
+
+    Pointer<Iterator<ConnectionFactory*> > iter(connectionFactories->values().iterator());
+    while (iter->hasNext()) {
+        ConnectionFactory* connectionFactory = iter->next();
+        if (connectionFactory != NULL) {
+            delete connectionFactory;
+            connectionFactory = NULL;
+        }
+    }
+
+    connectionFactories->clear();
+    connectionFactories->unlock();
+
+    delete connectionFactories;
+    connectionFactories = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionFactory* ConnectionFactoryMgr::getConnectionFactory(const std::string& url) {
+
+    ConnectionFactory* connectionFactory = NULL;
+
+    connectionFactories->lock();
+    try {
+        if (connectionFactories->containsKey(url)) {
+            connectionFactory = connectionFactories->get(url);
+        }
+    } catch (NoSuchElementException& ex) {
+    }
+
+    if (!connectionFactory) {
+        connectionFactory = new ActiveMQConnectionFactory(url);
+        connectionFactories->put(url, connectionFactory);
+    }
+    connectionFactories->unlock();
+
+    return connectionFactory;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_CONNECTIONFACTORYMGR_H_
+#define _CMS_STRESS_CONNECTIONFACTORYMGR_H_
+
+#include <decaf/util/Config.h>
+#include <cms/ConnectionFactory.h>
+
+#include <decaf/util/StlMap.h>
+
+using namespace cms;
+using namespace decaf::util;
+using namespace std;
+
+namespace cms {
+namespace stress {
+
+    class ConnectionFactoryMgr {
+    private:
+
+        static decaf::util::StlMap<std::string, cms::ConnectionFactory*>* connectionFactories;
+
+        ConnectionFactoryMgr();
+        virtual ~ConnectionFactoryMgr();
+
+    public:
+
+        static void initialize();
+        static void unInitialize();
+
+        static cms::ConnectionFactory* getConnectionFactory(const std::string& url);
+
+    };
+
+}}
+
+#endif /** _CMS_STRESS_CONNECTIONFACTORYMGR_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessagingTask.h"
+
+using namespace decaf::lang;
+using namespace decaf::util::concurrent;
+using namespace cms;
+using namespace cms::stress;
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor* MessagingTask::threadPoolExecutor = NULL;
+
+////////////////////////////////////////////////////////////////////////////////
+MessagingTask::MessagingTask(Receiver* receiver, const std::string& message) :
+    receiver(receiver), message(message) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MessagingTask::~MessagingTask() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::queue() {
+    if (threadPoolExecutor != NULL) {
+        threadPoolExecutor->execute(this);
+    } else {
+        run();
+        delete this;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::run() {
+    try {
+        if (receiver != NULL) {
+            receiver->executeMessagingTask(message);
+        }
+    } catch (...) {
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::initializeThreads(int min, int max) {
+    if (min > 0) {
+        threadPoolExecutor = new ThreadPoolExecutor(min, max, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+    } else {
+        threadPoolExecutor = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::terminateThreads() {
+    if (threadPoolExecutor != NULL) {
+        threadPoolExecutor->shutdown();
+        //threadPoolExecutor->awaitTermination(10000, TimeUnit::MILLISECONDS);
+        threadPoolExecutor->awaitTermination(-1, TimeUnit::SECONDS);
+        delete threadPoolExecutor;
+        threadPoolExecutor = NULL;
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_MESSAGINGTASK_H_
+#define _CMS_STRESS_MESSAGINGTASK_H_
+
+#include <decaf/util/Config.h>
+
+#include "Receiver.h"
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+
+namespace cms {
+namespace stress {
+
+    class MessagingTask:public decaf::lang::Runnable {
+    private:
+
+        Receiver* receiver;
+        std::string message;
+
+        static decaf::util::concurrent::ThreadPoolExecutor* threadPoolExecutor;
+
+    public:
+
+        MessagingTask(Receiver* receiver, const std::string& message);
+
+        virtual ~MessagingTask();
+
+        virtual void run();
+
+        static void initializeThreads(int min, int max);
+        static void terminateThreads();
+
+        virtual void queue();
+
+    };
+
+}}
+
+#endif /** _CMS_STRESS_MESSAGINGTASK_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Receiver.h"
+#include "MessagingTask.h"
+#include "ConnectionFactoryMgr.h"
+#include "BrokerMonitor.h"
+#include "CmsMessageCreator.h"
+
+#include <cms/Message.h>
+#include <activemq/cmsutil/MessageCreator.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/Exception.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <stdio.h>
+
+using namespace decaf::lang;
+using namespace decaf::util::concurrent;
+using namespace activemq::cmsutil;
+using namespace cms;
+using namespace cms::stress;
+
+////////////////////////////////////////////////////////////////////////////////
+Receiver::Receiver(const std::string& url, const std::string& queueOrTopicName,
+                   bool isTopic, BrokerMonitor* monitor, CountDownLatch* quit,
+                   long long receiveTimeout, bool useThreadPool) :
+    url(url),
+    mutexForCmsTemplate(),
+    mutexGeneral(),
+    closing(false),
+    brokerOnline(true),
+    ready(1),
+    quit(quit),
+    messageListener(NULL),
+    cmsTemplate(NULL),
+    asyncReceiverThread(NULL),
+    receiveTimeout(receiveTimeout),
+    cmsTemplateCreateTime(System::currentTimeMillis()),
+    useThreadPool(useThreadPool),
+    numOfMessagingTasks(0),
+    monitor(monitor),
+    selector() {
+
+    ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
+    cmsTemplateCreateTime = System::currentTimeMillis();
+    cmsTemplate = new CmsTemplate(connectionFactory);
+    cmsTemplate->setDefaultDestinationName(queueOrTopicName);
+    cmsTemplate->setPubSubDomain(isTopic);
+    cmsTemplate->setReceiveTimeout(receiveTimeout);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Receiver::~Receiver() {
+
+    closing = true;
+
+    //delete cmsTemplate
+    mutexForCmsTemplate.lock();
+    if (cmsTemplate) {
+        delete cmsTemplate;
+        cmsTemplate = NULL;
+    }
+    mutexForCmsTemplate.unlock();
+
+    //wait until all outstanding messaging tasks are done
+    while (getNumOfMessagingTasks() > 0) {
+        Thread::sleep(100);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::receiveMessage(std::string& message, ErrorCode& errorCode,
+                              const std::string &selector, bool retryOnError) {
+
+    long long stopRetryTime = System::currentTimeMillis() + receiveTimeout;
+    errorCode = CMS_SUCCESS;
+
+    if (receiveTimeout == 0 /*CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT*/) {
+        retryOnError = false;
+    } else if (receiveTimeout == -1 /*CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT*/) {
+        retryOnError = true;
+    }
+
+    if (monitor != NULL) {
+        if (monitor->isBrokerOk()) {
+            if (!brokerOnline) {
+                mutexForCmsTemplate.lock();
+                if (cmsTemplate) {
+                    cmsTemplateCreateTime = System::currentTimeMillis();
+                    CmsTemplate* cmsTemplate = new CmsTemplate(cmsTemplate->getConnectionFactory());
+                    cmsTemplate->setDefaultDestinationName(cmsTemplate->getDefaultDestinationName());
+                    cmsTemplate->setPubSubDomain(cmsTemplate->isPubSubDomain());
+                    cmsTemplate->setReceiveTimeout(cmsTemplate->getReceiveTimeout());
+                    delete cmsTemplate;
+                }
+                mutexForCmsTemplate.unlock();
+
+                brokerOnline = true;
+            }
+        } else {
+            brokerOnline = false;
+            errorCode = CMS_ERROR_MESSAGE_BROKER_ERROR;
+            return;
+        }
+    }
+
+    do {
+        long long timeoutForThisLoop;
+        if (receiveTimeout <= 0) {
+            timeoutForThisLoop = receiveTimeout;
+        } else {
+            timeoutForThisLoop = stopRetryTime - System::currentTimeMillis();
+            if (timeoutForThisLoop <= 0) {
+                errorCode = CMS_ERROR_RECEIVER_TIMEDOUT;
+                break;
+            }
+        }
+
+        mutexForCmsTemplate.lock();
+        if (cmsTemplate) {
+            cmsTemplate->setReceiveTimeout(timeoutForThisLoop);
+
+            cms::Message* cmsMessage = NULL;
+            try {
+                if (selector != "") {
+                    cmsMessage = cmsTemplate->receiveSelected(selector);
+                } else {
+                    cmsMessage = cmsTemplate->receive();
+                }
+            } catch (cms::CMSException& ex) {
+                mutexForCmsTemplate.unlock();
+                errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
+                break;
+            }
+
+            mutexForCmsTemplate.unlock();
+            if (cmsMessage == NULL) {
+                break;
+            }
+
+            if (isMessageExpired(cmsMessage)) {
+                errorCode = CMS_ERROR_INVALID_MESSAGE;
+                delete cmsMessage;
+                continue;
+            }
+
+            wstring text;
+            cms::TextMessage* txtMessage = dynamic_cast<cms::TextMessage*>(cmsMessage);
+            if (txtMessage) {
+                message = txtMessage->getText();
+            }
+            delete cmsMessage;
+        } else {
+            mutexForCmsTemplate.unlock();
+        }
+    } while (errorCode != CMS_SUCCESS && retryOnError && System::currentTimeMillis() < stopRetryTime);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::waitUntilReady() {
+    ready.await();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::registerMessageListener(ReceiverListener* messageListener, ErrorCode& errorCode,
+                                       const std::string& selector, int id) {
+    errorCode = CMS_SUCCESS;
+    char buffer[512];
+
+    if (id != 0) {
+        sprintf(buffer, "TestListener-%d", id);
+    } else {
+        sprintf(buffer, "TestAsyncListener");
+    }
+
+    mutexGeneral.lock();
+    if (messageListener == NULL) {
+        errorCode = CMS_ERROR_INVALID_MESSAGELISTENER;
+        mutexGeneral.unlock();
+        return;
+    }
+
+    if (messageListener != NULL) {
+        errorCode = CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY;
+        mutexGeneral.unlock();
+        return;
+    }
+
+    this->messageListener = messageListener;
+    this->selector = selector;
+
+    asyncReceiverThread = new Thread(this, buffer);
+    asyncReceiverThread->start();
+    mutexGeneral.unlock();
+
+    this->waitUntilReady();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::run() {
+    ready.countDown();
+    while (!closing) {
+        std::string message = "";
+
+        ErrorCode errorCode = CMS_SUCCESS;
+
+        Receiver::receiveMessage(message, errorCode, selector, false);
+        if (quit->getCount() == 0) {
+            closing = true;
+        }
+
+        if ((message != "") && (!closing)) {
+            if (useThreadPool) {
+                MessagingTask* task = new MessagingTask(this, message);
+
+                increaseNumOfMessagingTasks();
+                task->queue();
+            } else {
+                try {
+                    executeMessagingTask(message, false);
+                } catch (...) {
+                }
+            }
+        } else if (!closing) {
+            if (errorCode == CMS_ERROR_CAUGHT_CMS_EXCEPTION || errorCode == CMS_ERROR_MESSAGE_BROKER_ERROR) {
+                long long sleepTime = 0;
+                mutexForCmsTemplate.lock();
+                sleepTime = cmsTemplate->getReceiveTimeout();
+                mutexForCmsTemplate.unlock();
+
+                if (quit->await(sleepTime)) {
+                    closing = true;
+                }
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::executeMessagingTask(const std::string& message, bool isDecreaseNumOfMessagingTasks) {
+    if (!closing) {
+        mutexGeneral.lock();
+        ReceiverListener* copy = messageListener;
+        mutexGeneral.unlock();
+        if (copy) {
+            copy->onMessage(message);
+        }
+    }
+
+    if (isDecreaseNumOfMessagingTasks) {
+        decreaseNumOfMessagingTasks();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool Receiver::isMessageExpired(cms::Message* message) {
+    long long expireTime = message->getCMSExpiration();
+    long long currentTime = System::currentTimeMillis();
+    if (expireTime > 0 && currentTime > expireTime) {
+        return true;
+    }
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::increaseNumOfMessagingTasks() {
+    mutexGeneral.lock();
+    numOfMessagingTasks++;
+    mutexGeneral.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::decreaseNumOfMessagingTasks() {
+    mutexGeneral.lock();
+    numOfMessagingTasks--;
+    mutexGeneral.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long Receiver::getNumOfMessagingTasks() {
+    long result = 0;
+
+    mutexGeneral.lock();
+    result = numOfMessagingTasks;
+    mutexGeneral.unlock();
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::close() {
+    closing = true;
+    if (asyncReceiverThread) {
+        asyncReceiverThread->join();
+        delete asyncReceiverThread;
+        asyncReceiverThread = NULL;
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_RECEIVER_H_
+#define _CMS_STRESS_RECEIVER_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/exceptions/RuntimeException.h>
+
+#include <string>
+
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cms {
+namespace stress {
+
+    class BrokerMonitor;
+
+    class ReceiverListener {
+    public:
+
+        virtual ~ReceiverListener() {}
+
+        virtual void onMessage(const std::string& message) = 0;
+
+    };
+
+    class Receiver: public decaf::lang::Runnable {
+    private:
+
+        std::string url;
+        decaf::util::concurrent::Mutex mutexForCmsTemplate;
+        decaf::util::concurrent::Mutex mutexGeneral;
+        bool closing;
+        bool brokerOnline;
+        decaf::util::concurrent::CountDownLatch ready;
+        decaf::util::concurrent::CountDownLatch* quit;
+        ReceiverListener* messageListener;
+        activemq::cmsutil::CmsTemplate* cmsTemplate;
+        decaf::lang::Thread* asyncReceiverThread;
+        long long receiveTimeout;
+        long long cmsTemplateCreateTime;
+        bool useThreadPool;
+        long numOfMessagingTasks;
+        BrokerMonitor* monitor;
+        std::string selector;
+
+    private:
+
+        virtual void waitUntilReady();
+        void increaseNumOfMessagingTasks();
+        void decreaseNumOfMessagingTasks();
+        long getNumOfMessagingTasks();
+
+    public:
+
+        Receiver(const std::string& url,
+                 const std::string& queueOrTopicName,
+                 bool isTopic,
+                 BrokerMonitor* monitor,
+                 decaf::util::concurrent::CountDownLatch* quit,
+                 long long receiveTimeout = 2000,
+                 bool useThreadPool = true);
+
+        virtual ~Receiver();
+
+        void close();
+
+        virtual void run();
+
+        void registerMessageListener(ReceiverListener* messageListener,
+                                     ErrorCode& errorCode, const std::string& selector, int id = 0);
+
+        void receiveMessage(std::string& message, ErrorCode& errorCode,
+                            const std::string& selector, bool retryOnError = true);
+
+        static bool isMessageExpired(cms::Message* message);
+
+        void executeMessagingTask(const std::string& message,
+                                  bool bDecreaseNumOfMessagingTasks = true);
+
+    };
+
+}}
+
+#endif /** _CMS_STRESS_RECEIVER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Sender.h"
+
+#include "CmsMessageCreator.h"
+#include "ConnectionFactoryMgr.h"
+
+#include <cms/ConnectionFactory.h>
+
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+using namespace activemq::cmsutil;
+
+////////////////////////////////////////////////////////////////////////////////
+Sender::Sender(const std::string& url, const std::string& queueOrTopicName,
+               bool isTopic, bool isDeliveryPersistent, int timeToLive) : cmsTemplateMutex(), cmsTemplate() {
+
+    ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
+
+    cmsTemplate = new CmsTemplate(connectionFactory);
+
+    cmsTemplate->setExplicitQosEnabled(true);
+    cmsTemplate->setDefaultDestinationName(queueOrTopicName);
+    cmsTemplate->setPubSubDomain(isTopic);
+    cmsTemplate->setDeliveryPersistent(isDeliveryPersistent);
+    cmsTemplate->setTimeToLive(timeToLive);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Sender::~Sender() {
+    cmsTemplateMutex.lock();
+    if (cmsTemplate) {
+        delete cmsTemplate;
+        cmsTemplate = NULL;
+    }
+    cmsTemplateMutex.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Sender::SendMessage(const std::string& message, ErrorCode& errorCode,
+                         const std::string& header, const std::string& value) {
+
+    CmsMessageCreator messageCreator(message, header, value);
+
+    try {
+        cmsTemplateMutex.lock();
+        cmsTemplate->send(&messageCreator);
+        cmsTemplateMutex.unlock();
+        errorCode = CMS_SUCCESS;
+    } catch (cms::CMSException& ex) {
+        cmsTemplateMutex.unlock();
+        errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_SENDER_H_
+#define _CMS_STRESS_SENDER_H_
+
+#include <decaf/util/Config.h>
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/Runnable.h>
+
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cms {
+namespace stress {
+
+    class Sender {
+    private:
+
+        decaf::util::concurrent::Mutex cmsTemplateMutex;
+        activemq::cmsutil::CmsTemplate* cmsTemplate;
+
+    public:
+
+        Sender(const std::string& url,
+               const std::string& queueOrTopicName,
+               bool isTopic, bool isDeliveryPersistent,
+               int timeToLive);
+
+        virtual ~Sender();
+
+        void SendMessage(const std::string& msg,
+                         ErrorCode& errorCode,
+                         const std::string& header,
+                         const std::string& value);
+
+    };
+
+}}
+
+#endif /** _CMS_STRESS_SENDER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestSenderAndReceiver.h"
+
+#include <stdio.h>
+
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+extern bool VERBOSE;
+extern TESTINFO TestResults;
+
+////////////////////////////////////////////////////////////////////////////////
+TestSenderAndReceiver::TestSenderAndReceiver(const std::string& url, const std::string& queueOrTopicName,
+                                             const std::string& headerName, bool isTopic,
+                                             bool isDeliveryPersistent, BrokerMonitor* monitor,
+                                             CountDownLatch* quit, int timeToLive, int receiveTimeout,
+                                             int identifier, bool useThreadPool, int sleep, int seed) :
+    sender(NULL),
+    receiver(NULL),
+    senderThread(NULL),
+    monitor(monitor),
+    header(headerName),
+    closing(false),
+    sendIndex(0),
+    id(identifier),
+    sleep(sleep),
+    seed(seed),
+    quit(quit),
+    random(seed) {
+
+    sender = new Sender(url, queueOrTopicName, isTopic, isDeliveryPersistent, timeToLive);
+    receiver = new Receiver(url, queueOrTopicName, isTopic, monitor, quit, receiveTimeout, useThreadPool);
+    ErrorCode errorCode = CMS_SUCCESS;
+
+    std::string selector("");
+    if (headerName != "") {
+        std::stringstream sID;
+        selector = headerName;
+        selector.append("='");
+        sID << identifier;
+        selector.append(sID.str());
+        selector.append("'");
+    }
+
+    receiver->registerMessageListener(this, errorCode, selector, identifier);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TestSenderAndReceiver::~TestSenderAndReceiver() {
+    close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::init() {
+    char buffer[512];
+    sprintf(buffer, "TestSender-%d", id);
+    senderThread = new Thread(this, buffer);
+    senderThread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::onMessage(const std::string& message) {
+
+    int index = (int) message.find(";");
+    std::string msg;
+    int thrdidx;
+    int thrdseq, curseq, diffseq;
+
+    if (index <= 0) {
+        // i for invalid message
+        if (VERBOSE) {
+            printf("%c", SYM_BAD_MSG);
+        }
+        TestResults.invalidMessages.incrementAndGet();
+    } else {
+        thrdidx = atoi(message.substr(0, index).c_str());
+        msg = message.substr(index + 1);
+
+        if (thrdidx > (int) TestResults.threadCount) {
+            if (VERBOSE) {
+                printf("%c", SYM_BAD_MSG);
+            }
+            TestResults.invalidMessages.incrementAndGet();
+        } else {
+            index = (int) msg.find(";");
+            if (index <= 0) {
+                if (VERBOSE) {
+                    printf("%c", SYM_BAD_MSG);
+                }
+                TestResults.invalidMessages.incrementAndGet();
+            } else {
+                TestResults.received.incrementAndGet();
+                thrdseq = Integer::parseInt(msg.substr(0, index));
+                msg = msg.substr(index + 1);
+                curseq = TestResults.lastSequence[thrdidx].incrementAndGet();
+                if (thrdseq == curseq) {
+                    if (VERBOSE) {
+                        // Smiley face for good message
+                        printf("%c", SYM_GOOD_SEQ);
+                    }
+                } else {
+                    TestResults.lastSequence[thrdidx].set(thrdseq);
+                    TestResults.badSequenceMessages.incrementAndGet();
+                    if (thrdseq > curseq) {
+                        diffseq = thrdseq - curseq;
+                    } else {
+                        diffseq = curseq - thrdseq;
+                    }
+                    TestResults.sequenceDifferences.addAndGet(diffseq);
+                    if (VERBOSE) {
+                        if ((diffseq > 0) && (diffseq < 10)) {
+                            printf("%d", diffseq);
+                        } else {
+                            printf("%c", SYM_BIG_DIFF);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::run() {
+    ErrorCode errorReturn;
+    int i, j;
+    bool result;
+
+    // Seed the random numbers - time if zero
+    if (seed == 0) {
+        random.setSeed(System::currentTimeMillis());
+    } else {
+        random.setSeed(seed);
+    }
+
+    // If randomizing sleeps - stagger start by up to 1 second
+    if (sleep == -1) {
+        Thread::sleep(random.nextInt(1000));
+    }
+
+    while (!closing) {
+        std::stringstream sID;
+        std::stringstream sSeq;
+        std::stringstream sHdr;
+        std::string message;
+
+        // Add id to messages
+        sID << id;
+        sID >> message;
+        sHdr << id;
+        message.append(";");
+
+        // Add sequence to messages
+        sSeq << sendIndex;
+        message.append(sSeq.str());
+        message.append(";");
+
+        // Add variable payload
+        j = random.nextInt(1024);
+        for (i = 0; i < j; i++) {
+            message += std::string(1, (char) (65 + (random.nextInt(24))));
+        }
+
+        errorReturn = CMS_SUCCESS;
+        sender->SendMessage(message, errorReturn, header, sHdr.str());
+        if (errorReturn == CMS_SUCCESS) {
+            sendIndex++;
+            if (VERBOSE) {
+                printf("%c", SYM_GOOD_SEND);
+            }
+            TestResults.sent.incrementAndGet();
+        } else {
+            if (VERBOSE) {
+                // Exclamation point for error
+                printf("%c", SYM_BAD_SEND);
+            }
+            TestResults.sendErrors.incrementAndGet();
+        }
+
+        if (sleep) {
+            if (sleep == -1) {
+                result = quit->await(random.nextInt(1000));
+            } else {
+                result = quit->await(random.nextInt(sleep));
+            }
+        } else {
+            result = quit->getCount() == 0;
+        }
+
+        if (result) {
+            closing = true;
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::close() {
+    closing = true;
+
+    if (senderThread) {
+        senderThread->join();
+        delete senderThread;
+        senderThread = NULL;
+    }
+
+    if (sender) {
+        delete sender;
+        sender = NULL;
+    }
+
+    if (receiver) {
+        receiver->close();
+        delete receiver;
+        receiver = NULL;
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_TESTSENDERANDRECEIVER_H_
+#define _CMS_STRESS_TESTSENDERANDRECEIVER_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/exceptions/RuntimeException.h>
+#include <decaf/util/Random.h>
+
+#include "Sender.h"
+#include "Receiver.h"
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cms {
+namespace stress {
+
+    class TestSenderAndReceiver: public decaf::lang::Runnable,
+                                 public ReceiverListener {
+    private:
+
+        Sender* sender;
+        Receiver* receiver;
+        decaf::lang::Thread* senderThread;
+        BrokerMonitor* monitor;
+        std::string header;
+        bool closing;
+        int sendIndex;
+        int id;
+        int sleep;
+        unsigned int seed;
+        decaf::util::concurrent::CountDownLatch* quit;
+        decaf::util::Random random;
+
+    public:
+
+        TestSenderAndReceiver(const std::string& url, const std::string& queueOrTopicName,
+                              const std::string& headerName, bool isTopic, bool isDeliveryPersistent,
+                              BrokerMonitor *monitor, decaf::util::concurrent::CountDownLatch* quit,
+                              int timeToLive, int receiveTimeout, int identifier,
+                              bool useThreadPool = true, int sleep = -1, int seed = 0);
+
+        virtual ~TestSenderAndReceiver();
+
+        void init();
+
+        virtual void run();
+
+        void close();
+
+        void waitUntilReady();
+
+    public:
+
+        virtual void onMessage(const std::string& message);
+
+    };
+
+    typedef struct {
+        int threadCount;
+        long long startTime;
+        long long endTime;
+        decaf::util::concurrent::atomic::AtomicInteger sent;
+        decaf::util::concurrent::atomic::AtomicInteger received;
+        decaf::util::concurrent::atomic::AtomicInteger sendErrors;
+        decaf::util::concurrent::atomic::AtomicInteger receiveErrors;
+        decaf::util::concurrent::atomic::AtomicInteger invalidMessages;
+        decaf::util::concurrent::atomic::AtomicInteger badSequenceMessages;
+        decaf::util::concurrent::atomic::AtomicInteger sequenceDifferences;
+        decaf::util::concurrent::atomic::AtomicInteger* lastSequence;
+    } TESTINFO;
+
+}}
+
+#endif /** _CMS_STRESS_TESTSENDERANDRECEIVER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message