Return-Path: X-Original-To: apmail-activemq-dev-archive@www.apache.org Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C923BD3D1 for ; Tue, 10 Jul 2012 20:52:36 +0000 (UTC) Received: (qmail 1823 invoked by uid 500); 10 Jul 2012 20:52:35 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 1734 invoked by uid 500); 10 Jul 2012 20:52:35 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 1309 invoked by uid 99); 10 Jul 2012 20:52:35 -0000 Received: from issues-vm.apache.org (HELO issues-vm) (140.211.11.160) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Jul 2012 20:52:35 +0000 Received: from isssues-vm.apache.org (localhost [127.0.0.1]) by issues-vm (Postfix) with ESMTP id C8A86142861 for ; Tue, 10 Jul 2012 20:52:34 +0000 (UTC) Date: Tue, 10 Jul 2012 20:52:34 +0000 (UTC) From: "Timothy Bish (JIRA)" To: dev@activemq.apache.org Message-ID: <1509867177.31476.1341953554824.JavaMail.jiratomcat@issues-vm> In-Reply-To: <1913732981.26214.1341883714420.JavaMail.jiratomcat@issues-vm> Subject: [jira] [Commented] (AMQCPP-413) Producer connection that causes broker to reach its memory/disk limits doesn't get the 'all full' exception even though the broker is configured to send it for Producer Flow Control. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/AMQCPP-413?page=3Dcom.atlassian= .jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=3D1341= 0881#comment-13410881 ]=20 Timothy Bish commented on AMQCPP-413: ------------------------------------- Sorry didn't realize they were in those comments, you can attach files to t= he issue as well in the future.=20 =20 > Producer connection that causes broker to reach its memory/disk limits do= esn't get the 'all full' exception even though the broker is configured to = send it for Producer Flow Control. > -------------------------------------------------------------------------= ---------------------------------------------------------------------------= ---------------------------------- > > Key: AMQCPP-413 > URL: https://issues.apache.org/jira/browse/AMQCPP-413 > Project: ActiveMQ C++ Client > Issue Type: Bug > Components: CMS Impl > Affects Versions: 3.4.0, 3.4.4 > Environment: Linux > Reporter: John Rocha > Assignee: Timothy Bish > Priority: Critical > Labels: cpp, producer_flow_control > > Producer connection that cause the broker to reach its memory/disk limits > doesn't get the 'all full' exception even though the broker is configured= to > send it for Producer Flow Control. > +Scenario #1+ > # Delete the broker data directory\\ > \\ > # Start the broker, that sends an exception if no space\\ > \\ > # DO *+NOT+* START A CONUMSER.\\ > \\ > # Run a producer that does synchronous sends, and has the default sendTim= eot of zero(0), it uses one connection, and enters a loop that just sends m= essages.\\ > \\ > After awhile the producer will lock up and never recovers.\\ > \\ > # Start another producer in another window.\\ > \\ > It immediate fails with a 'broker full' exception. > +Scenario #2+ > # Delete the broker data directory\\ > \\ > # Start the broker, that sends an exception if no space\\ > \\ > # DO *+NOT+* START A CONUMSER.\\ > \\ > # Run a producer that does synchronous sends, and has the sendTimeot of 5= 00 ms, it uses one connection, and enters a loop that just sends messages.\= \ > \\ > After awhile the producer will cause the broker to reach it's limit. And = then the send method will start timing out. It never gets a 'broker full' e= xception.\\ > \\ > # Start another producer in another window.\\ > \\ > It immediate fails with a 'broker full' exception > \\ > \\ > ---- > \\ > \\ > {noformat} > ###################################################################### > ## SCENARIO #1 > ###################################################################### > ## > ## Restart our tomcat service which restarts the AMQ Broker and view the > ## directory size. > ## > root@psbu-jrr-lnx:# /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWht= tpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/B= Whttpd/tomcat/activemq-data > Stopping tomcat ... done > Killing tomcat ... done. > Starting tomcat ... done > 44K /usr/BWhttpd/tomcat/activemq-data > root@psbu-jrr-lnx:# =20 > ## > ## View the activemq.xml configuration file used for startint active MQ > ## > root@psbu-jrr-lnx:# cat /usr/BWhttpd/tomcat/webapps/amqbroker/WEB-INF/cl= asses/conf/activemq.xml > > xmlns:amq=3D"http://activemq.apache.org/schema/core" > xmlns:xsi=3D"http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation=3D"http://www.springframework.org/schema/beans=20 > http://www.springframework.org/schema/beans/spring-beans-2.5.xsd =20 > http://activemq.apache.org/schema/core=20 > http://activemq.apache.org/schema/core/activemq-core-5.3.2.xsd" > default-autowire=3D"byName"> > > > > brokerName=3D"localhost" > advisorySupport=3D"true" > dataDirectory=3D"${catalina.home}/activemq-data" > useJmx=3D"false" > useShutdownHook=3D"false"> > > > > > " memoryLimit=3D"5mb" /> > " memoryLimit=3D"5mb" /> > > > > > > > > > > > > > > > > > > > uri=3D"tcp://0.0.0.0:61616?wireFormat.maxInactivityDuration= =3D0" /> > > > > ## > ## View the runtime environment to validate the library is 3.4.4, the lat= ests. > ## I cannot explain why the number is 14.0.4, but I observed that 3.4.0 u= sed > ## 14.0.0 > ## > 242(TEST)jrr@[SUSE10.1]> ls $LD_LIBRARY_PATH/libactive* > /usr/BWhttpd/lib//libactivemq-cpp.so* > /usr/BWhttpd/lib//libactivemq-cpp.so.14* > /usr/BWhttpd/lib//libactivemq-cpp.so.14.0.4* > 243(TEST)jrr@[SUSE10.1]> =20 > ## > ## Compile the simple producer > ## > Compiling simple_producer.o > g++ -g -c -MD -Wall -Werror -I /views/LU-7.0-NEWAMQ/server/CommonLib/incl= ude/activemq-cpp/ -I /views/LU-7.0-NEWAMQ/server/CommonLib/include/ -I /vie= ws/LU-7.0-NEWAMQ/server/CommonLib/include/apr-1 simple_producer.cpp > g++ -o simple_producer.exe simple_producer.o -lc -lrt \ > -lactivemq-cpp \ > -lboost_thread \ > -L /usr/BWhttpd/lib > Compilation finished at Mon Jul 9 15:32:48 > ## > ## Execute the test program. It creates a connection and then loops forev= er > ## sending messages. Where for each message it creates sessions, destiat= ion, > ## producer,etc. sends the message and then destructs all of those pieces= . > ## > ## It locks up at 1775 messages > ## > 257(TEST)jrr@[SUSE10.1]> simple_producer.exe > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > Starting the example: > ----------------------------------------------------- > Sending message #1 > Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend= =3Dtrue > Sending message #2 > Sending message #3 > Sending message #4 > Sending message #5 > ... > ... > ... > Sending message #1772 > Sending message #1773 > Sending message #1774 > Sending message #1775 > Sending message #1776 > ## > ## View the ActiveMQ disk usage during the lock up > ## > du -sh activemq-data/ > 12M activemq-data/ > root@psbu-jrr-lnx:# =20 > ## > ## Use GDB to see where the simple_producer is blocked > ## > psbu-jrr-lnx[SUSE10.1]:176> ps auxw | egrep simple_producer.exe > jrr 16974 1.3 0.6 53980 6724 pts/12 Sl+ 17:58 0:00 simple_p= roducer.exe > jrr 16997 0.0 0.0 1864 660 pts/18 S+ 17:59 0:00 /bin/gre= p -E simple_producer.exe > psbu-jrr-lnx[SUSE10.1]:177> =20 > psbu-jrr-lnx[SUSE10.1]:177> gdb simple_producer.exe=20 > GNU gdb 6.6 > Copyright (C) 2006 Free Software Foundation, Inc. > GDB is free software, covered by the GNU General Public License, and you = are > welcome to change it and/or distribute copies of it under certain conditi= ons. > Type "show copying" to see the conditions. > There is absolutely no warranty for GDB. Type "show warranty" for detail= s. > This GDB was configured as "i586-suse-linux"... > Using host libthread_db library "/lib/libthread_db.so.1". > (gdb) set print pretty > (gdb) set pagination off > (gdb) attach 16974 > Attaching to program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process 1= 6974 > Reading symbols from /lib/librt.so.1...done. > Loaded symbols for /lib/librt.so.1 > Reading symbols from /usr/BWhttpd/lib/libactivemq-cpp.so.14...done. > Loaded symbols for /usr/BWhttpd/lib/libactivemq-cpp.so.14 > Reading symbols from /usr/BWhttpd/lib/libboost_thread.so.1.43.0...done. > Loaded symbols for /usr/BWhttpd/lib/libboost_thread.so.1.43.0 > Reading symbols from /usr/lib/libstdc++.so.6...done. > Loaded symbols for /usr/lib/libstdc++.so.6 > Reading symbols from /lib/libm.so.6...done. > Loaded symbols for /lib/libm.so.6 > Reading symbols from /lib/libc.so.6...done. > Loaded symbols for /lib/libc.so.6 > Reading symbols from /lib/libgcc_s.so.1...done. > Loaded symbols for /lib/libgcc_s.so.1 > Reading symbols from /lib/libpthread.so.0...done. > [Thread debugging using libthread_db enabled] > [New Thread -1221044560 (LWP 16974)] > [New Thread -1254835296 (LWP 16981)] > [New Thread -1246442592 (LWP 16978)] > [New Thread -1238049888 (LWP 16977)] > [New Thread -1229657184 (LWP 16976)] > [New Thread -1221264480 (LWP 16975)] > Loaded symbols for /lib/libpthread.so.0 > Reading symbols from /lib/ld-linux.so.2...done. > Loaded symbols for /lib/ld-linux.so.2 > Reading symbols from /usr/BWhttpd/lib/libapr-1.so.0...done. > Loaded symbols for /usr/BWhttpd/lib/libapr-1.so.0 > Reading symbols from /lib/libuuid.so.1...done. > Loaded symbols for /lib/libuuid.so.1 > Reading symbols from /lib/libcrypt.so.1...done. > Loaded symbols for /lib/libcrypt.so.1 > Reading symbols from /usr/BWhttpd/lib/libaprutil-1.so.0...done. > Loaded symbols for /usr/BWhttpd/lib/libaprutil-1.so.0 > Reading symbols from /usr/BWhttpd/lib/libexpat.so.1...done. > Loaded symbols for /usr/BWhttpd/lib/libexpat.so.1 > Reading symbols from /usr/lib/libssl.so.0.9.8...done. > Loaded symbols for /usr/lib/libssl.so.0.9.8 > Reading symbols from /usr/lib/libcrypto.so.0.9.8...done. > Loaded symbols for /usr/lib/libcrypto.so.0.9.8 > Reading symbols from /lib/libdl.so.2...done. > Loaded symbols for /lib/libdl.so.2 > 0xffffe410 in __kernel_vsyscall () > (gdb) thread apply all where > Thread 6 (Thread -1221264480 (LWP 16975)): > #0 0xffffe410 in __kernel_vsyscall () > #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.= so.0 > #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6 > #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait = (condition=3D0x806b120) at decaf/internal/util/concurrent/unix/ConditionImp= l.cpp:101 > #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=3D0x806ae5c)= at decaf/util/concurrent/Mutex.cpp:126 > #5 0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::= wait (this=3D0x806ae58) at decaf/internal/util/concurrent/SynchronizableImp= l.cpp:48 > #6 0xb7de6ba9 in decaf::util::TimerImpl::run (this=3D0x806ae50) at decaf= /util/Timer.cpp:81 > #7 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties= =3D0x806aff8) at decaf/lang/Thread.cpp:137 > #8 0xb7daa47c in (anonymous namespace)::threadWorker (arg=3D0x806aff8) a= t decaf/lang/Thread.cpp:190 > #9 0xb75912ab in start_thread () from /lib/libpthread.so.0 > #10 0xb766aa4e in clone () from /lib/libc.so.6 > Thread 5 (Thread -1229657184 (LWP 16976)): > #0 0xffffe410 in __kernel_vsyscall () > #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.= so.0 > #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6 > #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait = (condition=3D0x806b368) at decaf/internal/util/concurrent/unix/ConditionImp= l.cpp:101 > #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=3D0x806b1fc)= at decaf/util/concurrent/Mutex.cpp:126 > #5 0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::= wait (this=3D0x806b1f8) at decaf/internal/util/concurrent/SynchronizableImp= l.cpp:48 > #6 0xb7de6ba9 in decaf::util::TimerImpl::run (this=3D0x806b1f0) at decaf= /util/Timer.cpp:81 > #7 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties= =3D0x806b240) at decaf/lang/Thread.cpp:137 > #8 0xb7daa47c in (anonymous namespace)::threadWorker (arg=3D0x806b240) a= t decaf/lang/Thread.cpp:190 > #9 0xb75912ab in start_thread () from /lib/libpthread.so.0 > #10 0xb766aa4e in clone () from /lib/libc.so.6 > Thread 4 (Thread -1238049888 (LWP 16977)): > #0 0xffffe410 in __kernel_vsyscall () > #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.= so.0 > #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6 > #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait = (condition=3D0x806c718) at decaf/internal/util/concurrent/unix/ConditionImp= l.cpp:101 > #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=3D0x806c2ec)= at decaf/util/concurrent/Mutex.cpp:126 > #5 0xb7d77fc0 in decaf::internal::util::concurrent::SynchronizableImpl::= wait (this=3D0x806c2e8) at decaf/internal/util/concurrent/SynchronizableImp= l.cpp:48 > #6 0xb7de6ba9 in decaf::util::TimerImpl::run (this=3D0x806c2e0) at decaf= /util/Timer.cpp:81 > #7 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties= =3D0x806c5c0) at decaf/lang/Thread.cpp:137 > #8 0xb7daa47c in (anonymous namespace)::threadWorker (arg=3D0x806c5c0) a= t decaf/lang/Thread.cpp:190 > #9 0xb75912ab in start_thread () from /lib/libpthread.so.0 > #10 0xb766aa4e in clone () from /lib/libc.so.6 > Thread 3 (Thread -1246442592 (LWP 16978)): > #0 0xffffe410 in __kernel_vsyscall () > #1 0xb765ba8b in std::exception::what () from /lib/libc.so.6 > #2 0xb7580b7a in apr_socket_recv (sock=3D0x80649e8, buf=3D0x8066c10 "", = len=3D0xb5b4c040) at network_io/unix/sendrecv.c:81 > #3 0xb7d49ebd in decaf::internal::net::tcp::TcpSocket::read (this=3D0x80= 64898, buffer=3D0x8066c10 "", size=3D8192, offset=3D0, length=3D8192) at de= caf/internal/net/tcp/TcpSocket.cpp:649 > #4 0xb7d4d1c0 in decaf::internal::net::tcp::TcpSocketInputStream::doRead= ArrayBounded (this=3D0x8066c10, buffer=3D0x2000
, size=3D8192, offset=3D0, length=3D8192) at decaf/internal/net/tcp/TcpS= ocketInputStream.cpp:108 > #5 0xb7d91d1f in decaf::io::InputStream::doReadArray (this=3D0x8066998, = buffer=3D0x8066c10 "", size=3D8192) at decaf/io/InputStream.cpp:138 > #6 0xb7d92333 in decaf::io::InputStream::read (this=3D0x8066998, buffer= =3D0x8066c10 "", size=3D8192) at decaf/io/InputStream.cpp:72 > #7 0xb7d866ef in decaf::io::BufferedInputStream::bufferData (this=3D0x80= 66b60, inputStream=3D0x8066998, buffer=3D@0xb5b4c1f8) at decaf/io/BufferedI= nputStream.cpp:326 > #8 0xb7d86d18 in decaf::io::BufferedInputStream::doReadArrayBounded (thi= s=3D0x8066b60, buffer=3D0x80669ca "", size=3D4, offset=3D0, length=3D4) at = decaf/io/BufferedInputStream.cpp:228 > #9 0xb7d92191 in decaf::io::InputStream::read (this=3D0x8066b60, buffer= =3D0x80669ca "", size=3D4, offset=3D0, length=3D4) at decaf/io/InputStream.= cpp:84 > #10 0xb7d8a757 in decaf::io::DataInputStream::readAllData (this=3D0x80669= b8, buffer=3D0x80669ca "", length=3D4) at decaf/io/DataInputStream.cpp:492 > #11 0xb7d8c684 in decaf::io::DataInputStream::readInt (this=3D0x80669b8) = at decaf/io/DataInputStream.cpp:124 > #12 0xb7cae1b8 in activemq::wireformat::openwire::OpenWireFormat::unmarsh= al (this=3D0x8063e10, transport=3D0x8064790, dis=3D0x80669b8) at activemq/w= ireformat/openwire/OpenWireFormat.cpp:245 > #13 0xb7c2c9f7 in activemq::transport::IOTransport::run (this=3D0x8064790= ) at activemq/transport/IOTransport.cpp:246 > #14 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties= =3D0x806c088) at decaf/lang/Thread.cpp:137 > #15 0xb7daa47c in (anonymous namespace)::threadWorker (arg=3D0x806c088) a= t decaf/lang/Thread.cpp:190 > #16 0xb75912ab in start_thread () from /lib/libpthread.so.0 > #17 0xb766aa4e in clone () from /lib/libc.so.6 > Thread 2 (Thread -1254835296 (LWP 16981)): > #0 0xffffe410 in __kernel_vsyscall () > #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.= so.0 > #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6 > #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait = (condition=3D0x806cde8) at decaf/internal/util/concurrent/unix/ConditionImp= l.cpp:101 > #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=3D0x806d24c)= at decaf/util/concurrent/Mutex.cpp:126 > #5 0xb7c1fada in activemq::threads::CompositeTaskRunner::run (this=3D0x8= 06d208) at activemq/threads/CompositeTaskRunner.cpp:115 > #6 0xb7dac25a in decaf::lang::ThreadProperties::runCallback (properties= =3D0x806cbc0) at decaf/lang/Thread.cpp:137 > #7 0xb7daa47c in (anonymous namespace)::threadWorker (arg=3D0x806cbc0) a= t decaf/lang/Thread.cpp:190 > #8 0xb75912ab in start_thread () from /lib/libpthread.so.0 > #9 0xb766aa4e in clone () from /lib/libc.so.6 > Thread 1 (Thread -1221044560 (LWP 16974)): > #0 0xffffe410 in __kernel_vsyscall () > #1 0xb7595056 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libpthread.= so.0 > #2 0xb767629d in pthread_cond_wait@@GLIBC_2.3.2 () from /lib/libc.so.6 > #3 0xb7d7841d in decaf::internal::util::concurrent::ConditionImpl::wait = (condition=3D0x806f1a8) at decaf/internal/util/concurrent/unix/ConditionImp= l.cpp:101 > #4 0xb7df07c3 in decaf::util::concurrent::Mutex::wait (this=3D0x806f13c)= at decaf/util/concurrent/Mutex.cpp:126 > #5 0xb7de9395 in decaf::util::concurrent::CountDownLatch::await (this=3D= 0x806f134) at decaf/util/concurrent/CountDownLatch.cpp:53 > #6 0xb7c36025 in activemq::transport::correlator::FutureResponse::getRes= ponse (this=3D0x806f130) at ./activemq/transport/correlator/FutureResponse.= h:62 > #7 0xb7c33aa2 in activemq::transport::correlator::ResponseCorrelator::re= quest (this=3D0x806b740, command=3D@0xbf9301cc) at activemq/transport/corre= lator/ResponseCorrelator.cpp:120 > #8 0xb7b649fa in activemq::core::ActiveMQConnection::syncRequest (this= =3D0x806b858, command=3D@0xbf9301cc, timeout=3D0) at activemq/core/ActiveMQ= Connection.cpp:896 > #9 0xb7baf1d8 in activemq::core::ActiveMQSession::send (this=3D0x80635b8= , message=3D0x806e640, producer=3D0x806e530, usage=3D0x0) at activemq/core/= ActiveMQSession.cpp:921 > #10 0xb7ba1e71 in activemq::core::ActiveMQProducer::send (this=3D0x806e53= 0, destination=3D0x806e3ec, message=3D0x806e640, deliveryMode=3D1, priority= =3D4, timeToLive=3D0) at activemq/core/ActiveMQProducer.cpp:211 > #11 0xb7ba2b07 in activemq::core::ActiveMQProducer::send (this=3D0x806e53= 0, destination=3D0x806e3ec, message=3D0x806e640) at activemq/core/ActiveMQP= roducer.cpp:152 > #12 0xb7ba3cab in activemq::core::ActiveMQProducer::send (this=3D0x806e53= 0, message=3D0x806e640) at activemq/core/ActiveMQProducer.cpp:128 > #13 0x0804c5ac in AMQ_Producer::send (this=3D0xbf9305c8, msg=3D@0xbf93060= 0, type=3D@0xbf9305f8) at simple_producer.cpp:720 > #14 0x0804ca45 in run_test () at simple_producer.cpp:767 > #15 0x0804cd47 in main () at simple_producer.cpp:798 > #0 0xffffe410 in __kernel_vsyscall () > (gdb) detach > Detaching from program: /views/TEST/AMQ/AMQ2/simple_producer.exe, process= 16974 > (gdb) quit > psbu-jrr-lnx[SUSE10.1]:178> =20 > ## > ## If we kill the program and execute it again, then this time it will > ## terminate with the expected exception. > ## > 258(TEST)jrr@[SUSE10.1]> simple_producer.exe > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > Starting the example: > ----------------------------------------------------- > Sending message #1 > Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend= =3Dtrue > Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TR= ACE *** > Message: Usage Manager Temp Store is Full (01001622326f 1048576). Stoppin= g producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:0:0) to prevent floodin= g queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-c= ontrol.html for more info > Exception Class javax.jms.ResourceAllocationException > [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.br= oker.region.Queue.checkUsage > [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.br= oker.region.Queue.doMessageSend > [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.br= oker.region.Queue.send > [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.ac= tivemq.broker.region.AbstractRegion.send > [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.acti= vemq.broker.region.RegionBroker.send > [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.acti= vemq.broker.BrokerFilter.send > [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: or= g.apache.activemq.broker.CompositeDestinationBroker.send > [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache= .activemq.broker.TransactionBroker.send > [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apac= he.activemq.broker.MutableBrokerFilter.send > [FILE: TransportConnection.java, LINE: 458] occurred in: org.apac= he.activemq.broker.TransportConnection.processMessage > [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.a= ctivemq.command.ActiveMQMessage.visit > [FILE: TransportConnection.java, LINE: 306] occurred in: org.apac= he.activemq.broker.TransportConnection.service > [FILE: TransportConnection.java, LINE: 179] occurred in: org.apac= he.activemq.broker.TransportConnection$1.onCommand > [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.ac= tivemq.transport.TransportFilter.onCommand > [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apa= che.activemq.transport.WireFormatNegotiator.onCommand > [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache= .activemq.transport.InactivityMonitor.onCommand > [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.a= ctivemq.transport.TransportSupport.doConsume > [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.doRun > [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.run > [FILE: , LINE: -1] occurred in: java.lang.Thread.run > *** END SERVER-SIDE STACK TRACE ***] > Sending message #2 > ... > ... > ... > Sending message #10 > Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK T= RACE *** > Message: Usage Manager Temp Store is Full (01001622426f 1048576). Stoppin= g producer (ID:psbu-jrr-lnx-53043-1341882066464-0:0:9:0) to prevent floodin= g queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-c= ontrol.html for more info > Exception Class javax.jms.ResourceAllocationException > [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.br= oker.region.Queue.checkUsage > [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.br= oker.region.Queue.doMessageSend > [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.br= oker.region.Queue.send > [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.ac= tivemq.broker.region.AbstractRegion.send > [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.acti= vemq.broker.region.RegionBroker.send > [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.acti= vemq.broker.BrokerFilter.send > [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: or= g.apache.activemq.broker.CompositeDestinationBroker.send > [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache= .activemq.broker.TransactionBroker.send > [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apac= he.activemq.broker.MutableBrokerFilter.send > [FILE: TransportConnection.java, LINE: 458] occurred in: org.apac= he.activemq.broker.TransportConnection.processMessage > [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.a= ctivemq.command.ActiveMQMessage.visit > [FILE: TransportConnection.java, LINE: 306] occurred in: org.apac= he.activemq.broker.TransportConnection.service > [FILE: TransportConnection.java, LINE: 179] occurred in: org.apac= he.activemq.broker.TransportConnection$1.onCommand > [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.ac= tivemq.transport.TransportFilter.onCommand > [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apa= che.activemq.transport.WireFormatNegotiator.onCommand > [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache= .activemq.transport.InactivityMonitor.onCommand > [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.a= ctivemq.transport.TransportSupport.doConsume > [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.doRun > [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.run > [FILE: , LINE: -1] occurred in: java.lang.Thread.run > *** END SERVER-SIDE STACK TRACE ***] > Exception limit (10) reached. Stopping test > ----------------------------------------------------- > Finished with the example. > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > terminate called after throwing an instance of 'decaf::lang::exceptions::= RuntimeException' > what(): Unlock Failed, this thread is not the Lock Owner! > Abort > 259(TEST)jrr@[SUSE10.1]> =20 > {noformat} > \\ > \\ > ---- > \\ > \\ > {noformat} > ###################################################################### > ## SCENARIO #2 > ###################################################################### > ## > ## Restart our tomcat service which restarts the AMQ Broker and view the > ## directory size. > ## > root@psbu-jrr-lnx:# /usr/BWhttpd/bin/init_tomcat stop; \rm -rf /usr/BWht= tpd/tomcat/activemq-data; /usr/BWhttpd/bin/init_tomcat start; du -sh /usr/B= Whttpd/tomcat/activemq-data > Stopping tomcat ... done > Killing tomcat ... done. > Starting tomcat ... done > 44K /usr/BWhttpd/tomcat/activemq-data > root@psbu-jrr-lnx:# =20 > ## > ## Execute the test program, this time pass in variables to cause the sen= dTime > ## to be set to 500ms. It creates a connection and then loops forever sen= ding > ## messages. Where for each message it creates sessions, destiation, > ## producer,etc. sends the message and then destructs all of those pieces= . > ## > ## Now at 1775 messages the send times out. Moreover, the test moves on t= rying > ## more sends, and they all time out. After 10 failures the test exits. T= his > ## sender that causes the broker memory limmit to be reached never gets a > ## 'broker full' exception. > ## > 262(TEST)jrr@[SUSE10.1]> env SEND_TO=3D500 simple_producer.exe > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > Starting the example: > ----------------------------------------------------- > SEND_TO set to 500 > Sending message #1 > Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend= =3Dtrue&connection.sendTimeout=3D500 > SEND_TO set to 500 > Sending message #2 > SEND_TO set to 500 > Sending message #3 > SEND_TO set to 500 > Sending message #4 > ... > ... > ... > Sending message #1776 > Exception[1]: Error while sending message [No valid response received for= command: Message { commandId =3D 8879, responseRequired =3D true, Producer= Id =3D ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0, Destination =3D queu= e://c.c.p.v.ms.events, TransactionId =3D NULL, OriginalDestination =3D NULL= , MessageId =3D ID:psbu-jrr-lnx-47169-1341882401921-0:0:1775:0:0:0, Origina= lTransactionId =3D NULL, GroupID =3D , GroupSequence =3D 0, CorrelationId = =3D , Persistent =3D false, Expiration =3D 0, Priority =3D 4, ReplyTo =3D N= ULL, Timestamp =3D 1341882406582, Type =3D TEST_TYPE, Content =3D [size=3D1= 045], MarshalledProperties =3D NULL, DataStructure =3D NULL, TargetConsumer= Id =3D NULL, Compressed =3D false, RedeliveryCounter =3D 0, BrokerPath =3D = NULL, Arrival =3D 0, UserID =3D , RecievedByDFBridge =3D false, Droppable = =3D false, Cluster =3D NULL, BrokerInTime =3D 0, BrokerOutTime =3D 0 }Text = =3D 1024 characters folowed by a message.xxxxxxxx... world! 1776, check bro= ker.] > SEND_TO set to 500 > Sending message #1777 > ... > ... > ... > Sending message #1785 > Exception[10]: Error while sending message [No valid response received fo= r command: Message { commandId =3D 8924, responseRequired =3D true, Produce= rId =3D ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0, Destination =3D que= ue://c.c.p.v.ms.events, TransactionId =3D NULL, OriginalDestination =3D NUL= L, MessageId =3D ID:psbu-jrr-lnx-47169-1341882401921-0:0:1784:0:0:0, Origin= alTransactionId =3D NULL, GroupID =3D , GroupSequence =3D 0, CorrelationId = =3D , Persistent =3D false, Expiration =3D 0, Priority =3D 4, ReplyTo =3D N= ULL, Timestamp =3D 1341882411136, Type =3D TEST_TYPE, Content =3D [size=3D1= 045], MarshalledProperties =3D NULL, DataStructure =3D NULL, TargetConsumer= Id =3D NULL, Compressed =3D false, RedeliveryCounter =3D 0, BrokerPath =3D = NULL, Arrival =3D 0, UserID =3D , RecievedByDFBridge =3D false, Droppable = =3D false, Cluster =3D NULL, BrokerInTime =3D 0, BrokerOutTime =3D 0 }Text = =3D 1024 characters folowed by a message.xxxxxxxx... world! 1785, check bro= ker.] > Exception limit (10) reached. Stopping test > ----------------------------------------------------- > Finished with the example. > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > terminate called after throwing an instance of 'decaf::lang::exceptions::= RuntimeException' > what(): Unlock Failed, this thread is not the Lock Owner! > Abort > ## > ## > ## View the ActiveMQ disk usage during the lock up > ## > root@psbu-jrr-lnx:# !du > du -sh activemq-data/ > 12M activemq-data/ > root@psbu-jrr-lnx:# =20 > ## > ## If we re-execute the test now that the broker is full, then this time = it > ## will terminate with the expected broker full exception. > ## > 263(TEST)jrr@[SUSE10.1]> env SEND_TO=3D500 simple_producer.exe > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > Starting the example: > ----------------------------------------------------- > SEND_TO set to 500 > Sending message #1 > Creating connection for tcp://127.0.0.1:61616?connection.alwaysSyncSend= =3Dtrue&connection.sendTimeout=3D500 > Exception[1]: Error while sending message [*** BEGIN SERVER-SIDE STACK TR= ACE *** > Message: Usage Manager Temp Store is Full (01001622756f 1048576). Stoppin= g producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:0:0) to prevent floodin= g queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-c= ontrol.html for more info > Exception Class javax.jms.ResourceAllocationException > [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.br= oker.region.Queue.checkUsage > [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.br= oker.region.Queue.doMessageSend > [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.br= oker.region.Queue.send > [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.ac= tivemq.broker.region.AbstractRegion.send > [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.acti= vemq.broker.region.RegionBroker.send > [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.acti= vemq.broker.BrokerFilter.send > [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: or= g.apache.activemq.broker.CompositeDestinationBroker.send > [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache= .activemq.broker.TransactionBroker.send > [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apac= he.activemq.broker.MutableBrokerFilter.send > [FILE: TransportConnection.java, LINE: 458] occurred in: org.apac= he.activemq.broker.TransportConnection.processMessage > [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.a= ctivemq.command.ActiveMQMessage.visit > [FILE: TransportConnection.java, LINE: 306] occurred in: org.apac= he.activemq.broker.TransportConnection.service > [FILE: TransportConnection.java, LINE: 179] occurred in: org.apac= he.activemq.broker.TransportConnection$1.onCommand > [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.ac= tivemq.transport.TransportFilter.onCommand > [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apa= che.activemq.transport.WireFormatNegotiator.onCommand > [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache= .activemq.transport.InactivityMonitor.onCommand > [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.a= ctivemq.transport.TransportSupport.doConsume > [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.doRun > [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.run > [FILE: , LINE: -1] occurred in: java.lang.Thread.run > *** END SERVER-SIDE STACK TRACE ***] > SEND_TO set to 500 > ... > ... > ... > SEND_TO set to 500 > Sending message #10 > Exception[10]: Error while sending message [*** BEGIN SERVER-SIDE STACK T= RACE *** > Message: Usage Manager Temp Store is Full (01001623156f 1048576). Stoppin= g producer (ID:psbu-jrr-lnx-32967-1341882542997-0:0:9:0) to prevent floodin= g queue://c.c.p.v.ms.events. See http://activemq.apache.org/producer-flow-c= ontrol.html for more info > Exception Class javax.jms.ResourceAllocationException > [FILE: BaseDestination.java, LINE: 579] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: BaseDestination.java, LINE: 573] occurred in: org.apache.a= ctivemq.broker.region.BaseDestination.waitForSpace > [FILE: Queue.java, LINE: 757] occurred in: org.apache.activemq.br= oker.region.Queue.checkUsage > [FILE: Queue.java, LINE: 674] occurred in: org.apache.activemq.br= oker.region.Queue.doMessageSend > [FILE: Queue.java, LINE: 653] occurred in: org.apache.activemq.br= oker.region.Queue.send > [FILE: AbstractRegion.java, LINE: 365] occurred in: org.apache.ac= tivemq.broker.region.AbstractRegion.send > [FILE: RegionBroker.java, LINE: 523] occurred in: org.apache.acti= vemq.broker.region.RegionBroker.send > [FILE: BrokerFilter.java, LINE: 129] occurred in: org.apache.acti= vemq.broker.BrokerFilter.send > [FILE: CompositeDestinationBroker.java, LINE: 96] occurred in: or= g.apache.activemq.broker.CompositeDestinationBroker.send > [FILE: TransactionBroker.java, LINE: 227] occurred in: org.apache= .activemq.broker.TransactionBroker.send > [FILE: MutableBrokerFilter.java, LINE: 135] occurred in: org.apac= he.activemq.broker.MutableBrokerFilter.send > [FILE: TransportConnection.java, LINE: 458] occurred in: org.apac= he.activemq.broker.TransportConnection.processMessage > [FILE: ActiveMQMessage.java, LINE: 681] occurred in: org.apache.a= ctivemq.command.ActiveMQMessage.visit > [FILE: TransportConnection.java, LINE: 306] occurred in: org.apac= he.activemq.broker.TransportConnection.service > [FILE: TransportConnection.java, LINE: 179] occurred in: org.apac= he.activemq.broker.TransportConnection$1.onCommand > [FILE: TransportFilter.java, LINE: 69] occurred in: org.apache.ac= tivemq.transport.TransportFilter.onCommand > [FILE: WireFormatNegotiator.java, LINE: 113] occurred in: org.apa= che.activemq.transport.WireFormatNegotiator.onCommand > [FILE: InactivityMonitor.java, LINE: 227] occurred in: org.apache= .activemq.transport.InactivityMonitor.onCommand > [FILE: TransportSupport.java, LINE: 83] occurred in: org.apache.a= ctivemq.transport.TransportSupport.doConsume > [FILE: TcpTransport.java, LINE: 220] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.doRun > [FILE: TcpTransport.java, LINE: 202] occurred in: org.apache.acti= vemq.transport.tcp.TcpTransport.run > [FILE: , LINE: -1] occurred in: java.lang.Thread.run > *** END SERVER-SIDE STACK TRACE ***] > Exception limit (10) reached. Stopping test > ----------------------------------------------------- > Finished with the example. > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > terminate called after throwing an instance of 'decaf::lang::exceptions::= RuntimeException' > what(): Unlock Failed, this thread is not the Lock Owner! > Abort > 264(TEST)jrr@[SUSE10.1]> =20 > {noformat} > \\ > \\ > ---- > \\ > \\ > {code} > ////////////////////////////////////////////////////////////////////// > // Not so simple producer. > // > // The Active MQ Client's simple producer code has been modified to allow= for > // investigation into client behaviour. I've integrated the products engi= ne for > // obtaining a connection, session, queue, destination, sender and messag= e. > // > // The code basically does the following: > // - create a connection to be used over and over for the test > // - loop forever > // - create a session, queue, destination, sender and message > // - send the message > // - destroy the message, sender, destination, topic and session. > // > // ---------------------------------------- > // ENVIRONMENT VARIABLE OVERRIDES > // ---------------------------------------- > // > // SEND_TO : default is 0. If non-zero then it appends connection.sendTim= eout > // to the destination URI using the value set for SEND_TO. Refe= r to > // the connection options at > // http://activemq.apache.org/cms/configuring.html for details > // regarding its values. > // > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.= 0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implie= d. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > #include > using namespace activemq; > using namespace activemq::core; > using namespace decaf; > using namespace decaf::lang; > using namespace decaf::util; > using namespace decaf::util::concurrent; > using namespace cms; > using namespace std; > // > // Override values with environement values if they exist > // > template > bool GetEnv (const std::string var, > DEST &dst, > const DEST &def_value) > { > const char *val(::getenv(var.c_str())); > dst =3D def_value; > if (NULL =3D=3D val) { > return (false); > } > try { > dst =3D boost::lexical_cast(val); > printf("%s set to %s\n", var.c_str(), val); > return(true); > } > catch (std::exception &e) { > printf("%s\n", e.what()); > printf("Cannot convert '%s' to the desired output format\n", val)= ; > } > return(false); > } > // ----------------------------------------------------------------------= ----- > // EventEngineUtil.hxx > // ----------------------------------------------------------------------= ----- > class ee_except : public std::exception > { > public: > // ee_except(std::string) > // > // Create an exception using the specified string > // > explicit ee_except (const std::string &_msg) : > msg(_msg) > {}; > explicit ee_except (const std::stringstream &_msg) : > msg(_msg.str()) > {}; > // ee_except(std::string, errNum) > // > // Create an excpetion using the specified string, and then append= the > // numeric and string representation of the specified errno > // > explicit ee_except (const std::string &_msg, > int errNum) { > ee_except_helper(_msg, errNum); > } > explicit ee_except (const std::stringstream &_msg, > int errNum) { > ee_except_helper(_msg.str(), errNum); > } > // destroy the exception > ~ee_except() throw() {}; > // return the message, overloading the standard what method > virtual const char *what() const throw() { > return msg.c_str(); > } > protected: > void ee_except_helper (const std::string &_msg, > int errNum) { > std::stringstream ss_error; > if (errNum) { > ss_error << _msg << ", errno: " << errNum > << ", " << strerror(errNum); > msg =3D ss_error.str();=20 > } else { > msg =3D _msg; > } > } > private: > std::string msg; > }; > // ----------------------------------------------------------------------= ----- > // From EventEngineAMQProducer > // ----------------------------------------------------------------------= ----- > #include > #include > #include > #include > #include > // CLASS: ConnSessionManager > // > // Singleton class to manage Active MQ (AMQ) connections. > // > // getConnSession(): > // > // This method gets a connection for the specified broker, creates a s= ession > // with the specified ackMode and populates the connSesn parameter wit= h > // shared pointers to the alloated resources. > // > // If a connection already exists for the brokerURI, it creates a sess= ion > // for it and returns them, if the connection does not exist it makes = one > // attaches a session and returns them. If it cannot create the connec= tion > // (such as the broker is not up), it throws an exception. > // > // The connection manager can manage multiple connections based on the > // brokerURI. Each unique broker URI will be a different connection ma= naged > // by the connection manager. > // > // > class ConnSessionManager { > public: > typedef boost::shared_ptr P_SHR_CONN_T; > typedef boost::shared_ptr P_SHR_SESSION_T; > struct ConnSession { > P_SHR_CONN_T p_conn; > P_SHR_SESSION_T p_session; > }; > static void getConnSession(ConnSession &connSesn, > const std::string &brokerURI, > cms::Session::AcknowledgeMode ackMode); > protected: > // ------------------------------------------------------------------= ---- > // P R O T E C T E D M E M B E R V A R I A B L E S > typedef std::map CM_MAP_T; > CM_MAP_T cm_map; > static boost::mutex access_mtx; > // ------------------------------------------------------------------= ---- > // P R O T E C T E D M E M B E R F U N C T I O N > ConnSessionManager(); > ~ConnSessionManager(); > static ConnSessionManager &Singleton(void); > void addConnection(ConnSession &connSesn, > const std::string &brokerURI, > cms::Session::AcknowledgeMode ackMode); > }; > // CLASS: AMQ_Producer > // > // Active MQ (AMQ) message producer. > // > // The constructor creates an AMQ Producer object that has the ability to= send > // messages to the specified DESTination, HOST and PORT. > // > // The send() method causes the message to be sent. While creating the me= ssage, > // the send() method will apply the properites (if any) that have been se= t. > // > // The initProperty() method clears the list of properties that have been > // established for this producer.=20 > // > // The setProperty() method sets an AMQ message property for the message = to be > // sent. A new property is added each time this command is invoked, unles= s it's > // for an existing property, in which case the old property is overwritte= n with > // the new property. > // > class AMQ_Producer { > public: > AMQ_Producer(const std::string &dest, > const std::string &host, > const std::string &port); > ~AMQ_Producer() {}; > void send(const std::string &msg, > const std::string &type =3D ""); > protected: > const std::string m_dest; > std::string m_broker_uri; > std::map propertyInfo; > static const cms::DeliveryMode::DELIVERY_MODE m_delivery_mode; > }; > // ----------------------------------------------------------------------= ----- > // ----------------------------------------------------------------------= ----- > #include > #include > // access control to the singleton. > boost::mutex ConnSessionManager::access_mtx; > static boost::once_flag init_flag =3D BOOST_ONCE_INIT; > /************************************************************************= **** > ** > ** Name: ConnSessionManager::ConnSessionManager > ** > ** Function: Create the connection manage object (singleton) and initi= alize > ** the Active MQ library that we use, once. > ** > ** Input Parms: None > ** > ** Return Parm: None > ** > *************************************************************************= ***/ > ConnSessionManager::ConnSessionManager () : > cm_map() > { > // only call the library initialization ONCE. > boost::call_once(activemq::library::ActiveMQCPP::initializeLibrary, > init_flag); > } > /************************************************************************= **** > ** > ** Name: ConnSessionManager::~ConnSessionManager > ** > ** Function: Destroty the connection manager > ** > ** Input Parms: None > ** > ** Return Parm: None > ** > *************************************************************************= ***/ > ConnSessionManager::~ConnSessionManager () > { > activemq::library::ActiveMQCPP::shutdownLibrary(); > } > /************************************************************************= **** > ** > ** Name: ConnSessionManager::Singleton > ** > ** Function: Return a referece to our singleton. > ** > ** Input Parms: None > ** > ** Return Parm: None > ** > *************************************************************************= ***/ > ConnSessionManager & > ConnSessionManager::Singleton (void) > { > static ConnSessionManager singleton; > return (singleton); > } > /************************************************************************= **** > ** > ** Name: ConnSessionManager::addConnection > ** > ** Function: Add, or re-add a connection to the connection manager. > ** Create a session on the connection and return them. > ** > ** An exception is thrown if we cannot get a connection. > ** > ** Input Parms: connSesn - ConnSession reference that is populated with = a > ** connections shared pointer and session shared > ** pointer. > ** > ** brokerURI - The Active MQ broker URI string to use to cre= ate > ** the connection. > ** > ** http://activemq.apache.org/cms/cms-api-overview.html > ** http://activemq.apache.org/cms/configuring.html > ** > ** ackMode - The kind of acknowledgement we want the session= to > ** have > ** > ** http://activemq.apache.org/cms/api_docs/activemqcpp-3= .4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185 > ** > ** > ** Return Parm: None > ** > *************************************************************************= ***/ > void > ConnSessionManager::addConnection (ConnSessionManager::ConnSession &conn= Sesn, > const std::string &brok= erURI, > cms::Session::AcknowledgeMode ackMo= de) > { > CM_MAP_T &map_ref =3D Singleton().cm_map; > P_SHR_CONN_T &p_conn(connSesn.p_conn); > P_SHR_SESSION_T &p_session(connSesn.p_session); > // Create a ConnectionFactory that we automatically dealloate itself > std::auto_ptr > connectionFactory(cms::ConnectionFactory:: > createCMSConnectionFactory(brokerURI)); > // Create the connection and attach to shared pointer > p_conn.reset(connectionFactory->createConnection()); > // Start the connection > p_conn->start(); > // Create the session > p_session.reset(p_conn->createSession(ackMode)); > // Update the map > map_ref.erase(brokerURI); > map_ref[brokerURI] =3D p_conn; > } > /************************************************************************= **** > ** > ** Name: ConnSessionManager::getConnSession > ** > ** Function: Get a working Active MQ connection for the specified brok= erURI > ** and return it along with an allocated session or thrown a= n > ** exception. > ** > ** This finds the existing AMQ connection for the specified > ** brokerURI, validates that it's still connected, creates a > ** session on it and returns them. If the connection isn't v= alid > ** anymore (i.e. the broker restarted) or if the connection > ** doesn't exist yet, we create it. > ** > ** If we cannot create the connection we throw an exception. > ** > ** WHY DO WE ALLOCATE A SESSION TOO? Why don't we just get t= he > ** conection and return it? It's possible that the connectio= n was > ** reset since the last time we used it (i.e the broker > ** restarted). The only way we can tell this is if the > ** createSession call fails. Since we have to create a sessi= on to > ** prove that the connection works, lets use it. The caller = would > ** do it once it had a connection anyway! > ** > ** Why don't we just return a shared pointer to the session,= why > ** do we also return a shared pointer to the connection? Ses= sions > ** are built on top of connections. It would be BAD if the > ** connection was deleted before the session. So we keep a s= hared > ** pointer of both together to ensure that they have the sam= e > ** lifecycle. > ** > ** Input Parms: connSesn - ConnSession reference that is populated with = a > ** connections shared pointer and session shared > ** pointer. > ** > ** brokerURI - The Active MQ broker URI string to use to cre= ate > ** the connection. > ** > ** http://activemq.apache.org/cms/cms-api-overview.html > ** http://activemq.apache.org/cms/configuring.html > ** > ** ackMode - The kind of acknowledgement we want the session= to > ** have > ** > ** http://activemq.apache.org/cms/api_docs/activemqcpp-3= .4.0/html/classcms_1_1_session.html#ae2fd7b8b76928b465727760c78522185 > ** > ** > ** Return Parm: P_SHR_CONN_T - A shared pointer to the connection to use > ** > *************************************************************************= ***/ > void > ConnSessionManager::getConnSession (ConnSessionManager::ConnSession &conn= Sesn, > const std::string &brok= erURI, > cms::Session::AcknowledgeMode ackMo= de) > { > P_SHR_CONN_T &p_conn(connSesn.p_conn); > P_SHR_SESSION_T &p_session(connSesn.p_session); > ConnSessionManager &me =3D Singleton(); > CM_MAP_T &map_ref =3D me.cm_map; > // restrict access > boost::lock_guard lock(access_mtx); > // get the entry > p_conn =3D map_ref[brokerURI]; > // if it's not allocated yet, then allocate it, attach it and return = it > if (NULL =3D=3D p_conn) { > printf("Creating connection for %s\n", > brokerURI.c_str()); > return (me.addConnection(connSesn, brokerURI, ackMode)); > } > // Check if the broker restarted and reset our connection. If it has, > // then create session will generate an exception and we know we then > // need to make a new connection. If it it works then use the session= , > // passing it back in the connSession > try { > p_session.reset(p_conn->createSession(ackMode)); > } > catch (std::exception &e) { > printf("Re-creating connection for %s: %s\n", > brokerURI.c_str(), e.what()); > return (me.addConnection(connSesn, brokerURI, ackMode)); > } > } > // ********************************************************************** > // ********************************************************************** > // AMQ_Producer > // ********************************************************************** > // ********************************************************************** > // m_delivery_mode > // > // The producer's delivery mode is set to NON_PERSISTENT. When persis= tent > // it causes the broker to save the message to disk so that it is ava= ilable > // even if the broker restarts. However, this has a price, it greatly= slows > // down the send (220,475 microseconds with persistance, vs. 226 > // microseconds without.) These measurements where taken WITH synchro= nous > // sends. We could have investigated and used async sends to speed up= the > // send call and keep persistence, however, this wasn't deemed necess= ary. > // > // http://activemq.apache.org/cms/api_docs/activemqcpp-3.4.0/html/cla= sscms_1_1_delivery_mode.html > // > // http://activemq.apache.org/persistence-questions.html > // > // http://activemq.apache.org/what-is-the-difference-between-persiste= nt-and-non-persistent-delivery.html > // > const cms::DeliveryMode::DELIVERY_MODE > AMQ_Producer::m_delivery_mode =3D cms::DeliveryMode::NON_PERSISTENT; > /************************************************************************= **** > ** > ** Name: AMQ_Producer::AMQ_Producer > ** > ** Function: Create the Active MQ Producer client object, storing the > ** destination and creating the URI string from the provided= host > ** name and port number. > ** > ** Input Parms: dest: The destination queue/topic to communicate with > ** > ** host: The host to communicate with > ** > ** port: The port on the host to communicate with > ** > ** Return Parm:=20 > ** > *************************************************************************= ***/ > AMQ_Producer::AMQ_Producer (const std::string &dest, > const std::string &host, > const std::string &port) : > m_dest(dest), > m_broker_uri("tcp://" + host + ":" + port + > "?connection.alwaysSyncSend=3Dtrue") > { > // add the send timeout if its environemtn varialbe is given and its = a > // value other than the default, zero. > int sendTimeout(0); > GetEnv("SEND_TO", sendTimeout, 0); > if (sendTimeout) { > std::stringstream ss_sto; > ss_sto << "&connection.sendTimeout=3D" << sendTimeout; > m_broker_uri +=3D ss_sto.str(); > } > } > /************************************************************************= **** > ** > ** Name: AMQ_Producer::send > ** > ** Function: Cause the message to be sent by this producer to the > ** established message queue, host and port. > ** > ** This gets a cached (or if needed new) connection which ha= s a > ** session created for it, it then creates the destination, > ** producer and message. Finally it adds all of the properti= es > ** that we want to the message and sends it. > ** > ** This model is best explained at the Active MQ CPP example= page > ** at: http://activemq.apache.org/cms/cms-api-overview.html > ** > ** Input Parms: msg - The message to send with this producer > ** > ** type - (optional) The CMSType/JMSType to set the message = to.=20 > ** > ** Return Parm: None > ** > *************************************************************************= ***/ > void > AMQ_Producer::send (const std::string &msg, > const std::string &type) > { > std::string s_where; > try { > ConnSessionManager::ConnSession conSesn; > // Create the Connection + Session > s_where =3D "creating connection/session"; > ConnSessionManager::getConnSession(conSesn, m_broker_uri, > cms::Session::AUTO_ACKNOWLEDGE= ); > // Create the Destination (queue/topic) > s_where =3D "creating destination"; > boost::shared_ptr > p_destination(conSesn.p_session->createQueue(m_dest)); > // Create the Producer > s_where =3D "creating message producer"; > boost::shared_ptr > p_producer(conSesn.p_session->createProducer(p_destination.ge= t())); > p_producer->setDeliveryMode(m_delivery_mode); > // Create the Message > s_where =3D "creating connection"; > boost::shared_ptr > p_msg(conSesn.p_session->createTextMessage(msg)); > // Set the Message's JMSType/CMSType > if (!type.empty()) { > p_msg->setCMSType(type); > } > std::map::iterator it; > // run through all properties, adding them to the message > for (it =3D propertyInfo.begin(); it !=3D propertyInfo.end(); ++i= t) { > s_where =3D "adding property " + (*it).first; > p_msg->setStringProperty((*it).first, (*it).second); > } > // Send it! > s_where =3D "sending message"; > p_producer->send(p_msg.get()); > } > catch (std::exception &e) { > std::stringstream ss_err; > ss_err << "Error while " << s_where << " [" << e.what() << "]"; > throw ee_except(ss_err); > } > } > // ----------------------------------------------------------------------= ----- > const std::string base_msg("1024 characters folowed by a message.xxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx= xxxxxxxHello world! "); > void run_test () > { > std::string host("127.0.0.1"); > std::string port("61616"); > std::string destURI("c.c.p.v.ms.events"); > unsigned int iteration(0); > unsigned int ex_count(0); > // Loop, creating producers and sending the message > while (1) { > iteration++; > AMQ_Producer producer(destURI, host, port); > std::stringstream msg; > msg << base_msg << iteration; > printf("Sending message #%d\n", iteration); > try { > producer.send(msg.str(), "TEST_TYPE"); > } > catch (std::exception &e) { > ex_count++; > printf("\n\nException[%d]: %s\n", ex_count, e.what()); > if (ex_count >=3D 10 ) { > printf("\n\nException limit (%d) reached. Stopping test\n= \n\n", > ex_count); > break; > } > } > } > } > /////////////////////////////////////////////////////////////////////////= ////// > int main(int, char*) { > activemq::library::ActiveMQCPP::initializeLibrary(); > std::cout << "=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D\n"; > std::cout << "Starting the example:" << std::endl; > std::cout << "-----------------------------------------------------\n= "; > run_test(); =20 > std::cout << "-----------------------------------------------------\n= "; > std::cout << "Finished with the example." << std::endl; > std::cout << "=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D\n"; > activemq::library::ActiveMQCPP::shutdownLibrary(); > } > {code} -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrato= rs: https://issues.apache.org/jira/secure/ContactAdministrators!default.jsp= a For more information on JIRA, see: http://www.atlassian.com/software/jira