activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Radek Sedmak (JIRA)" <j...@apache.org>
Subject [jira] Created: (AMQ-852) Incorrect handling of disconecting client in ClientAckMode
Date Mon, 31 Jul 2006 14:18:23 GMT
Incorrect handling of disconecting client in ClientAckMode 
-----------------------------------------------------------

                 Key: AMQ-852
                 URL: https://issues.apache.org/activemq/browse/AMQ-852
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 4.0.1
         Environment: Linux IA32/RHEL AS 4.0 ( Broker version 4.0.1 ) and client program uses
OpenWire protocol (C++) CMS devel version. 
            Reporter: Radek Sedmak


When you run "my test program" and end this process  with Ctrl+C/ kill/ kill -9 ( i mostly
used Ctrl+C) sometimes during let say 100 iteration "communication between client and broker
is damaged". This results in situation when after restart ( new pid ) client program is unable
to create producer.  call to   consumer = rec_session->createConsumer( rec_queue );
 hangs client program. Sometimes this results with exception as you can see in output of my
program. I think that this only occurs when the session is in ClienAckMode. From my point
of view is there incorrect handling of situation when client receives message from broker
and than terminates connection without ack/nack this receive in certatin circumstances ...

output:
 ./amqtest tcp://lxstaflik:61616 PH_Q_IN_10 PH_Q_OUT_10
Setting connection URL to 'tcp://lxstaflik:61616'
Setting receive queue name to 'PH_Q_IN_10'
Setting send queue name to 'PH_Q_OUT_10'
Init ...
Creating connection factory ...
Connection factory created, creating connection ...
Connection created, creating receive session...
receive session created, creating sending session ...
send session created, creating receive queue...'PH_Q_IN_10'
receive queue created, creating consumer for this queue ...
setting listener ...
Error on connection Unmarshal failed; unknown data structure type 49, at src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp
line 711Exiting read loop due to exception: Unmarshal failed; unknown data structure type
49, at src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 711


Here is source code of my example:
#include <stdio.h>
#include <unistd.h>

#include <exception>
#include <iostream>
#include <map>
#include <string>

#include "cms/IConnection.hpp"
#include "cms/IConnectionFactory.hpp"
#include "activemq/ConnectionFactory.hpp"
#include "activemq/Connection.hpp"
#include "activemq/Session.hpp"
#include "ppr/TraceException.hpp"
#include "ppr/net/Uri.hpp"
#include "ppr/util/ifr/p"

using namespace apache::activemq;
using namespace apache::cms;
using namespace apache::ppr;
using namespace apache::ppr::net;
using namespace ifr;
using namespace std;


class ActiveMQTest : public IExceptionListener, public IMessageListener {
  private:
        p<Uri>                  uri;
        p<IConnectionFactory>   factory;
        p<IConnection>          connection;
        p<ISession>             rec_session;
        p<ISession>             snd_session;
        p<IQueue>               rec_queue;
        p<IQueue>               snd_queue;
        p<IMessageProducer>     producer;
        p<IMessageConsumer>     consumer;
        p<ITextMessage>         txtmsg;
        char szRecQueue[128];
        char szSndQueue[128];
  public:
        ActiveMQTest();
        virtual ~ActiveMQTest();
        virtual void setUri(const char *);
        virtual void init();
        virtual void done();
        virtual void onException(exception& error);
        virtual void onMessage(p<IMessage> message);
        virtual void ActiveMQTest::setSndQueue(const char *szQueue);
        virtual void ActiveMQTest::setRecQueue(const char *szQueue);
};

void ActiveMQTest::onMessage( p<IMessage> message) {
   p<ITextMessage>      snd_message;

   this->txtmsg = p_dyncast<ITextMessage>(message);

   p<string> string_request = txtmsg->getText();

   if (string_request != NULL ) {
     printf("Received message : %s",string_request->c_str());
   }

   sleep(10);

   message->acknowledge();

   snd_message = snd_session->createTextMessage() ;
   snd_message->setText("TEST\n") ;
   snd_message->setJMSPersistent(1);
   // Send message
   producer->send(message) ;
}

void ActiveMQTest::done() {
   rec_session->close();
   snd_session->close();
}

void ActiveMQTest::setUri(const char * uri) {
    this->uri = new Uri(uri) ;
}

void ActiveMQTest::setSndQueue(const char *szQueue) {
  strcpy(szSndQueue,szQueue);
}

void ActiveMQTest::setRecQueue(const char *szQueue) {
  strcpy(szRecQueue,szQueue);
}

ActiveMQTest::ActiveMQTest() {
  this->connection = NULL;
  this->rec_session = NULL;
  this->snd_session = NULL;
  memset(szRecQueue,0x0,sizeof(szRecQueue));
  memset(szSndQueue,0x0,sizeof(szSndQueue));
}

ActiveMQTest::~ActiveMQTest() {
}

void ActiveMQTest::init() {
  try {
    cout.rdbuf(cerr.rdbuf());
    printf("1 Creating connection factory ... \n");
    factory = new ConnectionFactory( uri );

    printf("Connection factory created, creating connection ...\n");
    connection = factory->createConnection();

    printf("Connection created, creating receive session...\n");
    p_cast<Connection>(connection)->setExceptionListener( smartify( this ) );
    //rec_session = connection->createSession(AutoAckMode);
    rec_session = connection->createSession(ClientAckMode);

    printf("receive session created, creating sending session ...\n");
    snd_session = connection->createSession(AutoAckMode);

    printf("send session created, creating receive queue...'%s'\n",szRecQueue);
    rec_queue = rec_session->getQueue( szRecQueue );


    printf("receive queue created, creating consumer for this queue ...\n");
    consumer = rec_session->createConsumer( rec_queue );


    printf("setting listener ...\n");
    snd_queue = snd_session->getQueue( szSndQueue );
    consumer->setMessageListener( smartify( this ) );
    producer = snd_session->createProducer( snd_queue );

    (p_dyncast<apache::activemq::Session>(rec_session))->dispatch(0);

    printf("Init Ok.\n");

  } catch ( TraceException& e ) {
    printf("Error during init  ... \n");
  }
}

void ActiveMQTest::onException( exception& error ) {
  printf("Error on connection %s", error.what() );
}


int main(int argc,char *argv[]) {
  ActiveMQTest* myAq;


  if ( argc < 3 ) {
    printf("Usage: amqtest <connection_url> <receive_queue> <send_queue>\n");
    exit(0);
  }

  myAq = new ActiveMQTest;

  printf("Setting connection URL to '%s'\n",argv[1]);
  myAq->setUri(argv[1]);

  printf("Setting receive queue name to '%s'\n",argv[2]);
  myAq->setRecQueue( argv[2] );

  printf("Setting send queue name to '%s'\n",argv[3]);
  myAq->setSndQueue( argv[3] );

  printf("Init ...\n");
  myAq->init();


int i=0;
  while( i < 3000 ) {
    printf("Idle ... \n");
    sleep( 1 );
    i++;
  }

  myAq->done();

  return 0;
}




-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: https://issues.apache.org/activemq/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message