activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "John Rocha (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQCPP-413) Producer connection that causes broker to reach its memory/disk limits doesn't get the 'all full' exception even though the broker is configured to send it for Producer Flow Control.
Date Wed, 11 Jul 2012 18:07:35 GMT

    [ https://issues.apache.org/jira/browse/AMQCPP-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13411788#comment-13411788 ] 

John Rocha commented on AMQCPP-413:
-----------------------------------

We're currently using AMQ Broker 5.5.0. I'm going to try to upgrade to 5.6.0 and see if this resolves the consumer lockup problems. It looks like there are many cases for AMQ Broker 5.6.0 relating to consumers not getting their messages.
                
> Producer connection that causes broker to reach its memory/disk limits doesn't get the 'all full' exception even though the broker is configured to send it for Producer Flow Control.
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQCPP-413
>                 URL: https://issues.apache.org/jira/browse/AMQCPP-413
>             Project: ActiveMQ C++ Client
>          Issue Type: Bug
>          Components: CMS Impl
>    Affects Versions: 3.4.0, 3.4.4
>         Environment: Linux
>            Reporter: John Rocha
>            Assignee: Timothy Bish
>            Priority: Critical
>              Labels: cpp, producer_flow_control
>
> Producer connection that cause the broker to reach its memory/disk limits
> doesn't get the 'all full' exception even though the broker is configured to
> send it for Producer Flow Control.
> +Scenario #1+
> # Delete the broker data directory\\
> \\
> # Start the broker, that sends an exception if no space\\
> \\
> # DO *+NOT+* START A CONUMSER.\\
> \\
> # Run a producer that does synchronous sends, and has the default sendTimeot of zero(0), it uses one connection, and enters a loop that just sends messages.\\
> \\
> After awhile the producer will lock up and never recovers.\\
> \\
> # Start another producer in another window.\\
> \\
> It immediate fails with a 'broker full' exception.
> +Scenario #2+
> # Delete the broker data directory\\
> \\
> # Start the broker, that sends an exception if no space\\
> \\
> # DO *+NOT+* START A CONUMSER.\\
> \\
> # Run a producer that does synchronous sends, and has the sendTimeot of 500 ms, it uses one connection, and enters a loop that just sends messages.\\
> \\
> After awhile the producer will cause the broker to reach it's limit. And then the send method will start timing out. It never gets a 'broker full' exception.\\
> \\
> # Start another producer in another window.\\
> \\
> It immediate fails with a 'broker full' exception
> \\
> \\
> ----
> \\
> \\
> {noformat}
> ######################################################################
> ## SCENARIO #1
> ######################################################################
> ##
> ## Restart our tomcat service which restarts the AMQ Broker and view the
> ## directory size.
> ##
> root@psbu-jrr-lnx:#  /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWhttpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/BWhttpd/tomcat/activemq-data
> Stopping tomcat ... done
> Killing tomcat ... done.
> Starting tomcat ... done
> 44K     /usr/BWhttpd/tomcat/activemq-data
> root@psbu-jrr-lnx:#  
> ##
> ## View the activemq.xml configuration file used for startint active MQ
> ##
> root@psbu-jrr-lnx:#  cat /usr/BWhttpd/tomcat/webapps/amqbroker/WEB-INF/classes/conf/activemq.xml
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
>   http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
>   http://activemq.apache.org/schema/core 
>   http://activemq.apache.org/schema/core/activemq-core-5.3.2.xsd"
>   default-autowire="byName">
>   <!-- Allows to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>     </bean>
>     <broker xmlns="http://activemq.apache.org/schema/core"
>             brokerName="localhost"
>             advisorySupport="true"
>             dataDirectory="${catalina.home}/activemq-data"
>             useJmx="false"
>             useShutdownHook="false">
>         <!-- Destination specific policies using destination names or wildcards
>         -->
>         <destinationPolicy>
>             <policyMap>
>                 <policyEntries>
>                     <policyEntry queue=">" memoryLimit="5mb" />
>                     <policyEntry topic=">" memoryLimit="5mb" />
>                 </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <systemUsage>
>             <systemUsage sendFailIfNoSpace="true">
>                 <memoryUsage>
>                     <memoryUsage limit="5 mb" />
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="4 mb" />
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="1 mb" />
>                 </tempUsage>
>                 </systemUsage>
>         </systemUsage>
>         <!-- The transport connectors ActiveMQ will listen to -->
>         <transportConnectors>
>             <transportConnector name="tcp"
>              uri="tcp://0.0.0.0:61616?wireFormat.maxInactivityDuration=0" />
>         </transportConnectors>
>     </broker>
> </beans>
> ##
> ## View the runtime environment to validate the library is 3.4.4, the latests.
> ## I cannot explain why the number is 14.0.4, but I observed that 3.4.0 used
> ## 14.0.0
> ##
> 242(TEST)jrr@[SUSE10.1]>  ls $LD_LIBRARY_PATH/libactive*
> /usr/BWhttpd/lib//libactivemq-cpp.so*
> /usr/BWhttpd/lib//libactivemq-cpp.so.14*
> /usr/BWhttpd/lib//libactivemq-cpp.so.14.0.4*
> 243(TEST)jrr@[SUSE10.1]>  
> ##
> ## Compile the simple producer
> ##
> Compiling simple_producer.o
> g++ -g -c -MD -Wall -Werror -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/activemq-cpp/ -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/ -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/apr-1  simple_producer.cpp
> g++ -o simple_producer.exe simple_producer.o -lc -lrt \
>     -lactivemq-cpp \
>     -lboost_thread \
>     -L /usr/BWhttpd/lib
> Compilation finished at Mon Jul  9 15:32:48
> ##
> ## Execute the test program. It creates a connection and then loops forever
> ## sending messages.  Where for each message it creates sessions, destiation,
> ## producer,etc. sends the message and then destructs all of those pieces.
> ##
> ## It locks up at 1775 messages
> ##
> 257(TEST)jrr@[SUSE10.1]>  simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true
> Sending message #2
> Sending message #3
> Sending message #4
> Sending message #5
> ...
> ...
> ...
> Sending message #1772
> Sending message #1773
> Sending message #1774
> Sending message #1775
> Sending message #1776
> ##
> ## View the ActiveMQ disk usage during the lock up
> ##
> du -sh activemq-data/
> 12M     activemq-data/
> root@psbu-jrr-lnx:#  
> ##
> ## Use GDB to see where the simple_producer is blocked
> ##
> psbu-jrr-lnx[SUSE10.1]:176>  ps auxw | egrep simple_producer.exe
> jrr      16974  1.3  0.6  53980  6724 pts/12   Sl+  17:58   0:00 simple_producer.exe
> jrr      16997  0.0  0.0   1864   660 pts/18   S+   17:59   0:00 /bin/grep -E simple_producer.exe
> psbu-jrr-lnx[SUSE10.1]:177>  
> psbu-jrr-lnx[SUSE10.1]:177>  gdb simple_producer.exe 
> GNU gdb 6.6
> Copyright (C) 2006 Free Software Foundation, Inc.
> GDB is free software, covered by the GNU General Public License, and you are
> welcome to change it and/or distribute copies of it under certain conditions.
> Type "show copying" to see the conditions.
> There is absolutely no warranty for GDB.  Type "show warranty" for details.
> This GDB was configured as "i586-suse-linux"...
> Using host libthread_db library "/lib/libthread_db.so.1".
> (gdb) set print pretty
> (gdb) set pagination off
> (gdb) attach 16974
> Attaching to program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process 16974
> Reading symbols from /lib/librt.so.1...done.
> Loaded symbols for /lib/librt.so.1
> Reading symbols from /usr/BWhttpd/lib/libactivemq-cpp.so.14...done.
> Loaded symbols for /usr/BWhttpd/lib/libactivemq-cpp.so.14
> Reading symbols from /usr/BWhttpd/lib/libboost_thread.so.1.43.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libboost_thread.so.1.43.0
> Reading symbols from /usr/lib/libstdc++.so.6...done.
> Loaded symbols for /usr/lib/libstdc++.so.6
> Reading symbols from /lib/libm.so.6...done.
> Loaded symbols for /lib/libm.so.6
> Reading symbols from /lib/libc.so.6...done.
> Loaded symbols for /lib/libc.so.6
> Reading symbols from /lib/libgcc_s.so.1...done.
> Loaded symbols for /lib/libgcc_s.so.1
> Reading symbols from /lib/libpthread.so.0...done.
> [Thread debugging using libthread_db enabled]
> [New Thread -1221044560 (LWP 16974)]
> [New Thread -1254835296 (LWP 16981)]
> [New Thread -1246442592 (LWP 16978)]
> [New Thread -1238049888 (LWP 16977)]
> [New Thread -1229657184 (LWP 16976)]
> [New Thread -1221264480 (LWP 16975)]
> Loaded symbols for /lib/libpthread.so.0
> Reading symbols from /lib/ld-linux.so.2...done.
> Loaded symbols for /lib/ld-linux.so.2
> Reading symbols from /usr/BWhttpd/lib/libapr-1.so.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libapr-1.so.0
> Reading symbols from /lib/libuuid.so.1...done.
> Loaded symbols for /lib/libuuid.so.1
> Reading symbols from /lib/libcrypt.so.1...done.
> Loaded symbols for /lib/libcrypt.so.1
> Reading symbols from /usr/BWhttpd/lib/libaprutil-1.so.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libaprutil-1.so.0
> Reading symbols from /usr/BWhttpd/lib/libexpat.so.1...done.
> Loaded symbols for /usr/BWhttpd/lib/libexpat.so.1
> Reading symbols from /usr/lib/libssl.so.0.9.8...done.
> Loaded symbols for /usr/lib/libssl.so.0.9.8
> Reading symbols from /usr/lib/libcrypto.so.0.9.8...done.
> Loaded symbols for /usr/lib/libcrypto.so.0.9.8
> Reading symbols from /lib/libdl.so.2...done.
> Loaded symbols for /lib/libdl.so.2
> 0xffffe410 in __kernel_vsyscall ()
> (gdb) thread apply all where
> Thread 6 (Thread -1221264480 (LWP 16975)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806b120) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806ae5c) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806ae58) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6  0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806ae50) at decaf/util/Timer.cpp:81
> #7  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806aff8) at decaf/lang/Thread.cpp:137
> #8  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806aff8) at decaf/lang/Thread.cpp:190
> #9  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 5 (Thread -1229657184 (LWP 16976)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806b368) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806b1fc) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806b1f8) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6  0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806b1f0) at decaf/util/Timer.cpp:81
> #7  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806b240) at decaf/lang/Thread.cpp:137
> #8  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806b240) at decaf/lang/Thread.cpp:190
> #9  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 4 (Thread -1238049888 (LWP 16977)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806c718) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806c2ec) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806c2e8) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6  0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806c2e0) at decaf/util/Timer.cpp:81
> #7  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806c5c0) at decaf/lang/Thread.cpp:137
> #8  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806c5c0) at decaf/lang/Thread.cpp:190
> #9  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 3 (Thread -1246442592 (LWP 16978)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb765ba8b in std::exception::what () from /lib/libc.so.6
> #2  0xb7580b7a in apr_socket_recv (sock=0x80649e8, buf=0x8066c10 "", len=0xb5b4c040) at network_io/unix/sendrecv.c:81
> #3  0xb7d49ebd in decaf::internal::net::tcp::TcpSocket::read (this=0x8064898, buffer=0x8066c10 "", size=8192, offset=0, length=8192) at decaf/internal/net/tcp/TcpSocket.cpp:649
> #4  0xb7d4d1c0 in decaf::internal::net::tcp::TcpSocketInputStream::doReadArrayBounded (this=0x8066c10, buffer=0x2000 <Address 0x2000 out of bounds>, size=8192, offset=0, length=8192) at decaf/internal/net/tcp/TcpSocketInputStream.cpp:108
> #5  0xb7d91d1f in decaf::io::InputStream::doReadArray (this=0x8066998, buffer=0x8066c10 "", size=8192) at decaf/io/InputStream.cpp:138
> #6  0xb7d92333 in decaf::io::InputStream::read (this=0x8066998, buffer=0x8066c10 "", size=8192) at decaf/io/InputStream.cpp:72
> #7  0xb7d866ef in decaf::io::BufferedInputStream::bufferData (this=0x8066b60, inputStream=0x8066998, buffer=@0xb5b4c1f8) at decaf/io/BufferedInputStream.cpp:326
> #8  0xb7d86d18 in decaf::io::BufferedInputStream::doReadArrayBounded (this=0x8066b60, buffer=0x80669ca "", size=4, offset=0, length=4) at decaf/io/BufferedInputStream.cpp:228
> #9  0xb7d92191 in decaf::io::InputStream::read (this=0x8066b60, buffer=0x80669ca "", size=4, offset=0, length=4) at decaf/io/InputStream.cpp:84
> #10 0xb7d8a757 in decaf::io::DataInputStream::readAllData (this=0x80669b8, buffer=0x80669ca "", length=4) at decaf/io/DataInputStream.cpp:492
> #11 0xb7d8c684 in decaf::io::DataInputStream::readInt (this=0x80669b8) at decaf/io/DataInputStream.cpp:124
> #12 0xb7cae1b8 in activemq::wireformat::openwire::OpenWireFormat::unmarshal (this=0x8063e10, transport=0x8064790, dis=0x80669b8) at activemq/wireformat/openwire/OpenWireFormat.cpp:245
> #13 0xb7c2c9f7 in activemq::transport::IOTransport::run (this=0x8064790) at activemq/transport/IOTransport.cpp:246
> #14 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806c088) at decaf/lang/Thread.cpp:137
> #15 0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806c088) at decaf/lang/Thread.cpp:190
> #16 0xb75912ab in start_thread () from /lib/libpthread.so.0
> #17 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 2 (Thread -1254835296 (LWP 16981)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806cde8) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806d24c) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7c1fada in activemq::threads::CompositeTaskRunner::run (this=0x806d208) at activemq/threads/CompositeTaskRunner.cpp:115
> #6  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806cbc0) at decaf/lang/Thread.cpp:137
> #7  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806cbc0) at decaf/lang/Thread.cpp:190
> #8  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #9  0xb766aa4e in clone () from /lib/libc.so.6
> Thread 1 (Thread -1221044560 (LWP 16974)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806f1a8) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806f13c) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7de9395 in decaf::util::concurrent::CountDownLatch::await (this=0x806f134) at decaf/util/concurrent/CountDownLatch.cpp:53
> #6  0xb7c36025 in activemq::transport::correlator::FutureResponse::getResponse (this=0x806f130) at ./activemq/transport/correlator/FutureResponse.h:62
> #7  0xb7c33aa2 in activemq::transport::correlator::ResponseCorrelator::request (this=0x806b740, command=@0xbf9301cc) at activemq/transport/correlator/ResponseCorrelator.cpp:120
> #8  0xb7b649fa in activemq::core::ActiveMQConnection::syncRequest (this=0x806b858, command=@0xbf9301cc, timeout=0) at activemq/core/ActiveMQConnection.cpp:896
> #9  0xb7baf1d8 in activemq::core::ActiveMQSession::send (this=0x80635b8, message=0x806e640, producer=0x806e530, usage=0x0) at activemq/core/ActiveMQSession.cpp:921
> #10 0xb7ba1e71 in activemq::core::ActiveMQProducer::send (this=0x806e530, destination=0x806e3ec, message=0x806e640, deliveryMode=1, priority=4, timeToLive=0) at activemq/core/ActiveMQProducer.cpp:211
> #11 0xb7ba2b07 in activemq::core::ActiveMQProducer::send (this=0x806e530, destination=0x806e3ec, message=0x806e640) at activemq/core/ActiveMQProducer.cpp:152
> #12 0xb7ba3cab in activemq::core::ActiveMQProducer::send (this=0x806e530, message=0x806e640) at activemq/core/ActiveMQProducer.cpp:128
> #13 0x0804c5ac in AMQ_Producer::send (this=0xbf9305c8, msg=@0xbf930600, type=@0xbf9305f8) at simple_producer.cpp:720
> #14 0x0804ca45 in run_test () at simple_producer.cpp:767
> #15 0x0804cd47 in main () at simple_producer.cpp:798
> #0  0xffffe410 in __kernel_vsyscall ()
> (gdb) detach
> Detaching from program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process 16974
> (gdb) quit
> psbu-jrr-lnx[SUSE10.1]:178>  
> ##
> ## If we kill the program and execute it again, then this time it will
> ## terminate with the expected exception.
> ##
> 258(TEST)jrr@[SUSE10.1]>  simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true
> Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622326f 1048576). Stopping producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:0:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Sending message #2
> ...
> ...
> ...
> Sending message #10
> Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622426f 1048576). Stopping producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:9:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
>   what():  Unlock Failed, this thread is not the Lock Owner!
> Abort
> 259(TEST)jrr@[SUSE10.1]>  
> {noformat}
> \\
> \\
> ----
> \\
> \\
> {noformat}
> ######################################################################
> ## SCENARIO #2
> ######################################################################
> ##
> ## Restart our tomcat service which restarts the AMQ Broker and view the
> ## directory size.
> ##
> root@psbu-jrr-lnx:#  /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWhttpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/BWhttpd/tomcat/activemq-data
> Stopping tomcat ... done
> Killing tomcat ... done.
> Starting tomcat ... done
> 44K     /usr/BWhttpd/tomcat/activemq-data
> root@psbu-jrr-lnx:#  
> ##
> ## Execute the test program, this time pass in variables to cause the sendTime
> ## to be set to 500ms. It creates a connection and then loops forever sending
> ## messages.  Where for each message it creates sessions, destiation,
> ## producer,etc. sends the message and then destructs all of those pieces.
> ##
> ## Now at 1775 messages the send times out. Moreover, the test moves on trying
> ## more sends, and they all time out. After 10 failures the test exits. This
> ## sender that causes the broker memory limmit to be reached never gets a
> ## 'broker full' exception.
> ##
> 262(TEST)jrr@[SUSE10.1]>  env SEND_TO=500 simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> SEND_TO set to 500
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true&connection.sendTimeout=500
> SEND_TO set to 500
> Sending message #2
> SEND_TO set to 500
> Sending message #3
> SEND_TO set to 500
> Sending message #4
> ...
> ...
> ...
> Sending message #1776
> Exception[1]: Error while sending message [No valid response received for command: Message { commandId = 8879, responseRequired = true, ProducerId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0, Destination = queue://c.c.p.v.ms.events, TransactionId = NULL, OriginalDestination = NULL, MessageId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0:0:0, OriginalTransactionId = NULL, GroupID = , GroupSequence = 0, CorrelationId = , Persistent = false, Expiration = 0, Priority = 4, ReplyTo = NULL, Timestamp = 1341882406582, Type = TEST_TYPE, Content = [size=1045], MarshalledProperties = NULL, DataStructure = NULL, TargetConsumerId = NULL, Compressed = false, RedeliveryCounter = 0, BrokerPath = NULL, Arrival = 0, UserID = , RecievedByDFBridge = false, Droppable = false, Cluster = NULL, BrokerInTime = 0, BrokerOutTime = 0 }Text = 1024 characters folowed by a message.xxxxxxxx... world! 1776, check broker.]
> SEND_TO set to 500
> Sending message #1777
> ...
> ...
> ...
> Sending message #1785
> Exception[10]: Error while sending message [No valid response received for command: Message { commandId = 8924, responseRequired = true, ProducerId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0, Destination = queue://c.c.p.v.ms.events, TransactionId = NULL, OriginalDestination = NULL, MessageId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0:0:0, OriginalTransactionId = NULL, GroupID = , GroupSequence = 0, CorrelationId = , Persistent = false, Expiration = 0, Priority = 4, ReplyTo = NULL, Timestamp = 1341882411136, Type = TEST_TYPE, Content = [size=1045], MarshalledProperties = NULL, DataStructure = NULL, TargetConsumerId = NULL, Compressed = false, RedeliveryCounter = 0, BrokerPath = NULL, Arrival = 0, UserID = , RecievedByDFBridge = false, Droppable = false, Cluster = NULL, BrokerInTime = 0, BrokerOutTime = 0 }Text = 1024 characters folowed by a message.xxxxxxxx... world! 1785, check broker.]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
>   what():  Unlock Failed, this thread is not the Lock Owner!
> Abort
> ##
> ##
> ## View the ActiveMQ disk usage during the lock up
> ##
> root@psbu-jrr-lnx:#  !du
> du -sh activemq-data/
> 12M     activemq-data/
> root@psbu-jrr-lnx:#  
> ##
> ## If we re-execute the test now that the broker is full, then this time it
> ## will terminate with the expected broker full exception.
> ##
> 263(TEST)jrr@[SUSE10.1]>  env SEND_TO=500 simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> SEND_TO set to 500
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true&connection.sendTimeout=500
> Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622756f 1048576). Stopping producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:0:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> SEND_TO set to 500
> ...
> ...
> ...
> SEND_TO set to 500
> Sending message #10
> Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001623156f 1048576). Stopping producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:9:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
>   what():  Unlock Failed, this thread is not the Lock Owner!
> Abort
> 264(TEST)jrr@[SUSE10.1]>  
> {noformat}
> \\
> \\
> ----
> \\
> \\
> {code}
> //////////////////////////////////////////////////////////////////////
> // Not so simple producer.
> //
> // The Active MQ Client's simple producer code has been modified to allow for
> // investigation into client behaviour. I've integrated the products engine for
> // obtaining a connection, session, queue, destination, sender and message.
> //
> // The code basically does the following:
> //       - create a connection to be used over and over for the test
> //       - loop forever
> //         - create a session, queue, destination, sender and message
> //         - send the message
> //         - destroy the message, sender, destination, topic and session.
> //
> // ----------------------------------------
> // ENVIRONMENT VARIABLE OVERRIDES
> // ----------------------------------------
> //
> // SEND_TO : default is 0. If non-zero then it appends connection.sendTimeout
> //           to the destination URI using the value set for SEND_TO. Refer to
> //           the connection options at
> //           http://activemq.apache.org/cms/configuring.html for details
> //           regarding its values.
> //
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <decaf/lang/Long.h>
> #include <decaf/util/Date.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Config.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> #include <stdio.h>
> #include <iostream>
> #include <memory>
> #include <boost/scoped_ptr.hpp>
> #include <boost/thread/thread.hpp>
> #include <boost/lexical_cast.hpp>
> using namespace activemq;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
> //
> // Override values with environement values if they exist
> //
> template <class DEST>
> bool GetEnv (const std::string    var,
>              DEST                 &dst,
>              const DEST           &def_value)
> {
>     const char *val(::getenv(var.c_str()));
>     dst = def_value;
>     if (NULL == val) {
>         return (false);
>     }
>     try {
>         dst = boost::lexical_cast<DEST>(val);
>         printf("%s set to %s\n", var.c_str(), val);
>         return(true);
>     }
>     catch (std::exception    &e) {
>         printf("%s\n", e.what());
>         printf("Cannot convert '%s' to the desired output format\n", val);
>     }
>     return(false);
> }
> // ---------------------------------------------------------------------------
> // EventEngineUtil.hxx
> // ---------------------------------------------------------------------------
> class ee_except : public std::exception
> {
> public:
>     // ee_except(std::string)
>     //
>     //    Create an exception using the specified string
>     //
>     explicit ee_except (const std::string    &_msg) :
>         msg(_msg)
>         {};
>     explicit ee_except (const std::stringstream    &_msg) :
>         msg(_msg.str())
>         {};
>     // ee_except(std::string, errNum)
>     //
>     //    Create an excpetion using the specified string, and then append the
>     //    numeric and string representation of the specified errno
>     //
>     explicit ee_except (const std::string    &_msg,
>                         int                  errNum) {
>         ee_except_helper(_msg, errNum);
>     }
>     explicit ee_except (const std::stringstream    &_msg,
>                        int                         errNum) {
>         ee_except_helper(_msg.str(), errNum);
>     }
>     // destroy the exception
>     ~ee_except() throw() {};
>     // return the message, overloading the standard what method
>     virtual const char *what() const throw() {
>         return msg.c_str();
>     }
> protected:
>     void ee_except_helper (const std::string    &_msg,
>                            int                  errNum) {
>         std::stringstream    ss_error;
>         if (errNum) {
>             ss_error << _msg << ", errno: " << errNum
>                      << ", " << strerror(errNum);
>             msg = ss_error.str(); 
>         } else {
>             msg = _msg;
>         }
>     }
> private:
>     std::string msg;
> };
> // ---------------------------------------------------------------------------
> // From EventEngineAMQProducer
> // ---------------------------------------------------------------------------
> #include <boost/thread/thread.hpp>
> #include <boost/shared_ptr.hpp>
> #include <cms/DeliveryMode.h>
> #include <cms/Connection.h>
> #include <activemq/library/ActiveMQCPP.h>
> // CLASS: ConnSessionManager
> //
> //    Singleton class to manage Active MQ (AMQ) connections.
> //
> // getConnSession():
> //
> //    This method gets a connection for the specified broker, creates a session
> //    with the specified ackMode and populates the connSesn parameter with
> //    shared pointers to the alloated resources.
> //
> //    If a connection already exists for the brokerURI, it creates a session
> //    for it and returns them, if the connection does not exist it makes one
> //    attaches a session and returns them. If it cannot create the connection
> //    (such as the broker is not up), it throws an exception.
> //
> //    The connection manager can manage multiple connections based on the
> //    brokerURI. Each unique broker URI will be a different connection managed
> //    by the connection manager.
> //
> //
> class ConnSessionManager {
> public:
>     typedef boost::shared_ptr<cms::Connection>    P_SHR_CONN_T;
>     typedef boost::shared_ptr<cms::Session>       P_SHR_SESSION_T;
>     struct ConnSession {
>         P_SHR_CONN_T       p_conn;
>         P_SHR_SESSION_T    p_session;
>     };
>     static void getConnSession(ConnSession                    &connSesn,
>                                const std::string              &brokerURI,
>                                cms::Session::AcknowledgeMode  ackMode);
> protected:
>     // ----------------------------------------------------------------------
>     // P R O T E C T E D   M E M B E R   V A R I A B L E S
>     typedef std::map<const std::string, P_SHR_CONN_T>     CM_MAP_T;
>     CM_MAP_T               cm_map;
>     static boost::mutex    access_mtx;
>     // ----------------------------------------------------------------------
>     // P R O T E C T E D   M E M B E R   F U N C T I O N
>     ConnSessionManager();
>     ~ConnSessionManager();
>     static ConnSessionManager &Singleton(void);
>     void addConnection(ConnSession                    &connSesn,
>                        const std::string              &brokerURI,
>                        cms::Session::AcknowledgeMode  ackMode);
> };
> // CLASS: AMQ_Producer
> //
> //    Active MQ (AMQ) message producer.
> //
> // The constructor creates an AMQ Producer object that has the ability to send
> // messages to the specified DESTination, HOST and PORT.
> //
> // The send() method causes the message to be sent. While creating the message,
> // the send() method will apply the properites (if any) that have been set.
> //
> // The initProperty() method clears the list of properties that have been
> // established for this producer. 
> //
> // The setProperty() method sets an AMQ message property for the message to be
> // sent. A new property is added each time this command is invoked, unless it's
> // for an existing property, in which case the old property is overwritten with
> // the new property.
> //
> class AMQ_Producer {
> public:
>     AMQ_Producer(const std::string    &dest,
>                  const std::string    &host,
>                  const std::string    &port);
>     ~AMQ_Producer() {};
>     void send(const std::string    &msg,
>               const std::string    &type = "");
> protected:
>     const std::string    m_dest;
>     std::string          m_broker_uri;
>     std::map<const std::string, std::string>    propertyInfo;
>     static const cms::DeliveryMode::DELIVERY_MODE    m_delivery_mode;
> };
> // ---------------------------------------------------------------------------
> // ---------------------------------------------------------------------------
> #include <cms/ConnectionFactory.h>
> #include <boost/thread/once.hpp>
> // access control to the singleton.
> boost::mutex    ConnSessionManager::access_mtx;
> static boost::once_flag    init_flag = BOOST_ONCE_INIT;
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::ConnSessionManager
> **
> ** Function:    Create the connection manage object (singleton) and initialize
> **              the Active MQ library that we use, once.
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager::ConnSessionManager () :
>     cm_map()
> {
>     // only call the library initialization ONCE.
>     boost::call_once(activemq::library::ActiveMQCPP::initializeLibrary,
>                      init_flag);
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::~ConnSessionManager
> **
> ** Function:    Destroty the connection manager
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager::~ConnSessionManager ()
> {
>     activemq::library::ActiveMQCPP::shutdownLibrary();
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::Singleton
> **
> ** Function:    Return a referece to our singleton.
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager &
> ConnSessionManager::Singleton (void)
> {
>     static ConnSessionManager    singleton;
>     return (singleton);
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::addConnection
> **
> ** Function:    Add, or re-add a connection to the connection manager.
> **              Create a session on the connection and return them.
> **
> **              An exception is thrown if we cannot get a connection.
> **
> ** Input Parms: connSesn  - ConnSession reference that is populated with a
> **                          connections shared pointer and session shared
> **                          pointer.
> **
> **              brokerURI - The Active MQ broker URI string to use to create
> **                          the connection.
> **
> **                  http://activemq.apache.org/cms/cms-api-overview.html
> **                  http://activemq.apache.org/cms/configuring.html
> **
> **              ackMode - The kind of acknowledgement we want the session to
> **                         have
> **
> **                  http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185
> **
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> void
> ConnSessionManager::addConnection (ConnSessionManager::ConnSession  &connSesn,
>                                    const std::string                &brokerURI,
>                                    cms::Session::AcknowledgeMode    ackMode)
> {
>     CM_MAP_T           &map_ref = Singleton().cm_map;
>     P_SHR_CONN_T       &p_conn(connSesn.p_conn);
>     P_SHR_SESSION_T    &p_session(connSesn.p_session);
>     // Create a ConnectionFactory that we automatically dealloate itself
>     std::auto_ptr<cms::ConnectionFactory>
>         connectionFactory(cms::ConnectionFactory::
>                           createCMSConnectionFactory(brokerURI));
>     // Create the connection and attach to shared pointer
>     p_conn.reset(connectionFactory->createConnection());
>     // Start the connection
>     p_conn->start();
>     // Create the session
>     p_session.reset(p_conn->createSession(ackMode));
>     // Update the map
>     map_ref.erase(brokerURI);
>     map_ref[brokerURI] = p_conn;
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::getConnSession
> **
> ** Function:    Get a working Active MQ connection for the specified brokerURI
> **              and return it along with an allocated session or thrown an
> **              exception.
> **
> **              This finds the existing AMQ connection for the specified
> **              brokerURI, validates that it's still connected, creates a
> **              session on it and returns them. If the connection isn't valid
> **              anymore (i.e. the broker restarted) or if the connection
> **              doesn't exist yet, we create it.
> **
> **              If we cannot create the connection we throw an exception.
> **
> **              WHY DO WE ALLOCATE A SESSION TOO? Why don't we just get the
> **              conection and return it? It's possible that the connection was
> **              reset since the last time we used it (i.e the broker
> **              restarted). The only way we can tell this is if the
> **              createSession call fails. Since we have to create a session to
> **              prove that the connection works, lets use it. The caller would
> **              do it once it had a connection anyway!
> **
> **              Why don't we just return a shared pointer to the session, why
> **              do we also return a shared pointer to the connection? Sessions
> **              are built on top of connections. It would be BAD if the
> **              connection was deleted before the session. So we keep a shared
> **              pointer of both together to ensure that they have the same
> **              lifecycle.
> **
> ** Input Parms: connSesn  - ConnSession reference that is populated with a
> **                          connections shared pointer and session shared
> **                          pointer.
> **
> **              brokerURI - The Active MQ broker URI string to use to create
> **                          the connection.
> **
> **                  http://activemq.apache.org/cms/cms-api-overview.html
> **                  http://activemq.apache.org/cms/configuring.html
> **
> **              ackMode - The kind of acknowledgement we want the session to
> **                         have
> **
> **                  http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185
> **
> **
> ** Return Parm: P_SHR_CONN_T - A shared pointer to the connection to use
> **
> ****************************************************************************/
> void
> ConnSessionManager::getConnSession (ConnSessionManager::ConnSession &connSesn,
>                                     const std::string               &brokerURI,
>                                     cms::Session::AcknowledgeMode   ackMode)
> {
>     P_SHR_CONN_T          &p_conn(connSesn.p_conn);
>     P_SHR_SESSION_T       &p_session(connSesn.p_session);
>     ConnSessionManager    &me = Singleton();
>     CM_MAP_T              &map_ref = me.cm_map;
>     // restrict access
>     boost::lock_guard<boost::mutex>    lock(access_mtx);
>     // get the entry
>     p_conn = map_ref[brokerURI];
>     // if it's not allocated yet, then allocate it, attach it and return it
>     if (NULL == p_conn) {
>         printf("Creating connection for %s\n",
>                brokerURI.c_str());
>         return (me.addConnection(connSesn, brokerURI, ackMode));
>     }
>     // Check if the broker restarted and reset our connection. If it has,
>     // then create session will generate an exception and we know we then
>     // need to make a new connection. If it it works then use the session,
>     // passing it back in the connSession
>     try {
>         p_session.reset(p_conn->createSession(ackMode));
>     }
>     catch (std::exception    &e) {
>         printf("Re-creating connection for %s: %s\n",
>                brokerURI.c_str(), e.what());
>         return (me.addConnection(connSesn, brokerURI, ackMode));
>     }
> }
> // **********************************************************************
> // **********************************************************************
> //                               AMQ_Producer
> // **********************************************************************
> // **********************************************************************
> // m_delivery_mode
> //
> //     The producer's delivery mode is set to NON_PERSISTENT. When persistent
> //     it causes the broker to save the message to disk so that it is available
> //     even if the broker restarts. However, this has a price, it greatly slows
> //     down the send (220,475 microseconds with persistance, vs. 226
> //     microseconds without.) These measurements where taken WITH synchronous
> //     sends. We could have investigated and used async sends to speed up the
> //     send call and keep persistence, however, this wasn't deemed necessary.
> //
> //     http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_delivery_mode.html
> //
> //     http://activemq.apache.org/persistence-questions.html
> //
> //     http://activemq.apache.org/what-is-the-difference-between-persistent-and-non-persistent-delivery.html
> //
> const cms::DeliveryMode::DELIVERY_MODE
> AMQ_Producer::m_delivery_mode = cms::DeliveryMode::NON_PERSISTENT;
> /****************************************************************************
> **
> ** Name:        AMQ_Producer::AMQ_Producer
> **
> ** Function:    Create the Active MQ Producer client object, storing the
> **              destination and creating the URI string from the provided host
> **              name and port number.
> **
> ** Input Parms: dest: The destination queue/topic to communicate with
> **
> **              host: The host to communicate with
> **
> **              port: The port on the host to communicate with
> **
> ** Return Parm: 
> **
> ****************************************************************************/
> AMQ_Producer::AMQ_Producer (const std::string    &dest,
>                             const std::string    &host,
>                             const std::string    &port) :
>     m_dest(dest),
>     m_broker_uri("tcp://" + host + ":" + port +
>                  "?connection.alwaysSyncSend=true")
> {
>     // add the send timeout if its environemtn varialbe is given and its a
>     // value other than the default, zero.
>     int    sendTimeout(0);
>     GetEnv("SEND_TO", sendTimeout, 0);
>     if (sendTimeout) {
>         std::stringstream    ss_sto;
>         ss_sto << "&connection.sendTimeout=" << sendTimeout;
>         m_broker_uri += ss_sto.str();
>     }
> }
> /****************************************************************************
> **
> ** Name:        AMQ_Producer::send
> **
> ** Function:    Cause the message to be sent by this producer to the
> **              established message queue, host and port.
> **
> **              This gets a cached (or if needed new) connection which has a
> **              session created for it, it then creates the destination,
> **              producer and message. Finally it adds all of the properties
> **              that we want to the message and sends it.
> **
> **              This model is best explained at the Active MQ CPP example page
> **              at: http://activemq.apache.org/cms/cms-api-overview.html
> **
> ** Input Parms: msg - The message to send with this producer
> **
> **              type - (optional) The CMSType/JMSType to set the message to. 
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> void
> AMQ_Producer::send (const std::string    &msg,
>                     const std::string    &type)
> {
>     std::string    s_where;
>     try {
>         ConnSessionManager::ConnSession    conSesn;
>         // Create the Connection + Session
>         s_where = "creating connection/session";
>         ConnSessionManager::getConnSession(conSesn, m_broker_uri,
>                                            cms::Session::AUTO_ACKNOWLEDGE);
>         // Create the Destination (queue/topic)
>         s_where = "creating destination";
>         boost::shared_ptr<cms::Destination>
>             p_destination(conSesn.p_session->createQueue(m_dest));
>         // Create the Producer
>         s_where = "creating message producer";
>         boost::shared_ptr<cms::MessageProducer>
>             p_producer(conSesn.p_session->createProducer(p_destination.get()));
>         p_producer->setDeliveryMode(m_delivery_mode);
>         // Create the Message
>         s_where = "creating connection";
>         boost::shared_ptr<cms::TextMessage>
>             p_msg(conSesn.p_session->createTextMessage(msg));
>         // Set the Message's JMSType/CMSType
>         if (!type.empty()) {
>             p_msg->setCMSType(type);
>         }
>         std::map<const std::string, std::string>::iterator it;
>         // run through all properties, adding them to the message
>         for (it = propertyInfo.begin(); it != propertyInfo.end(); ++it) {
>             s_where = "adding property " + (*it).first;
>             p_msg->setStringProperty((*it).first, (*it).second);
>         }
>         // Send it!
>         s_where = "sending message";
>         p_producer->send(p_msg.get());
>     }
>     catch (std::exception    &e) {
>         std::stringstream    ss_err;
>         ss_err << "Error while " << s_where << " [" << e.what() << "]";
>         throw ee_except(ss_err);
>     }
> }
> // ---------------------------------------------------------------------------
> const std::string    base_msg("1024 characters folowed by a message.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxHello world! ");
> void run_test ()
> {
>     std::string     host("127.0.0.1");
>     std::string     port("61616");
>     std::string     destURI("c.c.p.v.ms.events");
>     unsigned int    iteration(0);
>     unsigned int    ex_count(0);
>     // Loop, creating producers and sending the message
>     while (1) {
>         iteration++;
>         AMQ_Producer    producer(destURI, host, port);
>         std::stringstream    msg;
>         msg << base_msg << iteration;
>         printf("Sending message #%d\n", iteration);
>         try {
>             producer.send(msg.str(), "TEST_TYPE");
>         }
>         catch (std::exception    &e) {
>             ex_count++;
>             printf("\n\nException[%d]: %s\n", ex_count, e.what());
>             if (ex_count >= 10 ) {
>                 printf("\n\nException limit (%d) reached. Stopping test\n\n\n",
>                        ex_count);
>                 break;
>             }
>         }
>     }
> }
> ///////////////////////////////////////////////////////////////////////////////
> int main(int, char*) {
>     activemq::library::ActiveMQCPP::initializeLibrary();
>     std::cout << "=====================================================\n";
>     std::cout << "Starting the example:" << std::endl;
>     std::cout << "-----------------------------------------------------\n";
>     run_test();   
>     std::cout << "-----------------------------------------------------\n";
>     std::cout << "Finished with the example." << std::endl;
>     std::cout << "=====================================================\n";
>     activemq::library::ActiveMQCPP::shutdownLibrary();
> }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

Mime
View raw message