activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From osian <>
Subject Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2
Date Fri, 05 May 2006 06:51:31 GMT

Hi all,

I am currently looking at using ActiveMQ as our message broker, but it seems
to hang on a regular basis.  In my test environment I have 2 brokers
clustered together, an Oracle DB behind them for the journaling, and 3
consumers: 2 handling specific queues and another able to process any
queue. Also, on one of the machines, a producer scans a directory for files
and then converts each found file into a JMS message to be processed.
On the first run through, it processed 1,000 files and seemed OK. I then
ran multiple threads to process multiple queues on each consumer machine,
and it seemed to hang intermittently, so I abandoned this idea and went
back to the first scenario. To test it fully, I put 10,000 files in the
directory and left it running overnight. I came in to find that it had
only picked up 3,000 files and processed 177 messages; there are 2,958
messages sitting in the ACTIVEMQ_MSGS table, and the consumers are sitting
there doing nothing.  If I stop and start the consumers, they process one
message and then hang again, but if I only run one consumer, it starts
processing messages for a while and then hangs again.
I believe that this must be a setup problem, and ActiveMQ has everything that
I need, so I would love to use it. If anyone has any ActiveMQ configuration
suggestions or code samples for the consumers, producers, etc., I would be
very grateful.

Kind regards,

Here is my activemq.xml file:
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:amq="">

    <amq:broker brokerName="ProactJMSBroker" useJmx="true"
useShutdownHook="true" persistent="true" deleteAllMessagesOnStartup="false">

			<amq:transportConnector uri="tcp://localhost:61616"
			<amq:networkConnector uri="multicast://ProactJMSService"/>

				<property name="cleanupPeriod" value="600000"/>
				<property name="dataSource" ref="oracle-ds"/>

==================================================================== -->
    <!-- JDBC DataSource Configurations -->
==================================================================== -->

    <!-- The Datasource that will be used by the Broker -->
	<bean id="oracle-ds" class="net.proact.scm.sql.ProactPoolingDataSource">
		<property name="url" value="jdbc:oracle:oci:@CNHDEV"/>
		<property name="userName" value="CNHDEV"/>
		<property name="password" value="CNHDEV"/>


Here is some sample code for the consumer:
	public void runConsumer() {
		try {
			Connection connection = createConnection(getURL());
			session = createSession(connection);
			MessageConsumer consumer = session.createConsumer(getDestination(session,
			consumeMessagesAndClose(connection, consumer, timeOut);
		catch (Exception e) {
			System.out.println("Caught: " + e);

    public static Connection createConnection(String url) throws
JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(getUser(), getPassword(), url);
        Connection connection = connectionFactory.createConnection();
        return connection;

    public static Session createSession(Connection connection) throws
Exception {
        Session session = connection.createSession(true,
        return session;

    public Destination getDestination(Session session, String queueName)
throws Exception {
    	if (destination == null) {
    		destination = createQueue(session, queueName);
    	return destination;

    private void consumeMessagesAndClose(Connection connection,
MessageConsumer consumer, long timeout) throws JMSException {
        System.out.println("Consumer (" + myConsumerName + ") will consume
messages for queue '" + getSubject() + "' while they continue to be
delivered within: " + timeout + " ms");

        Message message;
        while (true) {
        	if ((message = consumer.receive(timeout)) != null) {

	public void onMessage(Message arg0) {
		if (arg0 instanceof ActiveMQObjectMessage) {
			long start = System.currentTimeMillis();
			ActiveMQObjectMessage message = (ActiveMQObjectMessage) arg0;
			try {
				if (message.getObject() instanceof JMSMessageInterface) {
					JMSMessageInterface myMessage = (JMSMessageInterface)
					boolean success = myMessage.processMessage(getEditingContext());
					if (success) {
						System.err.println("Success : " + ModelConstants.LINE_SEPARATOR);
					else {
						System.err.println("Failed : " + ModelConstants.LINE_SEPARATOR);
					long complete = System.currentTimeMillis();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
			} catch (UnknownHostException uhe) {
			} catch (Exception e) {

And for the producer:
    public void run() {
        try {
        	File baseDirectory = new File(Config.getEDIBaseDir(
getEditingContext() ));
        	if (!baseDirectory.exists()) {
        	File inDirectory = new File(baseDirectory, "In");
        	File pickedUpDirectory = new File(baseDirectory, "PickedUp");
        	if (!inDirectory.exists()) {
        	if (!pickedUpDirectory.exists()) {
            Connection connection = createConnection(getURL());
            Session session = createSession(connection);
            MessageProducer producer = createProducer(timeToLive, session,
getDestination(session, getSubject()));
            //sendLoop(session, producer);
            while (connection != null) {
                try {
                	File[] filesFound = inDirectory.listFiles();
                	Arrays.sort(filesFound, DATE_COMPARE);
                    for (File foundFile : filesFound) {
                    	File pickedUpFile = new File(pickedUpDirectory,
                    	EDIFile ediFile =
EDIManager.getEDIFileForImport(getEditingContext(), foundFile.getName());
                    	if (ediFile != null) {
                    		sendMessage(session, producer, ediFile, new
LineNumberReader(new FileReader(foundFile)), foundFile.getName());
                catch (Exception e) {
                    CoreLogger.println("Exception : "+e);

            close(connection, session);
        catch (Exception e) {
            System.out.println("Caught: " + e);
View this message in context:
Sent from the ActiveMQ - User forum at

View raw message