activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Bish <tabish...@gmail.com>
Subject Re: activemq-cpp-library-3.2.2 - reconnect fails
Date Wed, 17 Nov 2010 15:30:57 GMT
On Wed, 2010-11-17 at 03:56 -0800, jandeclercq wrote:
> Hey,
> 
> When I disconnected and closed down a producer. And afterwards I'm trying to
> reconnect, the cpp-library crashes.  I don't know what I'm doing wrong.
> 
> See the code below (choose option R)
> 
> 

The problem most likely results from the fact that you are calling the
library init and shutdown methods more than once per application run by
placing them in the constructor and cleanup methods of the class.  Try
placing them in the main method at start and end.


Regards

> 
> #include <iostream>
> #include <stdio.h>
> #include <string.h>
> 
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/core/ActiveMQConnection.h>
> #include <activemq/transport/DefaultTransportListener.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Integer.h>
> #include <activemq/util/Config.h>
> #include <decaf/util/Date.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> 
> using namespace activemq;
> using namespace activemq::transport;
> using namespace activemq::core;
> using namespace decaf;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
> 
> using namespace std;
> 
> class SimpleProducer: public ExceptionListener, public
> DefaultTransportListener {
> private:
> 
> 	Connection* connection;
> 	Session* session;
> 	Destination* destination;
> 	MessageProducer* producer;
> 	bool useTopic;
> 	bool clientAck;
> 	std::string brokerURI;
> 	std::string destURI;
> 	bool isClosed;
> 
> public:
> 	SimpleProducer(const std::string& brokerURI, const std::string& destURI,
> bool useTopic, bool clientAck) {
> 		activemq::library::ActiveMQCPP::initializeLibrary();
> 		this->connection = NULL;
> 		this->session = NULL;
> 		this->destination = NULL;
> 		this->producer = NULL;
> 		this->useTopic = useTopic;
> 		this->brokerURI = brokerURI;
> 		this->destURI = destURI;
> 		this->clientAck = clientAck;
> 		this->isClosed = true;
> 	}
> 
> 	virtual ~SimpleProducer() {
> 		close();
> 	}
> 
> 	void close() {
> 		if (!isClosed) {
> 			isClosed = true;
> 			this->cleanup();
> 		}
> 	}
> 
> 	virtual void connect() {
> 		try {
> 			try {
> 				this->isClosed = false;
> 				// Create a ConnectionFactory
> 				ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory(this->brokerURI);
> 				// Create a Connection
> 				this->connection = connectionFactory->createConnection();
> 				delete connectionFactory;
> 				ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
> (connection);
> 				if (amqConnection != NULL) {
> 					amqConnection->addTransportListener(this);
> 				}
> 				//This will crash the 2nd time!:
> 				this->connection->start();
> 				this->connection->setExceptionListener(this);
> 
> 			} catch (Exception& e) {
> 				throw e;
> 			}
> 
> 			// Create a Session
> 			if (this->clientAck) {
> 				this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
> 			} else {
> 				this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
> 			}
> 
> 			// Create the destination (Topic or Queue)
> 			if (this->useTopic) {
> 				this->destination = this->session->createTopic(this->destURI);
> 			} else {
> 				this->destination = this->session->createQueue(this->destURI);
> 			}
> 
> 			// Create a MessageProducer from the Session to the Topic or Queue
> 			this->producer = this->session->createProducer(this->destination);
> 
> 			this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
> 
> 			fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
> 			fflush(stdout);
> 
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 	}
> 
> 	virtual void send(const char *msg) {
> 		try {
> 
> 			string text(msg);
> 			TextMessage* message = this->session->createTextMessage(text);
> 
> 			char logMessage[256];
> 
> 			this->producer->send(message);
> 
> 			//logging:
> 			sprintf(logMessage, "Message '%s' send to serviceBus", msg);
> 
> 			//end logging
> 			delete message;
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 	}
> 	//Exception Listener
> 	virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
> 		fprintf(stdout, "!! CMS lib Exception occured !!");
> 		fflush(stdout);
> 	}
> 	//Transport Listener
> 	virtual void transportInterrupted() {
> 		fprintf(stdout, "The Connection's Transport has been Interrupted.");
> 		fflush(stdout);
> 	}
> 	virtual void transportResumed() {
> 		fprintf(stdout, "The Connection's Transport has been Restored.");
> 		fflush(stdout);
> 	}
> private:
> 
> 	void cleanup() {
> 		//Producer
> 		try {
> 			if (producer != NULL) {
> 				producer->close();
> 				delete producer;
> 			}
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 
> 		}
> 		producer = NULL;
> 
> 		// Destination
> 		try {
> 			if (destination != NULL) {
> 				delete destination;
> 			}
> 		} catch (CMSException& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 		destination = NULL;
> 
> 		//SESSION
> 		try {
> 			if (session != NULL) {
> 				session->close();
> 				delete session;
> 			}
> 		} catch (Exception& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 		session = NULL;
> 
> 		//CONNECTION
> 		try {
> 			if (connection != NULL) {
> 				connection->close();
> 				delete connection;
> 			}
> 			connection = NULL;
> 		} catch (Exception& e) {
> 			fprintf(stdout, e.getMessage().c_str());
> 			fflush(stdout);
> 		}
> 		connection = NULL;
> 		activemq::library::ActiveMQCPP::shutdownLibrary();
> 	}
> 
> };//class SimpleProducer
> 
> 
> SimpleProducer *amq_producer;
> void connect(void) {
> 	std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
> 	std::string _destURI = "TEST.FOO";
> 	amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
> 	amq_producer->connect();
> }
> 
> void disconnect(void) {
> 	if(amq_producer != NULL){
> 		amq_producer->close();
> 	}
> 	delete amq_producer;
> }
> void send(void) {
> 	amq_producer->send("TEST MESSAGE");
> }
> 
> int main(int argc, const char* argv[]) {
> 	string str;
> 	connect();
> 	while (str.compare("Q") != 0) {
> 		printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = quit\n");
> 		getline(cin, str);
> 
> 		if (str.compare("P") == 0) {
> 			send();
> 		} else if (str.compare("R") == 0) {
> 			disconnect();
> 			connect();
> 		}
> 	}
> 	disconnect();
> 	return 0;
> }
> 
> 
> 

-- 
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/



Mime
View raw message