activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jandeclercq <jan.decle...@alsic.be>
Subject activemq-cpp-library-3.2.2 - reconnect fails
Date Wed, 17 Nov 2010 11:56:07 GMT

Hey,

After I disconnect and close down a producer and then try to reconnect,
the cpp-library crashes.  I don't know what I'm doing wrong.

See the code below (choose option R)



#include <iostream>
#include <stdio.h>
#include <string.h>

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>

using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

using namespace std;

class SimpleProducer: public ExceptionListener, public
DefaultTransportListener {
private:

	Connection* connection;
	Session* session;
	Destination* destination;
	MessageProducer* producer;
	bool useTopic;
	bool clientAck;
	std::string brokerURI;
	std::string destURI;
	bool isClosed;

public:
	SimpleProducer(const std::string& brokerURI, const std::string& destURI,
bool useTopic, bool clientAck) {
		activemq::library::ActiveMQCPP::initializeLibrary();
		this->connection = NULL;
		this->session = NULL;
		this->destination = NULL;
		this->producer = NULL;
		this->useTopic = useTopic;
		this->brokerURI = brokerURI;
		this->destURI = destURI;
		this->clientAck = clientAck;
		this->isClosed = true;
	}

	virtual ~SimpleProducer() {
		close();
	}

	void close() {
		if (!isClosed) {
			isClosed = true;
			this->cleanup();
		}
	}

	virtual void connect() {
		try {
			try {
				this->isClosed = false;
				// Create a ConnectionFactory
				ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory(this->brokerURI);
				// Create a Connection
				this->connection = connectionFactory->createConnection();
				delete connectionFactory;
				ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>
(connection);
				if (amqConnection != NULL) {
					amqConnection->addTransportListener(this);
				}
				//This will crash the 2nd time!:
				this->connection->start();
				this->connection->setExceptionListener(this);

			} catch (Exception& e) {
				throw e;
			}

			// Create a Session
			if (this->clientAck) {
				this->session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
			} else {
				this->session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
			}

			// Create the destination (Topic or Queue)
			if (this->useTopic) {
				this->destination = this->session->createTopic(this->destURI);
			} else {
				this->destination = this->session->createQueue(this->destURI);
			}

			// Create a MessageProducer from the Session to the Topic or Queue
			this->producer = this->session->createProducer(this->destination);

			this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

			fprintf(stdout, "Producer connected to %s", this->brokerURI.c_str());
			fflush(stdout);

		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
	}

	virtual void send(const char *msg) {
		try {

			string text(msg);
			TextMessage* message = this->session->createTextMessage(text);

			char logMessage[256];

			this->producer->send(message);

			//logging:
			sprintf(logMessage, "Message '%s' send to serviceBus", msg);

			//end logging
			delete message;
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
	}
	//Exception Listener
	virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
		fprintf(stdout, "!! CMS lib Exception occured !!");
		fflush(stdout);
	}
	//Transport Listener
	virtual void transportInterrupted() {
		fprintf(stdout, "The Connection's Transport has been Interrupted.");
		fflush(stdout);
	}
	virtual void transportResumed() {
		fprintf(stdout, "The Connection's Transport has been Restored.");
		fflush(stdout);
	}
private:

	void cleanup() {
		//Producer
		try {
			if (producer != NULL) {
				producer->close();
				delete producer;
			}
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);

		}
		producer = NULL;

		// Destination
		try {
			if (destination != NULL) {
				delete destination;
			}
		} catch (CMSException& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		destination = NULL;

		//SESSION
		try {
			if (session != NULL) {
				session->close();
				delete session;
			}
		} catch (Exception& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		session = NULL;

		//CONNECTION
		try {
			if (connection != NULL) {
				connection->close();
				delete connection;
			}
			connection = NULL;
		} catch (Exception& e) {
			fprintf(stdout, e.getMessage().c_str());
			fflush(stdout);
		}
		connection = NULL;
		activemq::library::ActiveMQCPP::shutdownLibrary();
	}

};//class SimpleProducer


// The single producer instance driven by the menu in main().
SimpleProducer *amq_producer;

// Builds a new topic producer with client acknowledgement for the test
// broker, stores it in amq_producer and connects it.
void connect(void) {
	const std::string brokerAddress("failover:tcp://10.1.1.9:61616");
	const std::string topicName("TEST.FOO");
	const bool publishToTopic = true;
	const bool clientAcknowledge = true;

	amq_producer = new SimpleProducer(brokerAddress, topicName,
			publishToTopic, clientAcknowledge);
	amq_producer->connect();
}

void disconnect(void) {
	if(amq_producer != NULL){
		amq_producer->close();
	}
	delete amq_producer;
}
// Sends the fixed test message through the current producer. When no producer
// exists, the problem is reported on stderr and nothing is sent.
void send(void) {
	if (amq_producer == NULL) {
		fprintf(stderr, "Cannot send: no producer has been created.\n");
		return;
	}
	amq_producer->send("TEST MESSAGE");
}

// Interactive driver: connects once, then reads one command per line.
//   P = send a test message, R = tear down and recreate the producer,
//   Q = quit. Any other input just shows the menu again.
// End-of-file or a read error on stdin also ends the loop; previously the
// ignored getline() result made the menu repeat forever once stdin was closed.
int main(int argc, const char* argv[]) {
	(void) argc;
	(void) argv;

	connect();

	std::string str;
	for (;;) {
		printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = quit\n");
		if (!std::getline(std::cin, str)) {
			break;
		}
		if (str.compare("Q") == 0) {
			break;
		}

		if (str.compare("P") == 0) {
			send();
		} else if (str.compare("R") == 0) {
			disconnect();
			connect();
		}
	}

	disconnect();
	return 0;
}



-- 
View this message in context: http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message