activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1419576 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress: Receiver.cpp Receiver.h TestSenderAndReceiver.cpp TestSenderAndReceiver.h
Date Mon, 10 Dec 2012 16:40:52 GMT
Author: tabish
Date: Mon Dec 10 16:40:50 2012
New Revision: 1419576

URL: http://svn.apache.org/viewvc?rev=1419576&view=rev
Log:
Clean up example code in preparation for next release. 

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp?rev=1419576&r1=1419575&r2=1419576&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp Mon Dec 10 16:40:50 2012
@@ -34,44 +34,44 @@ using namespace activemq::cmsutil;
 using namespace cmstemplate;
 
 ////////////////////////////////////////////////////////////////////////////////
-ThreadPoolExecutor* Receiver::m_threadPoolExecutor = NULL;
+ThreadPoolExecutor* Receiver::threadPoolExecutor = NULL;
 
 ////////////////////////////////////////////////////////////////////////////////
 Receiver::Receiver(const string & url, const string & queueOrTopicName,
                    bool isTopic, long long receiveTimeout, bool useThreadPool) :
-        m_url(url),
-        m_mutexForCmsTemplate(),
-        m_mutexGeneral(),
-        m_isClosing(false),
-        m_ready(1),
-        m_messageListener(NULL),
-        m_cmsTemplate(NULL),
-        m_asyncReceiverThread(NULL),
-        m_receiveTimeout(receiveTimeout),
-        m_bUseThreadPool(useThreadPool),
-        m_cmsTemplateCreateTime(0),
-        m_numOfMessagingTasks(0) {
-
-    ConnectionFactory* connectionFactory = ConnectionFactoryMgr::GetConnectionFactory(m_url);
-    m_cmsTemplate = new CmsTemplate(connectionFactory);
-    m_cmsTemplate->setDefaultDestinationName(queueOrTopicName);
-    m_cmsTemplate->setPubSubDomain(isTopic);
-    m_cmsTemplate->setReceiveTimeout(receiveTimeout);
+        url(url),
+        mutexForCmsTemplate(),
+        mutexGeneral(),
+        closing(false),
+        ready(1),
+        messageListener(NULL),
+        cmsTemplate(NULL),
+        asyncReceiverThread(NULL),
+        receiveTimeout(receiveTimeout),
+        bUseThreadPool(useThreadPool),
+        cmsTemplateCreateTime(0),
+        numOfMessagingTasks(0) {
+
+    ConnectionFactory* connectionFactory = ConnectionFactoryMgr::GetConnectionFactory(url);
+    cmsTemplate = new CmsTemplate(connectionFactory);
+    cmsTemplate->setDefaultDestinationName(queueOrTopicName);
+    cmsTemplate->setPubSubDomain(isTopic);
+    cmsTemplate->setReceiveTimeout(receiveTimeout);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Receiver::~Receiver() {
 
     try {
-        m_isClosing = true;
+        closing = true;
 
         //delete m_cmsTemplate
-        m_mutexForCmsTemplate.lock();
-        if (m_cmsTemplate) {
-            delete m_cmsTemplate;
-            m_cmsTemplate = NULL;
+        mutexForCmsTemplate.lock();
+        if (cmsTemplate) {
+            delete cmsTemplate;
+            cmsTemplate = NULL;
         }
-        m_mutexForCmsTemplate.unlock();
+        mutexForCmsTemplate.unlock();
 
         //wait until all outstanding messaging tasks are done
         while (true) {
@@ -87,39 +87,38 @@ Receiver::~Receiver() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::Initialize(int reservedThreads, int maxThreads) {
-    m_threadPoolExecutor = new ThreadPoolExecutor(reservedThreads, maxThreads, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
-    m_threadPoolExecutor->prestartCoreThread();
+    threadPoolExecutor = new ThreadPoolExecutor(reservedThreads, maxThreads, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+    threadPoolExecutor->prestartCoreThread();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::UnInitialize() {
-    if (m_threadPoolExecutor != NULL) {
+    if (threadPoolExecutor != NULL) {
         try {
-            m_threadPoolExecutor->shutdown();
-            m_threadPoolExecutor->awaitTermination(10000, TimeUnit::MILLISECONDS);
-
-        } catch (Exception & ie) {
+            threadPoolExecutor->shutdown();
+            threadPoolExecutor->awaitTermination(10000, TimeUnit::MILLISECONDS);
+        } catch (Exception& ie) {
         }
-        delete m_threadPoolExecutor;
-        m_threadPoolExecutor = NULL;
+        delete threadPoolExecutor;
+        threadPoolExecutor = NULL;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::ReceiveMessage(std::string& message, ErrorCode& errorCode, bool retryOnError) {
-    long long stopRetryTime = System::currentTimeMillis() + m_receiveTimeout;
+    long long stopRetryTime = System::currentTimeMillis() + receiveTimeout;
     errorCode = CMS_SUCCESS;
 
-    if (m_receiveTimeout == 0/*CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT*/) {
+    if (receiveTimeout == 0/*CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT*/) {
         retryOnError = false;
-    } else if (m_receiveTimeout == -1/*CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT*/) {
+    } else if (receiveTimeout == -1/*CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT*/) {
         retryOnError = true;
     }
 
     do {
         long long timeoutForThisLoop;
-        if (m_receiveTimeout <= 0) {
-            timeoutForThisLoop = m_receiveTimeout;
+        if (receiveTimeout <= 0) {
+            timeoutForThisLoop = receiveTimeout;
         } else {
             timeoutForThisLoop = stopRetryTime - System::currentTimeMillis();
             if (timeoutForThisLoop <= 0) {
@@ -128,20 +127,20 @@ void Receiver::ReceiveMessage(std::strin
             }
         }
 
-        m_mutexForCmsTemplate.lock();
-        if (m_cmsTemplate) {
-            m_cmsTemplate->setReceiveTimeout(timeoutForThisLoop);
+        mutexForCmsTemplate.lock();
+        if (cmsTemplate) {
+            cmsTemplate->setReceiveTimeout(timeoutForThisLoop);
 
             cms::Message* cmsMessage = NULL;
             try {
-                cmsMessage = m_cmsTemplate->receive();
+                cmsMessage = cmsTemplate->receive();
             } catch (cms::CMSException& ex) {
-                m_mutexForCmsTemplate.unlock();
+                mutexForCmsTemplate.unlock();
                 errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
                 break;
             }
 
-            m_mutexForCmsTemplate.unlock();
+            mutexForCmsTemplate.unlock();
             if (cmsMessage == NULL) {
                 break;
             }
@@ -160,53 +159,53 @@ void Receiver::ReceiveMessage(std::strin
             }
             delete cmsMessage;
         } else {
-            m_mutexForCmsTemplate.unlock();
+            mutexForCmsTemplate.unlock();
         }
     } while (errorCode != CMS_SUCCESS && retryOnError && System::currentTimeMillis() < stopRetryTime);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::WaitUntilReady() {
-    m_ready.await();
+    ready.await();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::RegisterMessageListener(const RecvMessageListener messageListener, ErrorCode& errorCode) {
     errorCode = CMS_SUCCESS;
 
-    m_mutexGeneral.lock();
+    mutexGeneral.lock();
     if (messageListener == NULL) {
         errorCode = CMS_ERROR_INVALID_MESSAGELISTENER;
-        m_mutexGeneral.unlock();
+        mutexGeneral.unlock();
         return;
     }
 
-    if (m_messageListener != NULL) {
+    if (this->messageListener != NULL) {
         errorCode = CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY;
-        m_mutexGeneral.unlock();
+        mutexGeneral.unlock();
         return;
     }
 
-    m_messageListener = messageListener;
+    this->messageListener = messageListener;
 
-    m_asyncReceiverThread = new Thread(this, "AsyncReceiver");
-    m_asyncReceiverThread->start();
-    m_mutexGeneral.unlock();
+    asyncReceiverThread = new Thread(this, "AsyncReceiver");
+    asyncReceiverThread->start();
+    mutexGeneral.unlock();
 
     this->WaitUntilReady();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::run() {
-    m_ready.countDown();
-    while (!m_isClosing) {
+    ready.countDown();
+    while (!closing) {
         string message = "";
 
         ErrorCode errorCode = CMS_SUCCESS;
 
         Receiver::ReceiveMessage(message, errorCode, false);
         if (message != "") {
-            if (m_bUseThreadPool) {
+            if (bUseThreadPool) {
                 QueueMessagingTask(message);
             } else {
                 try {
@@ -217,9 +216,9 @@ void Receiver::run() {
         } else {
             if (errorCode == CMS_ERROR_CAUGHT_CMS_EXCEPTION || errorCode == CMS_ERROR_MESSAGE_BROKER_ERROR) {
                 long long sleepTime = 0;
-                m_mutexForCmsTemplate.lock();
-                sleepTime = m_cmsTemplate->getReceiveTimeout();
-                m_mutexForCmsTemplate.unlock();
+                mutexForCmsTemplate.lock();
+                sleepTime = cmsTemplate->getReceiveTimeout();
+                mutexForCmsTemplate.unlock();
                 Thread::sleep(sleepTime);
             }
         }
@@ -228,19 +227,19 @@ void Receiver::run() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::QueueMessagingTask(const string& message) {
-    if (message != "" && (!m_isClosing)) {
+    if (message != "" && (!closing)) {
         MessagingTask* task = new MessagingTask(this, message);
-        m_threadPoolExecutor->execute(task);
+        threadPoolExecutor->execute(task);
         IncreaseNumOfMessagingTasks();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::ExecuteMessagingTask(const string& message, bool bDecreaseNumOfMessagingTasks/*=true*/) {
-    if ((!m_isClosing)) {
-        m_mutexGeneral.lock();
-        RecvMessageListener copy = m_messageListener;
-        m_mutexGeneral.unlock();
+    if ((!closing)) {
+        mutexGeneral.lock();
+        RecvMessageListener copy = messageListener;
+        mutexGeneral.unlock();
         if (copy) {
             (*copy)(message); //listener will release the message and make reference count 0
         }
@@ -263,28 +262,28 @@ bool Receiver::IsMessageExpired(cms::Mes
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::IncreaseNumOfMessagingTasks() {
-    m_mutexGeneral.lock();
-    m_numOfMessagingTasks++;
-    m_mutexGeneral.unlock();
+    mutexGeneral.lock();
+    numOfMessagingTasks++;
+    mutexGeneral.unlock();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::DecreaseNumOfMessagingTasks() {
-    m_mutexGeneral.lock();
-    m_numOfMessagingTasks--;
-    m_mutexGeneral.unlock();
+    mutexGeneral.lock();
+    numOfMessagingTasks--;
+    mutexGeneral.unlock();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 long Receiver::GetNumOfMessagingTasks() {
     long numOfMessagingTasks = 0;
-    m_mutexGeneral.lock();
-    numOfMessagingTasks = m_numOfMessagingTasks;
-    m_mutexGeneral.unlock();
+    mutexGeneral.lock();
+    this->numOfMessagingTasks = numOfMessagingTasks;
+    mutexGeneral.unlock();
     return numOfMessagingTasks;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void Receiver::Close() {
-    m_isClosing = true;
+    closing = true;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h?rev=1419576&r1=1419575&r2=1419576&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h Mon Dec 10 16:40:50 2012
@@ -38,27 +38,19 @@ namespace cmstemplate {
 
     private:
 
-        std::string m_url;
-        decaf::util::concurrent::Mutex m_mutexForCmsTemplate;
-        decaf::util::concurrent::Mutex m_mutexGeneral;
-        bool m_isClosing;
-
-        decaf::util::concurrent::CountDownLatch m_ready;
-
-        RecvMessageListener m_messageListener;
-
-        activemq::cmsutil::CmsTemplate* m_cmsTemplate;
-
-        decaf::lang::Thread* m_asyncReceiverThread;
-
-        long long m_receiveTimeout;
-
-        bool m_bUseThreadPool; //determines if we should use the thread pool to process async received messages or not
-
-        long long m_cmsTemplateCreateTime;
-
-        static decaf::util::concurrent::ThreadPoolExecutor* m_threadPoolExecutor;
-        long m_numOfMessagingTasks; //number of pending messaging tasks created by this receiver that has been queued in the threadpool
+        std::string url;
+        decaf::util::concurrent::Mutex mutexForCmsTemplate;
+        decaf::util::concurrent::Mutex mutexGeneral;
+        bool closing;
+        decaf::util::concurrent::CountDownLatch ready;
+        RecvMessageListener messageListener;
+        activemq::cmsutil::CmsTemplate* cmsTemplate;
+        decaf::lang::Thread* asyncReceiverThread;
+        long long receiveTimeout;
+        bool bUseThreadPool;
+        long long cmsTemplateCreateTime;
+        static decaf::util::concurrent::ThreadPoolExecutor* threadPoolExecutor;
+        long numOfMessagingTasks;
 
     private:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp?rev=1419576&r1=1419575&r2=1419576&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp Mon Dec 10 16:40:50 2012
@@ -40,22 +40,15 @@ void TestSenderAndReceiver::onMessage(co
     }
 }
 
-Sender* m_sender;
-Receiver* m_receiver;
-decaf::lang::Thread* m_senderThread;
-bool m_isClosing;
-int m_sendIndex;
-int m_receiveIndex;
-
 ////////////////////////////////////////////////////////////////////////////////
 TestSenderAndReceiver::TestSenderAndReceiver(const string& url, const string& queueOrTopicName, bool isTopic,
                                              bool isDeliveryPersistent, int timeToLive, int receiveTimeout) :
-    m_sender(NULL), m_receiver(NULL), m_senderThread(NULL), m_isClosing(false), m_sendIndex(0), m_receiveIndex(0) {
+    sender(NULL), receiver(NULL), senderThread(NULL), closing(false), sendIndex(0), receiveIndex(0) {
 
-    m_sender = new Sender(url, queueOrTopicName, isTopic, isDeliveryPersistent, timeToLive);
-    m_receiver = new Receiver(url, queueOrTopicName, isTopic, receiveTimeout, true);
+    sender = new Sender(url, queueOrTopicName, isTopic, isDeliveryPersistent, timeToLive);
+    receiver = new Receiver(url, queueOrTopicName, isTopic, receiveTimeout, true);
     ErrorCode errorCode = CMS_SUCCESS;
-    m_receiver->RegisterMessageListener(onMessage, errorCode);
+    receiver->RegisterMessageListener(onMessage, errorCode);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -67,8 +60,8 @@ TestSenderAndReceiver::~TestSenderAndRec
 
 ////////////////////////////////////////////////////////////////////////////////
 void TestSenderAndReceiver::init() {
-    m_senderThread = new Thread(this, "TestSenderAndReceiver");
-    m_senderThread->start();
+    senderThread = new Thread(this, "TestSenderAndReceiver");
+    senderThread->start();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -79,9 +72,9 @@ void TestSenderAndReceiver::run() {
 
     j = (int) ((random.nextInt() * 50) / RAND_MAX);
 
-    while (!m_isClosing) {
+    while (!closing) {
         std::stringstream str;
-        str << m_sendIndex;
+        str << sendIndex;
 
         string message;
         str >> message;
@@ -94,9 +87,9 @@ void TestSenderAndReceiver::run() {
         }
 
         errorReturn = CMS_SUCCESS;
-        m_sender->SendMessage(message, errorReturn);
+        sender->SendMessage(message, errorReturn);
         if (errorReturn == CMS_SUCCESS) {
-            m_sendIndex++;
+            sendIndex++;
         } else {
             // Exclamation point for error
             printf("%c", 33);
@@ -109,20 +102,23 @@ void TestSenderAndReceiver::run() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void TestSenderAndReceiver::close() {
-    if (!m_isClosing) {
-        m_isClosing = true;
+    if (!closing) {
+        closing = true;
 
-        if (m_senderThread) {
-            m_senderThread->join();
-            delete m_senderThread;
-            m_senderThread = NULL;
+        if (senderThread) {
+            senderThread->join();
+            delete senderThread;
+            senderThread = NULL;
         }
 
-        delete m_sender;
-        m_sender = NULL;
+        delete sender;
+        sender = NULL;
+
+        try {
+            receiver->Close();
+        } catch(Exception& ex) {}
 
-        m_receiver->Close();
-        delete m_receiver;
-        m_receiver = NULL;
+        delete receiver;
+        receiver = NULL;
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h?rev=1419576&r1=1419575&r2=1419576&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h Mon Dec 10 16:40:50 2012
@@ -31,12 +31,12 @@ namespace cmstemplate {
     class TestSenderAndReceiver : public decaf::lang::Runnable {
     private:
 
-        Sender* m_sender;
-        Receiver* m_receiver;
-        decaf::lang::Thread* m_senderThread;
-        bool m_isClosing;
-        int m_sendIndex;
-        int m_receiveIndex;
+        Sender* sender;
+        Receiver* receiver;
+        decaf::lang::Thread* senderThread;
+        bool closing;
+        int sendIndex;
+        int receiveIndex;
 
         static void DECAF_STDCALL onMessage(const std::string& message);
 
@@ -59,6 +59,7 @@ namespace cmstemplate {
         void close();
 
         void waitUntilReady();
+
     };
 }
 



Mime
View raw message