activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From osian <os...@osian.me.uk>
Subject Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2
Date Fri, 05 May 2006 06:51:31 GMT

Hi all,

I am currently looking at using ActiveMQ as our message broker but it seems
to hang on a regular basis.  In my test environment I have 2 brokers
clustered together, an oracle DB behind them for the journaling, and then 3
consumers: 2 handling specific queues, and another able to process any
queue. Also, on one of the machines, it scans a directory for files and then
converts the found files into a JMS message to be processed.
On the first run through, it processed 1,000 files and it seemed OK. I then
ran multiple threads to process multiple queues on each consumer machine,
and it seemed to hang intermittently, due to this I abandoned this idea and
went back to the first scenario, so to test it fully, I put 10,000 files in
the directory and left it running overnight, I came in to find that it had
only picked up 3,000 files, processed 177 messages, and there are 2,958
messages sitting in ACTIVEMQ_MSGS table, and the consumers are sitting there
doing nothing.  If I stop and start the consumers, they process one message,
and then hang again, but if I only run one consumer, it starts processing
messages for a while, and then hangs again.
I believe that this must be a setup problem and ActiveMQ has everything that
I need so I would love to use it. If anyone has any ActiveMQ configuration
suggestions or code samples for the consumers, producers, etc. I would be
very grateful.

Kind regards,
Osian

Here is my activemq.xml file:
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:amq="http://activemq.org/config/1.0">

    <!-- Broker definition: JMX and the shutdown hook are switched on, persistence
         is enabled, and stored messages are NOT deleted at startup. -->
    <amq:broker brokerName="ProactJMSBroker" useJmx="true"
useShutdownHook="true" persistent="true" deleteAllMessagesOnStartup="false">

		<!-- Client-facing connector: TCP on localhost port 61616. The discoveryUri
		     names the same multicast URI that the network connector below uses;
		     presumably this is how the clustered brokers find each other (confirm
		     against the ActiveMQ documentation). -->
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616"
discoveryUri="multicast://ProactJMSService"/>
		</amq:transportConnectors>
 
		<!-- Broker-to-broker connector, using the multicast URI ProactJMSService. -->
		<amq:networkConnectors>
			<amq:networkConnector uri="multicast://ProactJMSService"/>
		</amq:networkConnectors>

		<!-- JDBC persistence backed by the "oracle-ds" bean defined further down.
		     cleanupPeriod is 600000; presumably milliseconds, i.e. 10 minutes (TODO confirm). -->
		<amq:persistenceAdapter>
			<amq:jdbcPersistenceAdapter>
				<property name="cleanupPeriod" value="600000"/>
				<property name="dataSource" ref="oracle-ds"/>
			</amq:jdbcPersistenceAdapter>
		</amq:persistenceAdapter>
    </amq:broker>

    <!--
==================================================================== -->
    <!-- JDBC DataSource Configurations -->
    <!--
==================================================================== -->

    <!-- The Datasource that will be used by the Broker -->
    <!-- NOTE(review): the database user name and password are stored here in plain text. -->
	<bean id="oracle-ds" class="net.proact.scm.sql.ProactPoolingDataSource">
		<property name="url" value="jdbc:oracle:oci:@CNHDEV"/>
		<property name="userName" value="CNHDEV"/>
		<property name="password" value="CNHDEV"/>
	</bean>

</beans>

Here is some sample code for the consumer:
	/**
	 * Connects to the broker, registers this object as the connection's
	 * exception listener, creates the session and a consumer for the
	 * destination named by {@code getSubject()}, and then hands control to
	 * {@code consumeMessagesAndClose}.
	 *
	 * <p>Any exception is printed to standard output together with its stack
	 * trace, after which the JVM is terminated with exit status -1.
	 */
	public void runConsumer() {
		try {
			final Connection brokerConnection = createConnection(getURL());
			brokerConnection.setExceptionListener(this);
			session = createSession(brokerConnection);

			final Destination source = getDestination(session, getSubject());
			final MessageConsumer queueConsumer = session.createConsumer(source);

			consumeMessagesAndClose(brokerConnection, queueConsumer, timeOut);
		}
		catch (Exception failure) {
			System.out.println("Caught: " + failure);
			failure.printStackTrace();
			System.exit(-1);
		}
	}

    /**
     * Creates a JMS connection to the broker at {@code url}, using the
     * credentials returned by {@code getUser()} and {@code getPassword()},
     * and starts it before returning.
     *
     * <p>If starting the connection fails, the connection is closed again so
     * that it is not leaked, and the original exception is rethrown.
     *
     * @param url broker URL handed to {@code ActiveMQConnectionFactory}
     * @return a started connection; the caller is responsible for closing it
     * @throws JMSException if the connection cannot be created or started
     * @throws Exception declared for backward compatibility with existing callers
     */
    public static Connection createConnection(String url) throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(getUser(), getPassword(), url);
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
        }
        catch (JMSException startFailure) {
            try {
                connection.close();
            }
            catch (JMSException ignored) {
                // The start failure is the error worth reporting; a secondary
                // close failure must not replace it.
            }
            throw startFailure;
        }
        return connection;
    }

    /**
     * Creates a transacted JMS session on the given connection.
     *
     * <p>{@code Session.CLIENT_ACKNOWLEDGE} is passed as the acknowledge mode,
     * but per the JMS API that argument is ignored when the session is
     * transacted.
     *
     * @param connection connection on which the session is created
     * @return the new transacted session
     * @throws Exception if the session cannot be created
     */
    public static Session createSession(Connection connection) throws Exception {
        final boolean transacted = true;
        return connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE);
    }

    /**
     * Returns the destination held in the {@code destination} field, creating
     * it with {@code createQueue(session, queueName)} on the first call.
     *
     * <p>Once the field is set, it is returned as-is; the {@code session} and
     * {@code queueName} arguments of later calls are not consulted.
     *
     * @param session session used to create the queue when none is cached yet
     * @param queueName queue name used when none is cached yet
     * @return the cached or newly created destination
     * @throws Exception if the queue cannot be created
     */
    public Destination getDestination(Session session, String queueName) throws Exception {
        if (destination != null) {
            return destination;
        }
        destination = createQueue(session, queueName);
        return destination;
    }

    private void consumeMessagesAndClose(Connection connection,
MessageConsumer consumer, long timeout) throws JMSException {
        System.out.println("Consumer (" + myConsumerName + ") will consume
messages for queue '" + getSubject() + "' while they continue to be
delivered within: " + timeout + " ms");

        Message message;
        while (true) {
        	if ((message = consumer.receive(timeout)) != null) {
        		onMessage(message);
        		message.acknowledge();
        		session.commit();
        	}
        	System.gc();
        }
    }

	/**
	 * Processes one received message.
	 *
	 * <p>Only an {@code ActiveMQObjectMessage} whose payload implements
	 * {@code JMSMessageInterface} is handled; anything else is ignored without
	 * any output. The payload's {@code processMessage} result is reported on
	 * standard error as "Success" or "Failed", followed by the payload's
	 * description.
	 *
	 * <p>Exceptions are printed and swallowed, so the caller cannot tell from
	 * this method whether processing succeeded.
	 *
	 * @param arg0 the received message
	 */
	public void onMessage(Message arg0) {
		if (!(arg0 instanceof ActiveMQObjectMessage)) {
			return;
		}
		try {
			// Fetch the payload once and reuse it instead of asking the message twice.
			Object payload = ((ActiveMQObjectMessage) arg0).getObject();
			if (!(payload instanceof JMSMessageInterface)) {
				return;
			}
			JMSMessageInterface myMessage = (JMSMessageInterface) payload;
			boolean success = myMessage.processMessage(getEditingContext());
			if (success) {
				System.err.println("Success : " + ModelConstants.LINE_SEPARATOR);
			}
			else {
				System.err.println("Failed : " + ModelConstants.LINE_SEPARATOR);
			}
			System.err.println(myMessage.toStringDescription());
		} catch (Exception e) {
			// JMSException, UnknownHostException and every other exception were
			// handled identically, so a single handler covers them all.
			e.printStackTrace();
		}
	}


And for the producer:
    public void run() {
        try {
            
        	File baseDirectory = new File(Config.getEDIBaseDir(
getEditingContext() ));
        	
        	if (!baseDirectory.exists()) {
        		baseDirectory.mkdirs();
        	}
        	File inDirectory = new File(baseDirectory, "In");
        	File pickedUpDirectory = new File(baseDirectory, "PickedUp");
        	if (!inDirectory.exists()) {
        		inDirectory.mkdirs();
        	}
        	if (!pickedUpDirectory.exists()) {
        		pickedUpDirectory.mkdirs();
        	}
        	
            Connection connection = createConnection(getURL());
            Session session = createSession(connection);
            MessageProducer producer = createProducer(timeToLive, session,
getDestination(session, getSubject()));
            //sendLoop(session, producer);
            
            while (connection != null) {
                try {
                	
                	File[] filesFound = inDirectory.listFiles();
                	Arrays.sort(filesFound, DATE_COMPARE);
                	
                    for (File foundFile : filesFound) {
                    	File pickedUpFile = new File(pickedUpDirectory,
foundFile.getName());
                    	EDIFile ediFile =
EDIManager.getEDIFileForImport(getEditingContext(), foundFile.getName());
                    	if (ediFile != null) {
                    		sendMessage(session, producer, ediFile, new
LineNumberReader(new FileReader(foundFile)), foundFile.getName());
                    	}
                        foundFile.renameTo(pickedUpFile);
                    }        
                }
                catch (Exception e) {
                    CoreLogger.println("Exception : "+e);
                    e.printStackTrace();
                }
                Thread.sleep(500);
            }

            System.out.println("Done.");
            close(connection, session);
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }x
--
View this message in context: http://www.nabble.com/Setting-up-ActiveMQ-using-java1.5-and-ActiveMQ4.0RC2-t1562133.html#a4242459
Sent from the ActiveMQ - User forum at Nabble.com.


Mime
View raw message