activemq-users mailing list archives

From Oscar Pernas <os...@pernas.es>
Subject Re: AMQCPP: unexpected exception
Date Tue, 01 Mar 2011 23:43:21 GMT
Hi,

I have run into this bug before. If I shut down and restart the broker many
times, the connection is not reestablished and the consumer freezes.
If you try to close the frozen consumer, does the close call block your
thread too? In my case, the problem only happens when I'm debugging the
application (no breakpoints, but running under the Visual Studio debugger).
I've partially worked around it manually: if I don't receive anything on a
topic within a certain time, I close the consumer and reconnect it.
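
To make the workaround concrete, here is a minimal sketch of the idea. It is
not my real code: `InactivityWatchdog` and `checkAndReconnect` are hypothetical
names, not part of ActiveMQ-CPP, and the `reconnect` callback is an assumption
standing in for "close the CMS consumer and create it again".

```cpp
#include <chrono>
#include <functional>

// Hypothetical helper, not part of ActiveMQ-CPP: remembers when the last
// message arrived and reports when the consumer has been silent too long.
class InactivityWatchdog {
public:
    typedef std::chrono::steady_clock Clock;

    InactivityWatchdog(std::chrono::milliseconds limit, Clock::time_point now)
        : _limit(limit), _lastSeen(now) {}

    // Call whenever a message is received (e.g. from onMessage()).
    void touch(Clock::time_point now) { _lastSeen = now; }

    // True when no message has been seen for longer than the limit.
    bool expired(Clock::time_point now) const {
        return now - _lastSeen > _limit;
    }

private:
    std::chrono::milliseconds _limit;
    Clock::time_point _lastSeen;
};

// Runs one check. If the consumer has been silent for too long, invokes the
// caller-supplied reconnect action and restarts the timer.
// Returns true when a reconnect was triggered.
inline bool checkAndReconnect(InactivityWatchdog& dog,
                              InactivityWatchdog::Clock::time_point now,
                              const std::function<void()>& reconnect) {
    if (!dog.expired(now)) return false;
    reconnect();
    dog.touch(now);
    return true;
}
```

You would call `touch()` each time a message arrives and `checkAndReconnect()`
periodically from a thread other than the one that may be frozen. Passing the
current time in explicitly keeps the logic easy to test without sleeping.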

regards

2011/3/2 Timothy Bish <tabish121@gmail.com>

> On Tue, 2011-03-01 at 20:22 -0300, Henrique Magarotto wrote:
> > Hi everybody,
> >
> > First, sorry for the long mail, and thanks for the great work on ActiveMQ
> > and ActiveMQ-CPP.
> >
> > I'm testing some failure cases with ActiveMQ and ActiveMQ-CPP.
> >
> > In my test, an unexpected exception is thrown if the broker is stopped
> > while a consumer is in a transaction.
> >
> > Environment:
> > ActiveMQ 5.4.2
> > ActiveMQ-CPP 3.2.4
> > Ubuntu 10.04
> > JRE 1.6.0_20-b02
> >
>
> > I get the following message:
> >
> > terminate called after throwing an instance of 'cms::CMSException'
> >   what():  Failover timeout of 1000 ms reached.
> > Aborted (core dumped)
> >
>
> Sounds like a bug, can you open a new issue in our Jira bug tracker and
> attach your test case along with instructions for reproducing the error?
>
> https://issues.apache.org/jira/browse/AMQCPP
>
> Regards
> Tim.
>
> > core backtrace:
> > #0  0x001ef422 in __kernel_vsyscall ()
> > #1  0x00c5a651 in *__GI_raise (sig=6) at
> > ../nptl/sysdeps/unix/sysv/linux/raise.c:64
> > #2  0x00c5da82 in *__GI_abort () at abort.c:92
> > #3  0x00bf952f in __gnu_cxx::__verbose_terminate_handler() () from
> > /usr/lib/libstdc++.so.6
> > #4  0x00bf7465 in ?? () from /usr/lib/libstdc++.so.6
> > #5  0x00bf74a2 in std::terminate() () from /usr/lib/libstdc++.so.6
> > #6  0x00bf74c5 in ?? () from /usr/lib/libstdc++.so.6
> > #7  0x00bf6915 in __cxa_call_unexpected () from /usr/lib/libstdc++.so.6
> > #8  0x0052f8ae in
> > activemq::core::TransactionSynhcronization::beforeEnd (this=0x93ac548)
> > at activemq/core/ActiveMQConsumer.cpp:84
> > #9  0x00550588 in
> > activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x93b3e48)
> > at activemq/core/ActiveMQTransactionContext.cpp:192
> > #10 0x00550d63 in activemq::core::ActiveMQTransactionContext::commit
> > (this=0x93b3e48) at activemq/core/ActiveMQTransactionContext.cpp:127
> > #11 0x0053e460 in activemq::core::ActiveMQSession::commit
> > (this=0x93b3c10) at activemq/core/ActiveMQSession.cpp:189
> > #12 0x0042c1a0 in activemq::cmsutil::PooledSession::commit
> > (this=0x93b3fd8) at activemq/cmsutil/PooledSession.h:87
> > #13 0x0804c0fb in Consumer::onMessage (this=0x93a8c30,
> > message=0x93b49b0) at main.cpp:455
> > #14 0x0804bdc8 in Consumer::consumeLoop (this=0x93a8c30) at main.cpp:415
> > #15 0x0804bb56 in Consumer::run (this=0x93a8c30) at main.cpp:381
> > #16 0x00823cf1 in decaf::lang::ThreadProperties::runCallback
> > (properties=0x93abff0) at decaf/lang/Thread.cpp:135
> > #17 0x00822847 in threadWorker (arg=0x93abff0) at
> > decaf/lang/Thread.cpp:188
> > #18 0x0015a96e in start_thread (arg=0xb6f8bb70) at pthread_create.c:300
> > #19 0x00cfda4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> >
> >
> > Exception specification for method
> > 'activemq::core::TransactionSynhcronization::beforeEnd' says: 'throw(
> > exceptions::ActiveMQException )' but 'ActiveMQConsumer::acknowledge'
> > throws 'cms::CMSException', so 'std::unexpected' is called and 'abort'
> > occurs. (http://www.linuxprogrammingblog.com/cpp-exception-specifications-are-evil)
> >
> >
> > When 'MessageListener' is used, the commit freezes, probably in
> > thread 5:
> >
> > Thread 1:
> > #0  0x00cfd422 in __kernel_vsyscall ()
> > #1  0x00ddcb5d in pthread_join (threadid=3061869424,
> > thread_return=0xbf964bdc) at pthread_join.c:89
> > #2  0x00a923c0 in decaf::lang::Thread::join (this=0x8eba574) at
> > decaf/lang/Thread.cpp:421
> > #3  0x0804c9f1 in AppTest::joinEndPoins (this=0xbf964c64) at main.cpp:586
> > #4  0x0804c678 in AppTest::run (this=0xbf964c64) at main.cpp:555
> > #5  0x0804a33c in main (argc=4, argv=0xbf964d84) at main.cpp:633
> >
> > Thread 2:
> > #0  0x00cfd422 in __kernel_vsyscall ()
> > #1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
> > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
> > #2  0x0034f9dd in __pthread_cond_wait (cond=0x8eb0e38,
> > mutex=0x8eb0e08) at forward.c:139
> > #3  0x00a606a9 in
> > decaf::internal::util::concurrent::ConditionImpl::wait
> > (condition=0x8eb0e38) at
> > decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
> > #4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
> > (this=0x8eb0d6c) at decaf/util/concurrent/Mutex.cpp:95
> > #5  0x007fc9c5 in activemq::threads::CompositeTaskRunner::run
> > (this=0x8eb0d48) at activemq/threads/CompositeTaskRunner.cpp:118
> > #6  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> > (properties=0x8eb0e90) at decaf/lang/Thread.cpp:135
> > #7  0x00a92847 in threadWorker (arg=0x8eb0e90) at
> > decaf/lang/Thread.cpp:188
> > #8  0x00ddb96e in start_thread (arg=0xb7808b70) at pthread_create.c:300
> > #9  0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> >
> > Thread 3:
> > #0  0x00cfd422 in __kernel_vsyscall ()
> > #1  0x00de2af9 in __lll_lock_wait () at
> > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
> > #2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
> > #3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8ebb3a8) at
> > pthread_mutex_lock.c:61
> > #4  0x0034fba6 in pthread_mutex_lock (mutex=0x8ebb3a8) at forward.c:182
> > #5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
> > (handle=0x8ebb3a8) at
> > decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
> > #6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
> > (this=0x8ebb2bc) at decaf/util/concurrent/Mutex.cpp:75
> > #7  0x007c5770 in
> > decaf::util::StlQueue<decaf::lang::Pointer<activemq::commands::MessageDispatch,
> > decaf::util::concurrent::atomic::AtomicRefCounter> >::lock
> > (this=0x8ebb2a8)
> >     at ./decaf/util/StlQueue.h:253
> > #8  activemq::core::MessageDispatchChannel::lock (this=0x8ebb2a8) at
> > activemq/core/MessageDispatchChannel.h:153
> > #9  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
> > (this=0xb7006fe4) at decaf/util/concurrent/Lock.cpp:54
> > #10 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8ebb3a8,
> > intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
> > #11 0x0078e102 in
> > activemq::core::ActiveMQConsumer::clearMessagesInProgress
> > (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:1112
> > #12 0x007af15c in
> > activemq::core::ActiveMQSession::clearMessagesInProgress
> > (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:239
> > #13 0x00778543 in
> > activemq::core::ActiveMQConnection::transportInterrupted
> > (this=0x8eb1210) at activemq/core/ActiveMQConnection.cpp:704
> > #14 0x00803b34 in
> > activemq::transport::TransportFilter::transportInterrupted
> > (this=0x8eb11c0) at activemq/transport/TransportFilter.cpp:67
> > #15 0x008187ae in
> > activemq::transport::failover::FailoverTransport::handleTransportFailure
> > (this=0x8eb0a38, error=...) at
> > activemq/transport/failover/FailoverTransport.cpp:476
> > #16 0x0082406c in
> > activemq::transport::failover::FailoverTransportListener::onException
> > (this=0x8eb0878, ex=...) at
> > activemq/transport/failover/FailoverTransportListener.cpp:97
> > #17 0x00803c0b in activemq::transport::TransportFilter::fire
> > (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:49
> > #18 0x00803c64 in activemq::transport::TransportFilter::onException
> > (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:41
> > #19 0x00803c0b in activemq::transport::TransportFilter::fire
> > (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:49
> > #20 0x00803c64 in activemq::transport::TransportFilter::onException
> > (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:41
> > #21 0x00801b13 in activemq::transport::IOTransport::fire
> > (this=0x8eb27a0, ex=...) at activemq/transport/IOTransport.cpp:73
> > #22 0x008023bf in activemq::transport::IOTransport::run
> > (this=0x8eb27a0) at activemq/transport/IOTransport.cpp:246
> > #23 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> > (properties=0x8eb2ea0) at decaf/lang/Thread.cpp:135
> > #24 0x00a92847 in threadWorker (arg=0x8eb2ea0) at
> > decaf/lang/Thread.cpp:188
> > #25 0x00ddb96e in start_thread (arg=0xb7007b70) at pthread_create.c:300
> > #26 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> >
> > Thread 4:
> > #0  0x00cfd422 in __kernel_vsyscall ()
> > #1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
> > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
> > #2  0x0034f9dd in __pthread_cond_wait (cond=0x8eba6c8,
> > mutex=0x8eba698) at forward.c:139
> > #3  0x00a606a9 in
> > decaf::internal::util::concurrent::ConditionImpl::wait
> > (condition=0x8eba6c8) at
> > decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
> > #4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
> > (this=0x8eba584) at decaf/util/concurrent/Mutex.cpp:95
> > #5  0x00ad029a in decaf::util::concurrent::CountDownLatch::await
> > (this=0x8eba57c) at decaf/util/concurrent/CountDownLatch.cpp:56
> > #6  0x0804af84 in JMSEndPointThread::stopCheck (this=0x8eba574,
> > timeOut=-1) at main.cpp:226
> > #7  0x0804bb46 in Consumer::run (this=0x8eba570) at main.cpp:379
> > #8  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> > (properties=0x8eba5b0) at decaf/lang/Thread.cpp:135
> > #9  0x00a92847 in threadWorker (arg=0x8eba5b0) at
> > decaf/lang/Thread.cpp:188
> > #10 0x00ddb96e in start_thread (arg=0xb6806b70) at pthread_create.c:300
> > #11 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> >
> > Thread 5:
> > #0  0x00cfd422 in __kernel_vsyscall ()
> > #1  0x00de2af9 in __lll_lock_wait () at
> > ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
> > #2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
> > #3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8eb05e0) at
> > pthread_mutex_lock.c:61
> > #4  0x0034fba6 in pthread_mutex_lock (mutex=0x8eb05e0) at forward.c:182
> > #5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
> > (handle=0x8eb05e0) at
> > decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
> > #6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
> > (this=0x8eb0a88) at decaf/util/concurrent/Mutex.cpp:75
> > #7  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
> > (this=0xb6004ce0) at decaf/util/concurrent/Lock.cpp:54
> > #8  0x00ad0c08 in Lock (this=0xfffffe00, object=0x8eb05e0,
> > intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
> > #9  0x0081b8fd in
> > activemq::transport::failover::FailoverTransport::oneway
> > (this=0x8eb0a38, command=...) at
> > activemq/transport/failover/FailoverTransport.cpp:186
> > #10 0x00807f9f in
> > activemq::transport::correlator::ResponseCorrelator::oneway
> > (this=0x8eb11c0, command=...) at
> > activemq/transport/correlator/ResponseCorrelator.cpp:82
> > #11 0x007713be in activemq::core::ActiveMQConnection::oneway
> > (this=0x8eb1210, command=...) at
> > activemq/core/ActiveMQConnection.cpp:741
> > #12 0x007b0a4f in activemq::core::ActiveMQSession::oneway
> > (this=0x8ebab90, command=...) at activemq/core/ActiveMQSession.cpp:903
> > #13 0x00795a1c in activemq::core::ActiveMQConsumer::acknowledge
> > (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:860
> > #14 0x0079f885 in
> > activemq::core::TransactionSynhcronization::beforeEnd (this=0x8ebb9e8)
> > at activemq/core/ActiveMQConsumer.cpp:85
> > #15 0x007c0588 in
> > activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x8ebadc8)
> > at activemq/core/ActiveMQTransactionContext.cpp:192
> > #16 0x007c0d63 in activemq::core::ActiveMQTransactionContext::commit
> > (this=0x8ebadc8) at activemq/core/ActiveMQTransactionContext.cpp:127
> > #17 0x007ae460 in activemq::core::ActiveMQSession::commit
> > (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:189
> > #18 0x0069c1a0 in activemq::cmsutil::PooledSession::commit
> > (this=0x8ebaf38) at activemq/cmsutil/PooledSession.h:87
> > #19 0x0804c0fb in Consumer::onMessage (this=0x8eba570,
> > message=0x8ebbec0) at main.cpp:455
> > #20 0x0079972f in activemq::core::ActiveMQConsumer::dispatch
> > (this=0x8ebb270, dispatch=...) at
> > activemq/core/ActiveMQConsumer.cpp:1018
> > #21 0x007bd5c3 in activemq::core::ActiveMQSessionExecutor::dispatch
> > (this=0x8ebae78, dispatch=...) at
> > activemq/core/ActiveMQSessionExecutor.cpp:129
> > #22 0x007bd993 in activemq::core::ActiveMQSessionExecutor::iterate
> > (this=0x8ebae78) at activemq/core/ActiveMQSessionExecutor.cpp:166
> > #23 0x008007f3 in activemq::threads::DedicatedTaskRunner::run
> > (this=0x8ebb550) at activemq/threads/DedicatedTaskRunner.cpp:111
> > #24 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
> > (properties=0x8ebaad8) at decaf/lang/Thread.cpp:135
> > #25 0x00a92847 in threadWorker (arg=0x8ebaad8) at
> > decaf/lang/Thread.cpp:188
> > #26 0x00ddb96e in start_thread (arg=0xb6005b70) at pthread_create.c:300
> > #27 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
> >
> >
> >
> > I tried to resolve this issue by changing configurations and code, but I
> > always get an error or the client freezes.
> > Is it a bug? Any suggestions?
> >
> > Thanks.
> > Henrique
> >
> > How to reproduce:
> > 1. Start ActiveMQ
> > 2. Send a message: './activemqTest -p1 -n1'
> > 3. Run the consumer: './activemqTest -c1 -d10000'
> > 4. Stop ActiveMQ when the 'Starting delay...' message appears.
> > 5. After ActiveMQ is down and the delay has finished, the consumer tries
> > to commit (message 'Try commit') and the unexpected exception occurs.
> >
> > The test code follows, based on the ActiveMQ-CPP sample
> > 'activemq-cpp-library-3.2.4/src/examples/main.cpp':
> >
> >
> > /*
> >  * Licensed to the Apache Software Foundation (ASF) under one or more
> >  * contributor license agreements.  See the NOTICE file distributed with
> >  * this work for additional information regarding copyright ownership.
> >  * The ASF licenses this file to You under the Apache License, Version 2.0
> >  * (the "License"); you may not use this file except in compliance with
> >  * the License.  You may obtain a copy of the License at
> >  *
> >  *     http://www.apache.org/licenses/LICENSE-2.0
> >  *
> >  * Unless required by applicable law or agreed to in writing, software
> >  * distributed under the License is distributed on an "AS IS" BASIS,
> >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >  * See the License for the specific language governing permissions and
> >  * limitations under the License.
> >  */
> >
> > // START SNIPPET: demo
> >
> > #include <activemq/library/ActiveMQCPP.h>
> > #include <activemq/cmsutil/SessionPool.h>
> > #include <activemq/cmsutil/PooledSession.h>
> > #include <activemq/cmsutil/ResourceLifecycleManager.h>
> > #include <activemq/exceptions/ActiveMQException.h>
> > #include <decaf/lang/Thread.h>
> > #include <decaf/lang/Runnable.h>
> > #include <decaf/util/concurrent/CountDownLatch.h>
> > #include <decaf/util/concurrent/Mutex.h>
> > #include <decaf/lang/Integer.h>
> > #include <decaf/lang/Long.h>
> > #include <decaf/lang/System.h>
> > #include <activemq/core/ActiveMQConnectionFactory.h>
> > #include <activemq/util/Config.h>
> > #include <cms/Connection.h>
> > #include <cms/Session.h>
> > #include <cms/TextMessage.h>
> > #include <cms/BytesMessage.h>
> > #include <cms/MapMessage.h>
> > #include <cms/ExceptionListener.h>
> > #include <cms/MessageListener.h>
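> > #include <cstdlib>
> > #include <cstring>
> > #include <iostream>
> > #include <map>
> > #include <memory>
> > #include <sstream>
> > #include <string>
> > #include <vector>
> > #include <signal.h>
> > #include <unistd.h>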
> >
> > using namespace activemq::core;
> > using namespace activemq::cmsutil;
> > using namespace decaf::util::concurrent;
> > using namespace decaf::util;
> > using namespace decaf::lang;
> > using namespace cms;
> >
> >
> > class ConnPoolManager : protected decaf::util::concurrent::Mutex,
> >                         public ExceptionListener {
> > private:
> >     typedef std::map< int, activemq::cmsutil::SessionPool* > tSessionPoolMap;
> >
> >     std::string _brokerURI;
> >     ConnectionFactory* _connectionFactory;
> >     Connection* _connection;
> >     ResourceLifecycleManager* _lifecycleManager;
> >     tSessionPoolMap _pools;
> >     bool _started;
> >
> > public:
> >     ConnPoolManager() :
> >             _brokerURI("tcp://localhost:61616"),
> >             _connectionFactory(NULL),
> >             _connection(NULL),
> >             _lifecycleManager(NULL),
> >             _started(false)
> >     {
> >     }
> >
> >     virtual ~ConnPoolManager() {
> >         cleanup();
> >     }
> >
> >     void setBrokerURI(const std::string& uri) {
> >         _brokerURI = uri;
> >     }
> >
> >     bool startPools() {
> >         synchronized(this) {
> >             if(_started) return true;
> >             try {
> >                 createConnection();
> >                 createPools();
> >                 _started=true;
> >                 return true;
> >             } catch(CMSException& e) {
> >                 std::cout<<"(ConnPoolManager::startPools) CMSException: "
> >                          << e.getMessage() << std::endl;
> >             } catch(...) {
> >                 std::cout<<"(ConnPoolManager::startPools) Unknown exception!!!"
> >                          << std::endl;
> >             }
> >         }
> >         cleanup();
> >         return false;
> >     }
> >
> >     PooledSession* getSession(cms::Session::AcknowledgeMode ack) {
> >         if(_started) {
> >             return _pools[ack]->takeSession();
> >         } else if(startPools()) {
> >             return _pools[ack]->takeSession();
> >         } else {
> >             return NULL;
> >         }
> >     }
> >
> > private:
> >
> >     void createConnection(){
> >         // Create a ConnectionFactory
> >         _connectionFactory =
> > ConnectionFactory::createCMSConnectionFactory( _brokerURI );
> >
> >         // Create a Connection
> >         _connection = _connectionFactory->createConnection();
> >         _connection->start();
> >         _connection->setExceptionListener(this);
> >     }
> >
> >     void createPools(){
> >         // Create lifecycle manager
> >         _lifecycleManager = new activemq::cmsutil::ResourceLifecycleManager();
> >
> >         // Create pools, one for each acknowledge type
> >         for(    int ack = cms::Session::AUTO_ACKNOWLEDGE;
> >                 ack <= cms::Session::INDIVIDUAL_ACKNOWLEDGE;
> >                 ack++)
> >         {
> >             _pools[ack] =  new activemq::cmsutil::SessionPool(
> >                     _connection,
> >                     static_cast<cms::Session::AcknowledgeMode>(ack),
> >                     _lifecycleManager);
> >         }
> >     }
> >
> >     void cleanup() {
> >         synchronized(this) {
> >             releasePools();
> >             if(_lifecycleManager!=NULL) {
> >                 try { delete _lifecycleManager;
> >                 } catch(...) {}
> >             } _lifecycleManager = NULL;
> >
> >             if(_connection!=NULL) {
> >                 try { delete _connection;
> >                 } catch(...) {}
> >             } _connection = NULL;
> >             if(_connectionFactory!=NULL) {
> >                 try { delete _connectionFactory;
> >                 } catch(...) {}
> >             } _connectionFactory = NULL;
> >             _started=false;
> >         }
> >     }
> >
> >     void releasePools() {
> >         for(tSessionPoolMap::iterator it=_pools.begin();
> > it!=_pools.end(); it++) {
> >             try {
> >                 delete it->second;
> >             } catch(...) { }
> >         }
> >         _pools.clear();
> >     }
> >
> >     // If something bad happens you see it here as this class has also been
> >     // registered as an ExceptionListener with the connection.
> >     virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
> >         std::cout<<"(ConnPoolManager::onException) CMS Exception occurred.  "
> >                    "Shutting down client."<<std::endl;
> >         ex.printStackTrace();
> >         exit(1);
> >     }
> >
> >
> >
> > };
> >
> > class JMSEndPointThread  : public Thread {
> > private:
> >     CountDownLatch _stopNow;
> >     ConnPoolManager* _pools;
> >     std::string _destinationString;
> >     PooledSession* _session;
> >     Destination* _destination;
> >     bool _sessionTransacted;
> >
> > protected:
> >
> >     JMSEndPointThread(  ConnPoolManager* pools,
> >                         const std::string& dest,
> >                         bool sessionTransacted) :
> >             _stopNow(1),
> >             _pools(pools),
> >             _destinationString(dest),
> >             _session(NULL),
> >             _destination(NULL),
> >             _sessionTransacted(sessionTransacted)
> >     {
> >     }
> >
> >     virtual bool prepareSession() {
> >         cleanup();
> >         // Create a Session
> >         if( _sessionTransacted ) {
> >             _session = _pools->getSession(Session::SESSION_TRANSACTED);
> >         } else {
> >             _session = _pools->getSession(Session::AUTO_ACKNOWLEDGE);
> >         }
> >         if(_session==NULL) return false;
> >
> >         // Create the Queue destination
> >         _destination = _session->createQueue( _destinationString );
> >         return true;
> >     }
> >
> >     PooledSession* getSession() { return _session; }
> >     Destination* getDestination() { return _destination; }
> >     bool getTransacted() { return _sessionTransacted; }
> >
> >     bool stopCheck(int timeOut=0) {
> >         if(timeOut>=0) return _stopNow.await(timeOut);
> >         else _stopNow.await();
> >         return true;
> >     }
> >
> >     virtual void cleanup() {
> >         // Destroy resources.
> >         try{
> >             if( _destination != NULL ) delete _destination;
> >         } catch(...) { }
> >         _destination = NULL;
> >
> >         // Back session to pool
> >         try{
> >             if( _session != NULL ) _session->close();
> >         } catch(...) { }
> >         _session = NULL;
> >     }
> >
> > public:
> >
> >     typedef std::vector<JMSEndPointThread*> List;
> >
> >     virtual ~JMSEndPointThread() {
> >         cleanup();
> >     }
> >
> >     void stopNow() {
> >         _stopNow.countDown();
> >     }
> >
> > };
> >
> > class Producer : public JMSEndPointThread {
> > private:
> >
> >     MessageProducer* _producer;
> >     int _delay;
> >     int _numMessages;
> >
> > public:
> >
> >     Producer(   ConnPoolManager* pools,
> >                 const std::string& dest,
> >                 int delay,
> >                 int numMessages,
> >                 bool sessionTransacted = false ) :
> >         JMSEndPointThread(pools,dest,sessionTransacted)
> >     {
> >         _producer = NULL;
> >         _delay = delay;
> >         _numMessages = numMessages;
> >     }
> >
> >     virtual ~Producer() {
> >         cleanup();
> >     }
> >
> >     virtual void run() {
> >         std::cout<<"Producer started!!!! Thread: "<<Thread::getId()<<std::endl;
> >         while(!stopCheck() && _numMessages) {
> >             try {
> >                 if(prepareProducer()) {
> >                     sendMessages();
> >                     continue;
> >                 }
> >             } catch(CMSException& e) {
> >                 std::cout<<"(Producer::run) CMSException: " <<
> > e.getMessage() << std::endl;
> >             } catch(...) {
> >                 std::cout<<"(Producer::run) Unknown exception!!!" << std::endl;
> >             }
> >             cleanup();
> >             stopCheck(5000); // reconnect delay
> >         }
> >         std::cout<<"Producer end!!!! Thread: "<<Thread::getId()<<std::endl;
> >         cleanup();
> >     }
> >
> > private:
> >
> >     bool prepareProducer() {
> >         // Get Session
> >         if(!prepareSession()) return false;
> >         // Create a MessageProducer from the Session to the Queue
> >         _producer = getSession()->createProducer( getDestination() );
> >         _producer->setDeliveryMode( DeliveryMode::PERSISTENT );
> >         return true;
> >     }
> >     void sendMessages() {
> >         // Create a messages
> >         std::stringstream text;
> >         text<<"Hello world! from thread " << Thread::getId();
> >
> >         while(!stopCheck() && _numMessages) {
> >
> >             TextMessage* message = getSession()->createTextMessage(
> > text.str() );
> >             message->setIntProperty( "Integer", _numMessages );
> >
> >             // Tell the producer to send the message
> >             std::cout<<"Sent message #"<<_numMessages<<" from thread "
> >                      <<Thread::getId()<<std::endl;
> >             _producer->send( message );
> >             delete message;
> >             --_numMessages;
> >
> >             stopCheck(_delay);
> >             if(getTransacted()) getSession()->commit();
> >         }
> >     }
> >
> >     virtual void cleanup() {
> >         try{
> >             if( _producer != NULL ) delete _producer;
> >         } catch(...) { }
> >         _producer = NULL;
> >
> >         JMSEndPointThread::cleanup();
> >     }
> >
> > };
> >
> >
> > class Consumer :    public MessageListener,
> >                     public JMSEndPointThread {
> >
> > private:
> >
> >     MessageConsumer* _consumer;
> >     int _delay;
> >     bool _useListener;
> >
> > public:
> >
> >     Consumer(   ConnPoolManager* pools,
> >                 const std::string& dest,
> >                 int delay,
> >                 bool useListener = true,
> >                 bool sessionTransacted = true ) :
> >         JMSEndPointThread(pools,dest,sessionTransacted)
> >     {
> >         _consumer = NULL;
> >         _delay = delay;
> >         _useListener = useListener;
> >     }
> >     virtual ~Consumer(){
> >         cleanup();
> >     }
> >
> >     virtual void run() {
> >         std::cout<<"Consumer started!!!! Thread: "<<Thread::getId()<<std::endl;
> >         while(!stopCheck()) {
> >             try {
> >                 if(prepareConsumer()) {
> >                     // Wait while asynchronous messages come in.
> >                     if(_useListener) {
> >                         stopCheck(-1);
> >                     } else {
> >                         consumeLoop();
> >                     }
> >                     continue;
> >                 }
> >             } catch (CMSException& e) {
> >                 std::cout<<"(Consumer::run) CMSException: " <<
> > e.getMessage() << std::endl;
> >             } catch(...) {
> >                 std::cout<<"(Consumer::run) Unknown exception!!!" << std::endl;
> >             }
> >             cleanup();
> >             stopCheck(5000); // reconnect delay
> >         }
> >         std::cout<<"Consumer end!!!! Thread: "<<Thread::getId()<<std::endl;
> >         cleanup();
> >     }
> >
> >
> > private:
> >
> >     bool prepareConsumer() {
> >         // Get Session
> >         if(!prepareSession()) return false;
> >         // Create a MessageConsumer from the Session to the Queue
> >         _consumer = getSession()->createConsumer( getDestination() );
> >         if(_useListener)
> >             _consumer->setMessageListener( this );
> >
> >         return true;
> >     }
> >
> >     void consumeLoop() {
> >         while(!stopCheck()) {
> >             Message *msg = _consumer->receive(500);
> >             if(msg!=NULL) {
> >                 onMessage(msg);
> >                 delete msg;
> >             }
> >         }
> >     }
> >
> >     // Called from the consumer since this class is a registered
> >     // MessageListener.
> >     virtual void onMessage( const Message* message ) {
> >
> >         static int count = 0;
> >
> >         try
> >         {
> >             count++;
> >             const TextMessage* textMessage =
> >                 dynamic_cast< const TextMessage* >( message );
> >             std::string text = "";
> >
> >             if( textMessage != NULL ) {
> >                 text = textMessage->getText();
> >             } else {
> >                 text = "NOT A TEXTMESSAGE!";
> >             }
> >
> >             std::cout<<"Message #"<<count<<" Received: "<<text<<std::endl;
> >             if(_delay) {
> >                 std::cout<<"Starting delay..."<<std::endl;
> >                 stopCheck(_delay);
> >             }
> >
> >         } catch (CMSException& e) {
> >             std::cout<<"(Consumer::onMessage) CMSException: " <<
> > e.getMessage() << std::endl;
> >         } catch(...) {
> >             std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
> > std::endl;
> >         }
> >
> >         try
> >         {   // Commit all messages.
> >             if( getTransacted() ) {
> >                 std::cout<<"Try commit....."<<std::endl;
> >                 getSession()->commit();
> >                 std::cout<<"Commit OK!!!"<<std::endl;
> >             }
> >         } catch (CMSException& e) {
> >             std::cout<<"(Consumer::onMessage) CMSException: " <<
> > e.getMessage() << std::endl;
> >         } catch(...) {
> >             std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
> > std::endl;
> >         }
> >
> >     }
> >
> >     virtual void cleanup() {
> >         try{
> >             if( _consumer != NULL ) delete _consumer;
> >         } catch (...) { }
> >         _consumer = NULL;
> >         JMSEndPointThread::cleanup();
> >     }
> > };
> >
> >
> > class ScopedActiveMQLibrary {
> > public:
> >     ScopedActiveMQLibrary() {
> >         activemq::library::ActiveMQCPP::initializeLibrary();
> >     }
> >     virtual ~ScopedActiveMQLibrary() {
> >         activemq::library::ActiveMQCPP::shutdownLibrary();
> >     }
> > };
> >
> > class AppTest : private ScopedActiveMQLibrary {
> > private:
> >     int _consumer;
> >     int _producer;
> >     int _numMessages;
> >     bool _useListener;
> >     int _delay;
> >     ConnPoolManager _poolManager;
> >     JMSEndPointThread::List _endPointList;
> >
> >     static AppTest* _app;
> >     static CountDownLatch _terminationRequest;
> >
> > public:
> >     AppTest(int argc, char** argv) :
> >             _consumer(0),
> >             _producer(0),
> >             _numMessages(1),
> >             _useListener(false),
> >             _delay(0)
> >     {
> >         _app=this;
> >         int c;
> >         while ((c = getopt (argc, argv, "lc:p:n:d:")) != -1) {
> >             switch(c) {
> >             case 'l': _useListener=true; break;
> >             case 'c': _consumer=atoi(optarg); break;
> >             case 'p': _producer=atoi(optarg); break;
> >             case 'n': _numMessages=atoi(optarg); break;
> >             case 'd': _delay=atoi(optarg); break;
> >             }
> >         }
> >         if(_consumer<0) _consumer=0;
> >         if(_producer<0) _producer=0;
> >         _poolManager.setBrokerURI(
> >                 "failover://("
> >                 "tcp://localhost:61616"
> >                 "?transport.useInactivityMonitor=false"
> >                 ")?timeout=1000"
> >                 "&cms.RedeliveryPolicy.maximumRedeliveries=-1"
> >                 );
> >         _poolManager.startPools();
> >     }
> >
> >     virtual ~AppTest() {
> >     }
> >
> >     int run() {
> >
> >         // Install signal handler
> >         if(!installSigAction())
> >             return 1;
> >
> >         // Create consumer/producer objects
> >         loadEndPoins();
> >
> >         // Start the producer/consumer thread.
> >         startEndPoins();
> >
> >         // start another tasks
> >         // ...
> >
> >         // Wait for termination request
> >         /*if(_consumer) {
> >             _terminationRequest.await();
> >             stopEndPoins();
> >         }*/
> >
> >         // join threads
> >         joinEndPoins();
> >
> >         return 0;
> >     }
> >
> > private:
> >
> >     void loadEndPoins() {
> >         for(int i=0; i<_consumer;i++) {
> >             _endPointList.push_back( new
> > Consumer(&_poolManager,"TEST.FOO",_delay,_useListener) );
> >         }
> >         for(int i=0; i<_producer;i++) {
> >             _endPointList.push_back( new
> > Producer(&_poolManager,"TEST.FOO",_delay,_numMessages) );
> >         }
> >     }
> >
> >     void startEndPoins() {
> >         for(JMSEndPointThread::List::iterator it=_endPointList.begin();
> >             it!=_endPointList.end(); it++ )
> >             (**it).start();
> >     }
> >
> >     void stopEndPoins() {
> >         for(JMSEndPointThread::List::reverse_iterator it=_endPointList.rbegin();
> >             it!=_endPointList.rend(); it++ )
> >             (**it).stopNow();
> >     }
> >
> >     void joinEndPoins() {
> >         for(JMSEndPointThread::List::reverse_iterator it=_endPointList.rbegin();
> >             it!=_endPointList.rend(); it++ ) {
> >             (**it).join();
> >             delete *it;
> >             *it = NULL;
> >         }
> >         _endPointList.clear();
> >     }
> >
> >     static void signalHandler(int sig) {
> >         _app->_terminationRequest.countDown();
> >         _app->stopEndPoins();
> >     }
> >
> >     bool installSigAction() {
> >         struct sigaction action;
> >         memset(&action, 0, sizeof(action));
> >         action.sa_handler = signalHandler;
> >         if( sigaction(SIGTERM, &action, NULL)<0 ||
> >             sigaction(SIGQUIT, &action, NULL)<0 ||
> >             sigaction(SIGINT,  &action, NULL)<0 )
> >         { return false; }
> >         return true;
> >     }
> >
> >     int termWait() {
> >         sigset_t sset;
> >         sigemptyset(&sset);
> >         sigaddset(&sset, SIGINT);
> >         sigaddset(&sset, SIGQUIT);
> >         sigaddset(&sset, SIGTERM);
> >         sigprocmask(SIG_BLOCK, &sset, NULL);
> >         int sig;
> >         sigwait(&sset, &sig);
> >         return sig;
> >     }
> >
> > };
> > AppTest* AppTest::_app = NULL;
> > CountDownLatch AppTest::_terminationRequest(1);
> >
> >
> > void my_unexpected() {
> >     throw activemq::exceptions::ActiveMQException();
> > }
> >
> > int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
> >     //std::set_unexpected(my_unexpected);
> >     AppTest test(argc, argv);
> >     return test.run();
> > }
> >
> > // END SNIPPET: demo
>
> --
> Tim Bish
> ------------
> FuseSource
> Email: tim.bish@fusesource.com
> Web: http://fusesource.com
> Twitter: tabish121
> Blog: http://timbish.blogspot.com/
>
>
>
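
On the exception specification mismatch described in the quoted message:
dynamic exception specifications (`throw(...)`) were deprecated in C++11 and
removed in C++17. A common alternative is to translate exceptions explicitly
at the boundary, so that only the documented type can escape. Below is a
minimal sketch of that pattern. The types `ApiError` and `InternalError` and
both stub functions are hypothetical stand-ins, not the real ActiveMQ-CPP
classes, and I am not claiming this is how the library itself handles it.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for two exception types where the one actually
// thrown is not the one the caller was promised.
struct ApiError : std::runtime_error {
    explicit ApiError(const std::string& m) : std::runtime_error(m) {}
};
struct InternalError : std::runtime_error {
    explicit InternalError(const std::string& m) : std::runtime_error(m) {}
};

// Stand-in for a call such as acknowledge() that throws the "other" type
// when the broker is unreachable.
inline void acknowledgeStub(bool brokerDown) {
    if (brokerDown) throw ApiError("Failover timeout reached");
}

// Boundary function: callers should only ever see InternalError, so every
// other exception is translated before it can escape.
inline void beforeEndStub(bool brokerDown) {
    try {
        acknowledgeStub(brokerDown);
    } catch (const InternalError&) {
        throw;                               // already the documented type
    } catch (const std::exception& e) {
        throw InternalError(e.what());       // translate, keep the message
    } catch (...) {
        throw InternalError("unknown error");
    }
}
```

With this shape the caller's `catch` block is reached instead of the process
aborting, at the cost of writing the translation by hand at each boundary.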


-- 
Óscar Pernas Plaza.
