From: "Rob Davies (JIRA)"
To: dev@activemq.apache.org
Reply-To: dev@activemq.apache.org
Date: Wed, 9 Sep 2009 03:07:12 -0700 (PDT)
Message-ID: <1513612454.1252490832369.JavaMail.jira@brutus>
Subject: [jira] Resolved: (AMQ-852) Incorrect handling of disconecting client in ClientAckMode

     [ https://issues.apache.org/activemq/browse/AMQ-852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Davies resolved AMQ-852.
----------------------------

       Resolution: Fixed
    Fix Version/s: (was: 5.4.0) 5.3.0

> Incorrect handling of disconecting client in ClientAckMode
> ----------------------------------------------------------
>
>                 Key: AMQ-852
>                 URL: https://issues.apache.org/activemq/browse/AMQ-852
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0.1
>         Environment: Linux IA32/RHEL AS 4.0 (broker version 4.0.1); the client program uses the OpenWire protocol (C++), CMS devel version.
>            Reporter: Radek Sedmak
>             Fix For: 5.3.0
>
>   Original Estimate: 11 weeks, 1 day, 2 hours
>  Remaining Estimate: 11 weeks, 1 day, 2 hours
>
> When you run my test program and end the process with Ctrl+C, kill, or kill -9 (I mostly used Ctrl+C), then sometimes, within roughly 100 iterations, communication between the client and the broker is damaged. As a result, after a restart (new pid) the client program is unable to create a producer. The call to consumer = rec_session->createConsumer( rec_queue );
> hangs the client program. Sometimes this instead results in an exception, as you can see in the output of my program. I think this only occurs when the session is in ClientAckMode. From my point of view, there is incorrect handling of the situation where the client receives a message from the broker and then terminates the connection without acknowledging (ack/nack) that receive, in certain circumstances ...
> output:
> ./amqtest tcp://lxstaflik:61616 PH_Q_IN_10 PH_Q_OUT_10
> Setting connection URL to 'tcp://lxstaflik:61616'
> Setting receive queue name to 'PH_Q_IN_10'
> Setting send queue name to 'PH_Q_OUT_10'
> Init ...
> Creating connection factory ...
> Connection factory created, creating connection ...
> Connection created, creating receive session...
> receive session created, creating sending session ...
> send session created, creating receive queue...'PH_Q_IN_10'
> receive queue created, creating consumer for this queue ...
> setting listener ...
> Error on connection Unmarshal failed; unknown data structure type 49, at src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 711Exiting read loop due to exception: Unmarshal failed; unknown data structure type 49, at src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 711
> Here is source code of my example:
> #include
> #include
> #include
> #include
> #include
> #include
> #include "cms/IConnection.hpp"
> #include "cms/IConnectionFactory.hpp"
> #include "activemq/ConnectionFactory.hpp"
> #include "activemq/Connection.hpp"
> #include "activemq/Session.hpp"
> #include "ppr/TraceException.hpp"
> #include "ppr/net/Uri.hpp"
> #include "ppr/util/ifr/p"
> using namespace apache::activemq;
> using namespace apache::cms;
> using namespace apache::ppr;
> using namespace apache::ppr::net;
> using namespace ifr;
> using namespace std;
> class ActiveMQTest : public IExceptionListener, public IMessageListener {
> private:
>     p uri;
>     p factory;
>     p connection;
>     p rec_session;
>     p snd_session;
>     p rec_queue;
>     p snd_queue;
>     p producer;
>     p consumer;
>     p txtmsg;
>     char szRecQueue[128];
>     char szSndQueue[128];
> public:
>     ActiveMQTest();
>     virtual ~ActiveMQTest();
>     virtual void setUri(const char *);
>     virtual void init();
>     virtual void done();
>     virtual void onException(exception& error);
>     virtual void onMessage(p message);
>     virtual void ActiveMQTest::setSndQueue(const char *szQueue);
>     virtual void ActiveMQTest::setRecQueue(const char *szQueue);
> };
> void ActiveMQTest::onMessage( p message) {
>     p snd_message;
>     this->txtmsg = p_dyncast(message);
>     p string_request = txtmsg->getText();
>     if (string_request != NULL ) {
>         printf("Received message : %s",string_request->c_str());
>     }
>     sleep(10);
>     message->acknowledge();
>     snd_message = snd_session->createTextMessage() ;
>     snd_message->setText("TEST\n") ;
>     snd_message->setJMSPersistent(1);
>     // Send message
>     producer->send(message) ;
> }
> void ActiveMQTest::done() {
>     rec_session->close();
>     snd_session->close();
> }
> void ActiveMQTest::setUri(const char * uri) {
>     this->uri = new Uri(uri) ;
> }
> void ActiveMQTest::setSndQueue(const char *szQueue) {
>     strcpy(szSndQueue,szQueue);
> }
> void ActiveMQTest::setRecQueue(const char *szQueue) {
>     strcpy(szRecQueue,szQueue);
> }
> ActiveMQTest::ActiveMQTest() {
>     this->connection = NULL;
>     this->rec_session = NULL;
>     this->snd_session = NULL;
>     memset(szRecQueue,0x0,sizeof(szRecQueue));
>     memset(szSndQueue,0x0,sizeof(szSndQueue));
> }
> ActiveMQTest::~ActiveMQTest() {
> }
> void ActiveMQTest::init() {
>     try {
>         cout.rdbuf(cerr.rdbuf());
>         printf("1 Creating connection factory ... \n");
>         factory = new ConnectionFactory( uri );
>         printf("Connection factory created, creating connection ...\n");
>         connection = factory->createConnection();
>         printf("Connection created, creating receive session...\n");
>         p_cast(connection)->setExceptionListener( smartify( this ) );
>         //rec_session = connection->createSession(AutoAckMode);
>         rec_session = connection->createSession(ClientAckMode);
>         printf("receive session created, creating sending session ...\n");
>         snd_session = connection->createSession(AutoAckMode);
>         printf("send session created, creating receive queue...'%s'\n",szRecQueue);
>         rec_queue = rec_session->getQueue( szRecQueue );
>         printf("receive queue created, creating consumer for this queue ...\n");
>         consumer = rec_session->createConsumer( rec_queue );
>         printf("setting listener ...\n");
>         snd_queue = snd_session->getQueue( szSndQueue );
>         consumer->setMessageListener( smartify( this ) );
>         producer = snd_session->createProducer( snd_queue );
>         (p_dyncast(rec_session))->dispatch(0);
>         printf("Init Ok.\n");
>     } catch ( TraceException& e ) {
>         printf("Error during init ... \n");
>     }
> }
> void ActiveMQTest::onException( exception& error ) {
>     printf("Error on connection %s", error.what() );
> }
> int main(int argc,char *argv[]) {
>     ActiveMQTest* myAq;
>     if ( argc < 3 ) {
>         printf("Usage: amqtest \n");
>         exit(0);
>     }
>     myAq = new ActiveMQTest;
>     printf("Setting connection URL to '%s'\n",argv[1]);
>     myAq->setUri(argv[1]);
>     printf("Setting receive queue name to '%s'\n",argv[2]);
>     myAq->setRecQueue( argv[2] );
>     printf("Setting send queue name to '%s'\n",argv[3]);
>     myAq->setSndQueue( argv[3] );
>     printf("Init ...\n");
>     myAq->init();
>     int i=0;
>     while( i < 3000 ) {
>         printf("Idle ... \n");
>         sleep( 1 );
>         i++;
>     }
>     myAq->done();
>     return 0;
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.