activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "John Rocha (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQCPP-413) Producer connection that causes broker to reach its memory/disk limits doesn't get the 'all full' exception even though the broker is configured to send it for Producer Flow Control.
Date Wed, 11 Jul 2012 18:29:37 GMT

    [ https://issues.apache.org/jira/browse/AMQCPP-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13411817#comment-13411817 ] 

John Rocha commented on AMQCPP-413:
-----------------------------------

Simple consumer attachment added, as per comments above.
                
> Producer connection that causes broker to reach its memory/disk limits doesn't get the 'all full' exception even though the broker is configured to send it for Producer Flow Control.
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQCPP-413
>                 URL: https://issues.apache.org/jira/browse/AMQCPP-413
>             Project: ActiveMQ C++ Client
>          Issue Type: Bug
>          Components: CMS Impl
>    Affects Versions: 3.4.0, 3.4.4
>         Environment: Linux
>            Reporter: John Rocha
>            Assignee: Timothy Bish
>            Priority: Critical
>              Labels: cpp, producer_flow_control
>         Attachments: simple_consumer.cpp
>
>
> Producer connection that cause the broker to reach its memory/disk limits
> doesn't get the 'all full' exception even though the broker is configured to
> send it for Producer Flow Control.
> +Scenario #1+
> # Delete the broker data directory\\
> \\
> # Start the broker, that sends an exception if no space\\
> \\
> # DO *+NOT+* START A CONUMSER.\\
> \\
> # Run a producer that does synchronous sends, and has the default sendTimeot of zero(0), it uses one connection, and enters a loop that just sends messages.\\
> \\
> After awhile the producer will lock up and never recovers.\\
> \\
> # Start another producer in another window.\\
> \\
> It immediate fails with a 'broker full' exception.
> +Scenario #2+
> # Delete the broker data directory\\
> \\
> # Start the broker, that sends an exception if no space\\
> \\
> # DO *+NOT+* START A CONUMSER.\\
> \\
> # Run a producer that does synchronous sends, and has the sendTimeot of 500 ms, it uses one connection, and enters a loop that just sends messages.\\
> \\
> After awhile the producer will cause the broker to reach it's limit. And then the send method will start timing out. It never gets a 'broker full' exception.\\
> \\
> # Start another producer in another window.\\
> \\
> It immediate fails with a 'broker full' exception
> \\
> \\
> ----
> \\
> \\
> {noformat}
> ######################################################################
> ## SCENARIO #1
> ######################################################################
> ##
> ## Restart our tomcat service which restarts the AMQ Broker and view the
> ## directory size.
> ##
> root@psbu-jrr-lnx:#  /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWhttpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/BWhttpd/tomcat/activemq-data
> Stopping tomcat ... done
> Killing tomcat ... done.
> Starting tomcat ... done
> 44K     /usr/BWhttpd/tomcat/activemq-data
> root@psbu-jrr-lnx:#  
> ##
> ## View the activemq.xml configuration file used for startint active MQ
> ##
> root@psbu-jrr-lnx:#  cat /usr/BWhttpd/tomcat/webapps/amqbroker/WEB-INF/classes/conf/activemq.xml
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.apache.org/schema/core"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
>   http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
>   http://activemq.apache.org/schema/core 
>   http://activemq.apache.org/schema/core/activemq-core-5.3.2.xsd"
>   default-autowire="byName">
>   <!-- Allows to use system properties as variables in this configuration file -->
>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>     </bean>
>     <broker xmlns="http://activemq.apache.org/schema/core"
>             brokerName="localhost"
>             advisorySupport="true"
>             dataDirectory="${catalina.home}/activemq-data"
>             useJmx="false"
>             useShutdownHook="false">
>         <!-- Destination specific policies using destination names or wildcards
>         -->
>         <destinationPolicy>
>             <policyMap>
>                 <policyEntries>
>                     <policyEntry queue=">" memoryLimit="5mb" />
>                     <policyEntry topic=">" memoryLimit="5mb" />
>                 </policyEntries>
>             </policyMap>
>         </destinationPolicy>
>         <systemUsage>
>             <systemUsage sendFailIfNoSpace="true">
>                 <memoryUsage>
>                     <memoryUsage limit="5 mb" />
>                 </memoryUsage>
>                 <storeUsage>
>                     <storeUsage limit="4 mb" />
>                 </storeUsage>
>                 <tempUsage>
>                     <tempUsage limit="1 mb" />
>                 </tempUsage>
>                 </systemUsage>
>         </systemUsage>
>         <!-- The transport connectors ActiveMQ will listen to -->
>         <transportConnectors>
>             <transportConnector name="tcp"
>              uri="tcp://0.0.0.0:61616?wireFormat.maxInactivityDuration=0" />
>         </transportConnectors>
>     </broker>
> </beans>
> ##
> ## View the runtime environment to validate the library is 3.4.4, the latests.
> ## I cannot explain why the number is 14.0.4, but I observed that 3.4.0 used
> ## 14.0.0
> ##
> 242(TEST)jrr@[SUSE10.1]>  ls $LD_LIBRARY_PATH/libactive*
> /usr/BWhttpd/lib//libactivemq-cpp.so*
> /usr/BWhttpd/lib//libactivemq-cpp.so.14*
> /usr/BWhttpd/lib//libactivemq-cpp.so.14.0.4*
> 243(TEST)jrr@[SUSE10.1]>  
> ##
> ## Compile the simple producer
> ##
> Compiling simple_producer.o
> g++ -g -c -MD -Wall -Werror -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/activemq-cpp/ -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/ -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/apr-1  simple_producer.cpp
> g++ -o simple_producer.exe simple_producer.o -lc -lrt \
>     -lactivemq-cpp \
>     -lboost_thread \
>     -L /usr/BWhttpd/lib
> Compilation finished at Mon Jul  9 15:32:48
> ##
> ## Execute the test program. It creates a connection and then loops forever
> ## sending messages.  Where for each message it creates sessions, destiation,
> ## producer,etc. sends the message and then destructs all of those pieces.
> ##
> ## It locks up at 1775 messages
> ##
> 257(TEST)jrr@[SUSE10.1]>  simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true
> Sending message #2
> Sending message #3
> Sending message #4
> Sending message #5
> ...
> ...
> ...
> Sending message #1772
> Sending message #1773
> Sending message #1774
> Sending message #1775
> Sending message #1776
> ##
> ## View the ActiveMQ disk usage during the lock up
> ##
> du -sh activemq-data/
> 12M     activemq-data/
> root@psbu-jrr-lnx:#  
> ##
> ## Use GDB to see where the simple_producer is blocked
> ##
> psbu-jrr-lnx[SUSE10.1]:176>  ps auxw | egrep simple_producer.exe
> jrr      16974  1.3  0.6  53980  6724 pts/12   Sl+  17:58   0:00 simple_producer.exe
> jrr      16997  0.0  0.0   1864   660 pts/18   S+   17:59   0:00 /bin/grep -E simple_producer.exe
> psbu-jrr-lnx[SUSE10.1]:177>  
> psbu-jrr-lnx[SUSE10.1]:177>  gdb simple_producer.exe 
> GNU gdb 6.6
> Copyright (C) 2006 Free Software Foundation, Inc.
> GDB is free software, covered by the GNU General Public License, and you are
> welcome to change it and/or distribute copies of it under certain conditions.
> Type "show copying" to see the conditions.
> There is absolutely no warranty for GDB.  Type "show warranty" for details.
> This GDB was configured as "i586-suse-linux"...
> Using host libthread_db library "/lib/libthread_db.so.1".
> (gdb) set print pretty
> (gdb) set pagination off
> (gdb) attach 16974
> Attaching to program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process 16974
> Reading symbols from /lib/librt.so.1...done.
> Loaded symbols for /lib/librt.so.1
> Reading symbols from /usr/BWhttpd/lib/libactivemq-cpp.so.14...done.
> Loaded symbols for /usr/BWhttpd/lib/libactivemq-cpp.so.14
> Reading symbols from /usr/BWhttpd/lib/libboost_thread.so.1.43.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libboost_thread.so.1.43.0
> Reading symbols from /usr/lib/libstdc++.so.6...done.
> Loaded symbols for /usr/lib/libstdc++.so.6
> Reading symbols from /lib/libm.so.6...done.
> Loaded symbols for /lib/libm.so.6
> Reading symbols from /lib/libc.so.6...done.
> Loaded symbols for /lib/libc.so.6
> Reading symbols from /lib/libgcc_s.so.1...done.
> Loaded symbols for /lib/libgcc_s.so.1
> Reading symbols from /lib/libpthread.so.0...done.
> [Thread debugging using libthread_db enabled]
> [New Thread -1221044560 (LWP 16974)]
> [New Thread -1254835296 (LWP 16981)]
> [New Thread -1246442592 (LWP 16978)]
> [New Thread -1238049888 (LWP 16977)]
> [New Thread -1229657184 (LWP 16976)]
> [New Thread -1221264480 (LWP 16975)]
> Loaded symbols for /lib/libpthread.so.0
> Reading symbols from /lib/ld-linux.so.2...done.
> Loaded symbols for /lib/ld-linux.so.2
> Reading symbols from /usr/BWhttpd/lib/libapr-1.so.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libapr-1.so.0
> Reading symbols from /lib/libuuid.so.1...done.
> Loaded symbols for /lib/libuuid.so.1
> Reading symbols from /lib/libcrypt.so.1...done.
> Loaded symbols for /lib/libcrypt.so.1
> Reading symbols from /usr/BWhttpd/lib/libaprutil-1.so.0...done.
> Loaded symbols for /usr/BWhttpd/lib/libaprutil-1.so.0
> Reading symbols from /usr/BWhttpd/lib/libexpat.so.1...done.
> Loaded symbols for /usr/BWhttpd/lib/libexpat.so.1
> Reading symbols from /usr/lib/libssl.so.0.9.8...done.
> Loaded symbols for /usr/lib/libssl.so.0.9.8
> Reading symbols from /usr/lib/libcrypto.so.0.9.8...done.
> Loaded symbols for /usr/lib/libcrypto.so.0.9.8
> Reading symbols from /lib/libdl.so.2...done.
> Loaded symbols for /lib/libdl.so.2
> 0xffffe410 in __kernel_vsyscall ()
> (gdb) thread apply all where
> Thread 6 (Thread -1221264480 (LWP 16975)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806b120) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806ae5c) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806ae58) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6  0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806ae50) at decaf/util/Timer.cpp:81
> #7  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806aff8) at decaf/lang/Thread.cpp:137
> #8  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806aff8) at decaf/lang/Thread.cpp:190
> #9  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 5 (Thread -1229657184 (LWP 16976)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806b368) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806b1fc) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806b1f8) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6  0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806b1f0) at decaf/util/Timer.cpp:81
> #7  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806b240) at decaf/lang/Thread.cpp:137
> #8  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806b240) at decaf/lang/Thread.cpp:190
> #9  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 4 (Thread -1238049888 (LWP 16977)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806c718) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806c2ec) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::wait (this=0x806c2e8) at decaf/internal/util/concurrent/SynchronizableImpl.cpp:48
> #6  0xb7de6ba9 in decaf::util::TimerImpl::run (this=0x806c2e0) at decaf/util/Timer.cpp:81
> #7  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806c5c0) at decaf/lang/Thread.cpp:137
> #8  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806c5c0) at decaf/lang/Thread.cpp:190
> #9  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #10 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 3 (Thread -1246442592 (LWP 16978)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb765ba8b in std::exception::what () from /lib/libc.so.6
> #2  0xb7580b7a in apr_socket_recv (sock=0x80649e8, buf=0x8066c10 "", len=0xb5b4c040) at network_io/unix/sendrecv.c:81
> #3  0xb7d49ebd in decaf::internal::net::tcp::TcpSocket::read (this=0x8064898, buffer=0x8066c10 "", size=8192, offset=0, length=8192) at decaf/internal/net/tcp/TcpSocket.cpp:649
> #4  0xb7d4d1c0 in decaf::internal::net::tcp::TcpSocketInputStream::doReadArrayBounded (this=0x8066c10, buffer=0x2000 <Address 0x2000 out of bounds>, size=8192, offset=0, length=8192) at decaf/internal/net/tcp/TcpSocketInputStream.cpp:108
> #5  0xb7d91d1f in decaf::io::InputStream::doReadArray (this=0x8066998, buffer=0x8066c10 "", size=8192) at decaf/io/InputStream.cpp:138
> #6  0xb7d92333 in decaf::io::InputStream::read (this=0x8066998, buffer=0x8066c10 "", size=8192) at decaf/io/InputStream.cpp:72
> #7  0xb7d866ef in decaf::io::BufferedInputStream::bufferData (this=0x8066b60, inputStream=0x8066998, buffer=@0xb5b4c1f8) at decaf/io/BufferedInputStream.cpp:326
> #8  0xb7d86d18 in decaf::io::BufferedInputStream::doReadArrayBounded (this=0x8066b60, buffer=0x80669ca "", size=4, offset=0, length=4) at decaf/io/BufferedInputStream.cpp:228
> #9  0xb7d92191 in decaf::io::InputStream::read (this=0x8066b60, buffer=0x80669ca "", size=4, offset=0, length=4) at decaf/io/InputStream.cpp:84
> #10 0xb7d8a757 in decaf::io::DataInputStream::readAllData (this=0x80669b8, buffer=0x80669ca "", length=4) at decaf/io/DataInputStream.cpp:492
> #11 0xb7d8c684 in decaf::io::DataInputStream::readInt (this=0x80669b8) at decaf/io/DataInputStream.cpp:124
> #12 0xb7cae1b8 in activemq::wireformat::openwire::OpenWireFormat::unmarshal (this=0x8063e10, transport=0x8064790, dis=0x80669b8) at activemq/wireformat/openwire/OpenWireFormat.cpp:245
> #13 0xb7c2c9f7 in activemq::transport::IOTransport::run (this=0x8064790) at activemq/transport/IOTransport.cpp:246
> #14 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806c088) at decaf/lang/Thread.cpp:137
> #15 0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806c088) at decaf/lang/Thread.cpp:190
> #16 0xb75912ab in start_thread () from /lib/libpthread.so.0
> #17 0xb766aa4e in clone () from /lib/libc.so.6
> Thread 2 (Thread -1254835296 (LWP 16981)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806cde8) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806d24c) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7c1fada in activemq::threads::CompositeTaskRunner::run (this=0x806d208) at activemq/threads/CompositeTaskRunner.cpp:115
> #6  0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties=0x806cbc0) at decaf/lang/Thread.cpp:137
> #7  0xb7daa47c in (anonymous namespace)::threadWorker (arg=0x806cbc0) at decaf/lang/Thread.cpp:190
> #8  0xb75912ab in start_thread () from /lib/libpthread.so.0
> #9  0xb766aa4e in clone () from /lib/libc.so.6
> Thread 1 (Thread -1221044560 (LWP 16974)):
> #0  0xffffe410 in __kernel_vsyscall ()
> #1  0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.so.0
> #2  0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6
> #3  0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x806f1a8) at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #4  0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=0x806f13c) at decaf/util/concurrent/Mutex.cpp:126
> #5  0xb7de9395 in decaf::util::concurrent::CountDownLatch::await (this=0x806f134) at decaf/util/concurrent/CountDownLatch.cpp:53
> #6  0xb7c36025 in activemq::transport::correlator::FutureResponse::getResponse (this=0x806f130) at ./activemq/transport/correlator/FutureResponse.h:62
> #7  0xb7c33aa2 in activemq::transport::correlator::ResponseCorrelator::request (this=0x806b740, command=@0xbf9301cc) at activemq/transport/correlator/ResponseCorrelator.cpp:120
> #8  0xb7b649fa in activemq::core::ActiveMQConnection::syncRequest (this=0x806b858, command=@0xbf9301cc, timeout=0) at activemq/core/ActiveMQConnection.cpp:896
> #9  0xb7baf1d8 in activemq::core::ActiveMQSession::send (this=0x80635b8, message=0x806e640, producer=0x806e530, usage=0x0) at activemq/core/ActiveMQSession.cpp:921
> #10 0xb7ba1e71 in activemq::core::ActiveMQProducer::send (this=0x806e530, destination=0x806e3ec, message=0x806e640, deliveryMode=1, priority=4, timeToLive=0) at activemq/core/ActiveMQProducer.cpp:211
> #11 0xb7ba2b07 in activemq::core::ActiveMQProducer::send (this=0x806e530, destination=0x806e3ec, message=0x806e640) at activemq/core/ActiveMQProducer.cpp:152
> #12 0xb7ba3cab in activemq::core::ActiveMQProducer::send (this=0x806e530, message=0x806e640) at activemq/core/ActiveMQProducer.cpp:128
> #13 0x0804c5ac in AMQ_Producer::send (this=0xbf9305c8, msg=@0xbf930600, type=@0xbf9305f8) at simple_producer.cpp:720
> #14 0x0804ca45 in run_test () at simple_producer.cpp:767
> #15 0x0804cd47 in main () at simple_producer.cpp:798
> #0  0xffffe410 in __kernel_vsyscall ()
> (gdb) detach
> Detaching from program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process 16974
> (gdb) quit
> psbu-jrr-lnx[SUSE10.1]:178>  
> ##
> ## If we kill the program and execute it again, then this time it will
> ## terminate with the expected exception.
> ##
> 258(TEST)jrr@[SUSE10.1]>  simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true
> Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622326f 1048576). Stopping producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:0:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Sending message #2
> ...
> ...
> ...
> Sending message #10
> Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622426f 1048576). Stopping producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:9:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
>   what():  Unlock Failed, this thread is not the Lock Owner!
> Abort
> 259(TEST)jrr@[SUSE10.1]>  
> {noformat}
> \\
> \\
> ----
> \\
> \\
> {noformat}
> ######################################################################
> ## SCENARIO #2
> ######################################################################
> ##
> ## Restart our tomcat service which restarts the AMQ Broker and view the
> ## directory size.
> ##
> root@psbu-jrr-lnx:#  /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWhttpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/BWhttpd/tomcat/activemq-data
> Stopping tomcat ... done
> Killing tomcat ... done.
> Starting tomcat ... done
> 44K     /usr/BWhttpd/tomcat/activemq-data
> root@psbu-jrr-lnx:#  
> ##
> ## Execute the test program, this time pass in variables to cause the sendTime
> ## to be set to 500ms. It creates a connection and then loops forever sending
> ## messages.  Where for each message it creates sessions, destiation,
> ## producer,etc. sends the message and then destructs all of those pieces.
> ##
> ## Now at 1775 messages the send times out. Moreover, the test moves on trying
> ## more sends, and they all time out. After 10 failures the test exits. This
> ## sender that causes the broker memory limmit to be reached never gets a
> ## 'broker full' exception.
> ##
> 262(TEST)jrr@[SUSE10.1]>  env SEND_TO=500 simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> SEND_TO set to 500
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true&connection.sendTimeout=500
> SEND_TO set to 500
> Sending message #2
> SEND_TO set to 500
> Sending message #3
> SEND_TO set to 500
> Sending message #4
> ...
> ...
> ...
> Sending message #1776
> Exception[1]: Error while sending message [No valid response received for command: Message { commandId = 8879, responseRequired = true, ProducerId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0, Destination = queue://c.c.p.v.ms.events, TransactionId = NULL, OriginalDestination = NULL, MessageId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0:0:0, OriginalTransactionId = NULL, GroupID = , GroupSequence = 0, CorrelationId = , Persistent = false, Expiration = 0, Priority = 4, ReplyTo = NULL, Timestamp = 1341882406582, Type = TEST_TYPE, Content = [size=1045], MarshalledProperties = NULL, DataStructure = NULL, TargetConsumerId = NULL, Compressed = false, RedeliveryCounter = 0, BrokerPath = NULL, Arrival = 0, UserID = , RecievedByDFBridge = false, Droppable = false, Cluster = NULL, BrokerInTime = 0, BrokerOutTime = 0 }Text = 1024 characters folowed by a message.xxxxxxxx... world! 1776, check broker.]
> SEND_TO set to 500
> Sending message #1777
> ...
> ...
> ...
> Sending message #1785
> Exception[10]: Error while sending message [No valid response received for command: Message { commandId = 8924, responseRequired = true, ProducerId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0, Destination = queue://c.c.p.v.ms.events, TransactionId = NULL, OriginalDestination = NULL, MessageId = ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0:0:0, OriginalTransactionId = NULL, GroupID = , GroupSequence = 0, CorrelationId = , Persistent = false, Expiration = 0, Priority = 4, ReplyTo = NULL, Timestamp = 1341882411136, Type = TEST_TYPE, Content = [size=1045], MarshalledProperties = NULL, DataStructure = NULL, TargetConsumerId = NULL, Compressed = false, RedeliveryCounter = 0, BrokerPath = NULL, Arrival = 0, UserID = , RecievedByDFBridge = false, Droppable = false, Cluster = NULL, BrokerInTime = 0, BrokerOutTime = 0 }Text = 1024 characters folowed by a message.xxxxxxxx... world! 1785, check broker.]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
>   what():  Unlock Failed, this thread is not the Lock Owner!
> Abort
> ##
> ##
> ## View the ActiveMQ disk usage during the lock up
> ##
> root@psbu-jrr-lnx:#  !du
> du -sh activemq-data/
> 12M     activemq-data/
> root@psbu-jrr-lnx:#  
> ##
> ## If we re-execute the test now that the broker is full, then this time it
> ## will terminate with the expected broker full exception.
> ##
> 263(TEST)jrr@[SUSE10.1]>  env SEND_TO=500 simple_producer.exe
> =====================================================
> Starting the example:
> -----------------------------------------------------
> SEND_TO set to 500
> Sending message #1
> Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend=true&connection.sendTimeout=500
> Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001622756f 1048576). Stopping producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:0:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> SEND_TO set to 500
> ...
> ...
> ...
> SEND_TO set to 500
> Sending message #10
> Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK TRACE ***
> Message: Usage Manager Temp Store is Full (01001623156f 1048576). Stopping producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:9:0) to prevent flooding queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-control.html for more info
> Exception Class javax.jms.ResourceAllocationException
>         [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.activemq.broker.region.BaseDestination.waitForSpace
>         [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.broker.region.Queue.checkUsage
>         [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.broker.region.Queue.doMessageSend
>         [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.broker.region.Queue.send
>         [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.activemq.broker.region.AbstractRegion.send
>         [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.activemq.broker.region.RegionBroker.send
>         [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.activemq.broker.BrokerFilter.send
>         [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: org.apache.activemq.broker.CompositeDestinationBroker.send
>         [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache.activemq.broker.TransactionBroker.send
>         [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apache.activemq.broker.MutableBrokerFilter.send
>         [FILE: TransportConnection.java, LINE: 458] occurred in: org.apache.activemq.broker.TransportConnection.processMessage
>         [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.activemq.command.ActiveMQMessage.visit
>         [FILE: TransportConnection.java, LINE: 306] occurred in: org.apache.activemq.broker.TransportConnection.service
>         [FILE: TransportConnection.java, LINE: 179] occurred in: org.apache.activemq.broker.TransportConnection$1.onCommand
>         [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.activemq.transport.TransportFilter.onCommand
>         [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apache.activemq.transport.WireFormatNegotiator.onCommand
>         [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache.activemq.transport.InactivityMonitor.onCommand
>         [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.activemq.transport.TransportSupport.doConsume
>         [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.activemq.transport.tcp.TcpTransport.doRun
>         [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.activemq.transport.tcp.TcpTransport.run
>         [FILE: , LINE: -1] occurred in: java.lang.Thread.run
> *** END SERVER-SIDE STACK TRACE ***]
> Exception limit (10) reached. Stopping test
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> terminate called after throwing an instance of 'decaf::lang::exceptions::RuntimeException'
>   what():  Unlock Failed, this thread is not the Lock Owner!
> Abort
> 264(TEST)jrr@[SUSE10.1]>  
> {noformat}
> \\
> \\
> ----
> \\
> \\
> {code}
> //////////////////////////////////////////////////////////////////////
> // Not so simple producer.
> //
> // The Active MQ Client's simple producer code has been modified to allow for
> // investigation into client behaviour. I've integrated the products engine for
> // obtaining a connection, session, queue, destination, sender and message.
> //
> // The code basically does the following:
> //       - create a connection to be used over and over for the test
> //       - loop forever
> //         - create a session, queue, destination, sender and message
> //         - send the message
> //         - destroy the message, sender, destination, topic and session.
> //
> // ----------------------------------------
> // ENVIRONMENT VARIABLE OVERRIDES
> // ----------------------------------------
> //
> // SEND_TO : default is 0. If non-zero then it appends connection.sendTimeout
> //           to the destination URI using the value set for SEND_TO. Refer to
> //           the connection options at
> //           http://activemq.apache.org/cms/configuring.html for details
> //           regarding its values.
> //
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <decaf/lang/Long.h>
> #include <decaf/util/Date.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Config.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> #include <stdio.h>
> #include <iostream>
> #include <memory>
> #include <boost/scoped_ptr.hpp>
> #include <boost/thread/thread.hpp>
> #include <boost/lexical_cast.hpp>
> using namespace activemq;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
> //
> // Override values with environement values if they exist
> //
> template <class DEST>
> bool GetEnv (const std::string    var,
>              DEST                 &dst,
>              const DEST           &def_value)
> {
>     const char *val(::getenv(var.c_str()));
>     dst = def_value;
>     if (NULL == val) {
>         return (false);
>     }
>     try {
>         dst = boost::lexical_cast<DEST>(val);
>         printf("%s set to %s\n", var.c_str(), val);
>         return(true);
>     }
>     catch (std::exception    &e) {
>         printf("%s\n", e.what());
>         printf("Cannot convert '%s' to the desired output format\n", val);
>     }
>     return(false);
> }
> // ---------------------------------------------------------------------------
> // EventEngineUtil.hxx
> // ---------------------------------------------------------------------------
> class ee_except : public std::exception
> {
> public:
>     // ee_except(std::string)
>     //
>     //    Create an exception using the specified string
>     //
>     explicit ee_except (const std::string    &_msg) :
>         msg(_msg)
>         {};
>     explicit ee_except (const std::stringstream    &_msg) :
>         msg(_msg.str())
>         {};
>     // ee_except(std::string, errNum)
>     //
>     //    Create an excpetion using the specified string, and then append the
>     //    numeric and string representation of the specified errno
>     //
>     explicit ee_except (const std::string    &_msg,
>                         int                  errNum) {
>         ee_except_helper(_msg, errNum);
>     }
>     explicit ee_except (const std::stringstream    &_msg,
>                        int                         errNum) {
>         ee_except_helper(_msg.str(), errNum);
>     }
>     // destroy the exception
>     ~ee_except() throw() {};
>     // return the message, overloading the standard what method
>     virtual const char *what() const throw() {
>         return msg.c_str();
>     }
> protected:
>     void ee_except_helper (const std::string    &_msg,
>                            int                  errNum) {
>         std::stringstream    ss_error;
>         if (errNum) {
>             ss_error << _msg << ", errno: " << errNum
>                      << ", " << strerror(errNum);
>             msg = ss_error.str(); 
>         } else {
>             msg = _msg;
>         }
>     }
> private:
>     std::string msg;
> };
> // ---------------------------------------------------------------------------
> // From EventEngineAMQProducer
> // ---------------------------------------------------------------------------
> #include <boost/thread/thread.hpp>
> #include <boost/shared_ptr.hpp>
> #include <cms/DeliveryMode.h>
> #include <cms/Connection.h>
> #include <activemq/library/ActiveMQCPP.h>
> // CLASS: ConnSessionManager
> //
> //    Singleton class to manage Active MQ (AMQ) connections.
> //
> // getConnSession():
> //
> //    This method gets a connection for the specified broker, creates a session
> //    with the specified ackMode and populates the connSesn parameter with
> //    shared pointers to the alloated resources.
> //
> //    If a connection already exists for the brokerURI, it creates a session
> //    for it and returns them, if the connection does not exist it makes one
> //    attaches a session and returns them. If it cannot create the connection
> //    (such as the broker is not up), it throws an exception.
> //
> //    The connection manager can manage multiple connections based on the
> //    brokerURI. Each unique broker URI will be a different connection managed
> //    by the connection manager.
> //
> //
> class ConnSessionManager {
> public:
>     typedef boost::shared_ptr<cms::Connection>    P_SHR_CONN_T;
>     typedef boost::shared_ptr<cms::Session>       P_SHR_SESSION_T;
>     struct ConnSession {
>         P_SHR_CONN_T       p_conn;
>         P_SHR_SESSION_T    p_session;
>     };
>     static void getConnSession(ConnSession                    &connSesn,
>                                const std::string              &brokerURI,
>                                cms::Session::AcknowledgeMode  ackMode);
> protected:
>     // ----------------------------------------------------------------------
>     // P R O T E C T E D   M E M B E R   V A R I A B L E S
>     typedef std::map<const std::string, P_SHR_CONN_T>     CM_MAP_T;
>     CM_MAP_T               cm_map;
>     static boost::mutex    access_mtx;
>     // ----------------------------------------------------------------------
>     // P R O T E C T E D   M E M B E R   F U N C T I O N
>     ConnSessionManager();
>     ~ConnSessionManager();
>     static ConnSessionManager &Singleton(void);
>     void addConnection(ConnSession                    &connSesn,
>                        const std::string              &brokerURI,
>                        cms::Session::AcknowledgeMode  ackMode);
> };
> // CLASS: AMQ_Producer
> //
> //    Active MQ (AMQ) message producer.
> //
> // The constructor creates an AMQ Producer object that has the ability to send
> // messages to the specified DESTination, HOST and PORT.
> //
> // The send() method causes the message to be sent. While creating the message,
> // the send() method will apply the properites (if any) that have been set.
> //
> // The initProperty() method clears the list of properties that have been
> // established for this producer. 
> //
> // The setProperty() method sets an AMQ message property for the message to be
> // sent. A new property is added each time this command is invoked, unless it's
> // for an existing property, in which case the old property is overwritten with
> // the new property.
> //
> class AMQ_Producer {
> public:
>     AMQ_Producer(const std::string    &dest,
>                  const std::string    &host,
>                  const std::string    &port);
>     ~AMQ_Producer() {};
>     void send(const std::string    &msg,
>               const std::string    &type = "");
> protected:
>     const std::string    m_dest;
>     std::string          m_broker_uri;
>     std::map<const std::string, std::string>    propertyInfo;
>     static const cms::DeliveryMode::DELIVERY_MODE    m_delivery_mode;
> };
> // ---------------------------------------------------------------------------
> // ---------------------------------------------------------------------------
> #include <cms/ConnectionFactory.h>
> #include <boost/thread/once.hpp>
> // access control to the singleton.
> boost::mutex    ConnSessionManager::access_mtx;
> static boost::once_flag    init_flag = BOOST_ONCE_INIT;
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::ConnSessionManager
> **
> ** Function:    Create the connection manage object (singleton) and initialize
> **              the Active MQ library that we use, once.
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager::ConnSessionManager () :
>     cm_map()
> {
>     // only call the library initialization ONCE.
>     boost::call_once(activemq::library::ActiveMQCPP::initializeLibrary,
>                      init_flag);
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::~ConnSessionManager
> **
> ** Function:    Destroty the connection manager
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager::~ConnSessionManager ()
> {
>     activemq::library::ActiveMQCPP::shutdownLibrary();
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::Singleton
> **
> ** Function:    Return a referece to our singleton.
> **
> ** Input Parms: None
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> ConnSessionManager &
> ConnSessionManager::Singleton (void)
> {
>     static ConnSessionManager    singleton;
>     return (singleton);
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::addConnection
> **
> ** Function:    Add, or re-add a connection to the connection manager.
> **              Create a session on the connection and return them.
> **
> **              An exception is thrown if we cannot get a connection.
> **
> ** Input Parms: connSesn  - ConnSession reference that is populated with a
> **                          connections shared pointer and session shared
> **                          pointer.
> **
> **              brokerURI - The Active MQ broker URI string to use to create
> **                          the connection.
> **
> **                  http://activemq.apache.org/cms/cms-api-overview.html
> **                  http://activemq.apache.org/cms/configuring.html
> **
> **              ackMode - The kind of acknowledgement we want the session to
> **                         have
> **
> **                  http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185
> **
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> void
> ConnSessionManager::addConnection (ConnSessionManager::ConnSession  &connSesn,
>                                    const std::string                &brokerURI,
>                                    cms::Session::AcknowledgeMode    ackMode)
> {
>     CM_MAP_T           &map_ref = Singleton().cm_map;
>     P_SHR_CONN_T       &p_conn(connSesn.p_conn);
>     P_SHR_SESSION_T    &p_session(connSesn.p_session);
>     // Create a ConnectionFactory that we automatically dealloate itself
>     std::auto_ptr<cms::ConnectionFactory>
>         connectionFactory(cms::ConnectionFactory::
>                           createCMSConnectionFactory(brokerURI));
>     // Create the connection and attach to shared pointer
>     p_conn.reset(connectionFactory->createConnection());
>     // Start the connection
>     p_conn->start();
>     // Create the session
>     p_session.reset(p_conn->createSession(ackMode));
>     // Update the map
>     map_ref.erase(brokerURI);
>     map_ref[brokerURI] = p_conn;
> }
> /****************************************************************************
> **
> ** Name:        ConnSessionManager::getConnSession
> **
> ** Function:    Get a working Active MQ connection for the specified brokerURI
> **              and return it along with an allocated session or thrown an
> **              exception.
> **
> **              This finds the existing AMQ connection for the specified
> **              brokerURI, validates that it's still connected, creates a
> **              session on it and returns them. If the connection isn't valid
> **              anymore (i.e. the broker restarted) or if the connection
> **              doesn't exist yet, we create it.
> **
> **              If we cannot create the connection we throw an exception.
> **
> **              WHY DO WE ALLOCATE A SESSION TOO? Why don't we just get the
> **              conection and return it? It's possible that the connection was
> **              reset since the last time we used it (i.e the broker
> **              restarted). The only way we can tell this is if the
> **              createSession call fails. Since we have to create a session to
> **              prove that the connection works, lets use it. The caller would
> **              do it once it had a connection anyway!
> **
> **              Why don't we just return a shared pointer to the session, why
> **              do we also return a shared pointer to the connection? Sessions
> **              are built on top of connections. It would be BAD if the
> **              connection was deleted before the session. So we keep a shared
> **              pointer of both together to ensure that they have the same
> **              lifecycle.
> **
> ** Input Parms: connSesn  - ConnSession reference that is populated with a
> **                          connections shared pointer and session shared
> **                          pointer.
> **
> **              brokerURI - The Active MQ broker URI string to use to create
> **                          the connection.
> **
> **                  http://activemq.apache.org/cms/cms-api-overview.html
> **                  http://activemq.apache.org/cms/configuring.html
> **
> **              ackMode - The kind of acknowledgement we want the session to
> **                         have
> **
> **                  http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185
> **
> **
> ** Return Parm: P_SHR_CONN_T - A shared pointer to the connection to use
> **
> ****************************************************************************/
> void
> ConnSessionManager::getConnSession (ConnSessionManager::ConnSession &connSesn,
>                                     const std::string               &brokerURI,
>                                     cms::Session::AcknowledgeMode   ackMode)
> {
>     P_SHR_CONN_T          &p_conn(connSesn.p_conn);
>     P_SHR_SESSION_T       &p_session(connSesn.p_session);
>     ConnSessionManager    &me = Singleton();
>     CM_MAP_T              &map_ref = me.cm_map;
>     // restrict access
>     boost::lock_guard<boost::mutex>    lock(access_mtx);
>     // get the entry
>     p_conn = map_ref[brokerURI];
>     // if it's not allocated yet, then allocate it, attach it and return it
>     if (NULL == p_conn) {
>         printf("Creating connection for %s\n",
>                brokerURI.c_str());
>         return (me.addConnection(connSesn, brokerURI, ackMode));
>     }
>     // Check if the broker restarted and reset our connection. If it has,
>     // then create session will generate an exception and we know we then
>     // need to make a new connection. If it it works then use the session,
>     // passing it back in the connSession
>     try {
>         p_session.reset(p_conn->createSession(ackMode));
>     }
>     catch (std::exception    &e) {
>         printf("Re-creating connection for %s: %s\n",
>                brokerURI.c_str(), e.what());
>         return (me.addConnection(connSesn, brokerURI, ackMode));
>     }
> }
> // **********************************************************************
> // **********************************************************************
> //                               AMQ_Producer
> // **********************************************************************
> // **********************************************************************
> // m_delivery_mode
> //
> //     The producer's delivery mode is set to NON_PERSISTENT. When persistent
> //     it causes the broker to save the message to disk so that it is available
> //     even if the broker restarts. However, this has a price, it greatly slows
> //     down the send (220,475 microseconds with persistance, vs. 226
> //     microseconds without.) These measurements where taken WITH synchronous
> //     sends. We could have investigated and used async sends to speed up the
> //     send call and keep persistence, however, this wasn't deemed necessary.
> //
> //     http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/classcms_1_1_delivery_mode.html
> //
> //     http://activemq.apache.org/persistence-questions.html
> //
> //     http://activemq.apache.org/what-is-the-difference-between-persistent-and-non-persistent-delivery.html
> //
> const cms::DeliveryMode::DELIVERY_MODE
> AMQ_Producer::m_delivery_mode = cms::DeliveryMode::NON_PERSISTENT;
> /****************************************************************************
> **
> ** Name:        AMQ_Producer::AMQ_Producer
> **
> ** Function:    Create the Active MQ Producer client object, storing the
> **              destination and creating the URI string from the provided host
> **              name and port number.
> **
> ** Input Parms: dest: The destination queue/topic to communicate with
> **
> **              host: The host to communicate with
> **
> **              port: The port on the host to communicate with
> **
> ** Return Parm: 
> **
> ****************************************************************************/
> AMQ_Producer::AMQ_Producer (const std::string    &dest,
>                             const std::string    &host,
>                             const std::string    &port) :
>     m_dest(dest),
>     m_broker_uri("tcp://" + host + ":" + port +
>                  "?connection.alwaysSyncSend=true")
> {
>     // add the send timeout if its environemtn varialbe is given and its a
>     // value other than the default, zero.
>     int    sendTimeout(0);
>     GetEnv("SEND_TO", sendTimeout, 0);
>     if (sendTimeout) {
>         std::stringstream    ss_sto;
>         ss_sto << "&connection.sendTimeout=" << sendTimeout;
>         m_broker_uri += ss_sto.str();
>     }
> }
> /****************************************************************************
> **
> ** Name:        AMQ_Producer::send
> **
> ** Function:    Cause the message to be sent by this producer to the
> **              established message queue, host and port.
> **
> **              This gets a cached (or if needed new) connection which has a
> **              session created for it, it then creates the destination,
> **              producer and message. Finally it adds all of the properties
> **              that we want to the message and sends it.
> **
> **              This model is best explained at the Active MQ CPP example page
> **              at: http://activemq.apache.org/cms/cms-api-overview.html
> **
> ** Input Parms: msg - The message to send with this producer
> **
> **              type - (optional) The CMSType/JMSType to set the message to. 
> **
> ** Return Parm: None
> **
> ****************************************************************************/
> void
> AMQ_Producer::send (const std::string    &msg,
>                     const std::string    &type)
> {
>     std::string    s_where;
>     try {
>         ConnSessionManager::ConnSession    conSesn;
>         // Create the Connection + Session
>         s_where = "creating connection/session";
>         ConnSessionManager::getConnSession(conSesn, m_broker_uri,
>                                            cms::Session::AUTO_ACKNOWLEDGE);
>         // Create the Destination (queue/topic)
>         s_where = "creating destination";
>         boost::shared_ptr<cms::Destination>
>             p_destination(conSesn.p_session->createQueue(m_dest));
>         // Create the Producer
>         s_where = "creating message producer";
>         boost::shared_ptr<cms::MessageProducer>
>             p_producer(conSesn.p_session->createProducer(p_destination.get()));
>         p_producer->setDeliveryMode(m_delivery_mode);
>         // Create the Message
>         s_where = "creating connection";
>         boost::shared_ptr<cms::TextMessage>
>             p_msg(conSesn.p_session->createTextMessage(msg));
>         // Set the Message's JMSType/CMSType
>         if (!type.empty()) {
>             p_msg->setCMSType(type);
>         }
>         std::map<const std::string, std::string>::iterator it;
>         // run through all properties, adding them to the message
>         for (it = propertyInfo.begin(); it != propertyInfo.end(); ++it) {
>             s_where = "adding property " + (*it).first;
>             p_msg->setStringProperty((*it).first, (*it).second);
>         }
>         // Send it!
>         s_where = "sending message";
>         p_producer->send(p_msg.get());
>     }
>     catch (std::exception    &e) {
>         std::stringstream    ss_err;
>         ss_err << "Error while " << s_where << " [" << e.what() << "]";
>         throw ee_except(ss_err);
>     }
> }
> // ---------------------------------------------------------------------------
> const std::string    base_msg("1024 characters folowed by a message.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxHello world! ");
> void run_test ()
> {
>     std::string     host("127.0.0.1");
>     std::string     port("61616");
>     std::string     destURI("c.c.p.v.ms.events");
>     unsigned int    iteration(0);
>     unsigned int    ex_count(0);
>     // Loop, creating producers and sending the message
>     while (1) {
>         iteration++;
>         AMQ_Producer    producer(destURI, host, port);
>         std::stringstream    msg;
>         msg << base_msg << iteration;
>         printf("Sending message #%d\n", iteration);
>         try {
>             producer.send(msg.str(), "TEST_TYPE");
>         }
>         catch (std::exception    &e) {
>             ex_count++;
>             printf("\n\nException[%d]: %s\n", ex_count, e.what());
>             if (ex_count >= 10 ) {
>                 printf("\n\nException limit (%d) reached. Stopping test\n\n\n",
>                        ex_count);
>                 break;
>             }
>         }
>     }
> }
> ///////////////////////////////////////////////////////////////////////////////
> int main(int, char*) {
>     activemq::library::ActiveMQCPP::initializeLibrary();
>     std::cout << "=====================================================\n";
>     std::cout << "Starting the example:" << std::endl;
>     std::cout << "-----------------------------------------------------\n";
>     run_test();   
>     std::cout << "-----------------------------------------------------\n";
>     std::cout << "Finished with the example." << std::endl;
>     std::cout << "=====================================================\n";
>     activemq::library::ActiveMQCPP::shutdownLibrary();
> }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

Mime
View raw message