activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "James Strachan" <james.strac...@gmail.com>
Subject Re: Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2
Date Fri, 05 May 2006 06:57:10 GMT
Does your code work OK if you just use one broker? When things appear
to 'hang' how do things look in JMX? Is there any warnings/errors in
the log of the clients or brokers?

On 5/5/06, osian <osian@osian.me.uk> wrote:
>
> Hi all,
>
> I am currently looking at using ActiveMQ as our message broker but It seems
> to hang on a regular basis.  In my test environment I have 2 brokers
> clustered together, an oracle DB behind them for the journaling, and then 3
> consumers 2 doing specific queues, and another being able to process any
> queue. Also, on one of the machines, it scans a directory for files and then
> converts the found files into a JMS message to be processed.
> On the first run through, it processed a 1000 files and it seemed ok, I then
> ran multiple threads to process multiple queues on each consumer machine,
> and it seemed to hang intermittently, due to this I abandoned this idea and
> went back to the first scenario, so to test it fully, I put 10,000 files in
> the directory and left it running overnight, I came in to find that it had
> only picked up 3,000 files, processed 177 messages, and there are 2,958
> messages sitting in ACTIVEMQ_MSGS table, and the consumers are sitting there
> doing nothing.  If I stop and start the consumers, they process one message,
> and then hang again, but if I only run one consumer, it starts processing
> messages for a while, and then hangs again.
> I believe that this must be a setup problem and ActiveMQ has everything that
> I need so I would love to use it. If anyone has any ActiveMQ configuration
> suggestions or code samples for the consumers, producers, etc. I would be
> very greatful,
>
> Kind regards,
> Osian
>
> Here is my activemq.xml file:
> <?xml version="1.0" encoding="UTF-8"?>
>
> <beans xmlns:amq="http://activemq.org/config/1.0">
>
>     <amq:broker brokerName="ProactJMSBroker" useJmx="true"
> useShutdownHook="true" persistent="true" deleteAllMessagesOnStartup="false">
>
>                 <amq:transportConnectors>
>                         <amq:transportConnector uri="tcp://localhost:61616"
> discoveryUri="multicast://ProactJMSService"/>
>                 </amq:transportConnectors>
>
>                 <amq:networkConnectors>
>                         <amq:networkConnector uri="multicast://ProactJMSService"/>
>                 </amq:networkConnectors>
>
>                 <amq:persistenceAdapter>
>                         <amq:jdbcPersistenceAdapter>
>                                 <property name="cleanupPeriod" value="600000"/>
>                                 <property name="dataSource" ref="oracle-ds"/>
>                         </amq:jdbcPersistenceAdapter>
>                 </amq:persistenceAdapter>
>     </amq:broker>
>
>     <!--
> ==================================================================== -->
>     <!-- JDBC DataSource Configurations -->
>     <!--
> ==================================================================== -->
>
>     <!-- The Datasource that will be used by the Broker -->
>         <bean id="oracle-ds" class="net.proact.scm.sql.ProactPoolingDataSource">
>                 <property name="url" value="jdbc:oracle:oci:@CNHDEV"/>
>                 <property name="userName" value="CNHDEV"/>
>                 <property name="password" value="CNHDEV"/>
>         </bean>
>
> </beans>
>
> Here is some sample code for the consumer:
>         public void runConsumer() {
>                 try {
>                         Connection connection = createConnection(getURL());
>                         connection.setExceptionListener(this);
>                         session = createSession(connection);
>                         MessageConsumer consumer = session.createConsumer(getDestination(session,
> getSubject()));
>
>                         consumeMessagesAndClose(connection, consumer, timeOut);
>                 }
>                 catch (Exception e) {
>                         System.out.println("Caught: " + e);
>                         e.printStackTrace();
>                         System.exit(-1);
>                 }
>         }
>
>     public static Connection createConnection(String url) throws
> JMSException, Exception {
>         ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(getUser(), getPassword(), url);
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         return connection;
>     }
>
>     public static Session createSession(Connection connection) throws
> Exception {
>         Session session = connection.createSession(true,
> Session.CLIENT_ACKNOWLEDGE);
>         return session;
>     }
>
>     public Destination getDestination(Session session, String queueName)
> throws Exception {
>         if (destination == null) {
>                 destination = createQueue(session, queueName);
>         }
>         return destination;
>     }
>
>     private void consumeMessagesAndClose(Connection connection,
> MessageConsumer consumer, long timeout) throws JMSException {
>         System.out.println("Consumer (" + myConsumerName + ") will consume
> messages for queue '" + getSubject() + "' while they continue to be
> delivered within: " + timeout + " ms");
>
>         Message message;
>         while (true) {
>                 if ((message = consumer.receive(timeout)) != null) {
>                         onMessage(message);
>                         message.acknowledge();
>                         session.commit();
>                 }
>                 System.gc();
>         }
>     }
>
>         public void onMessage(Message arg0) {
>                 if (arg0 instanceof ActiveMQObjectMessage) {
>                         long start = System.currentTimeMillis();
>                         ActiveMQObjectMessage message = (ActiveMQObjectMessage) arg0;
>
>                         try {
>                                 if (message.getObject() instanceof JMSMessageInterface)
{
>                                         JMSMessageInterface myMessage = (JMSMessageInterface)
> message.getObject();
>                                         boolean success = myMessage.processMessage(getEditingContext());
>                                         if (success) {
>                                                 System.err.println("Success : " + ModelConstants.LINE_SEPARATOR);
>                                         }
>                                         else {
>                                                 System.err.println("Failed : " + ModelConstants.LINE_SEPARATOR);
>                                         }
>                                         System.err.println(myMessage.toStringDescription());
>                                         long complete = System.currentTimeMillis();
>                                 }
>                         } catch (JMSException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         } catch (UnknownHostException uhe) {
>                                 uhe.printStackTrace();
>                         } catch (Exception e) {
>                                 e.printStackTrace();
>                         }
>                 }
>         }
>
>
> And for the producer:
>     public void run() {
>         try {
>
>                 File baseDirectory = new File(Config.getEDIBaseDir(
> getEditingContext() ));
>
>                 if (!baseDirectory.exists()) {
>                         baseDirectory.mkdirs();
>                 }
>                 File inDirectory = new File(baseDirectory, "In");
>                 File pickedUpDirectory = new File(baseDirectory, "PickedUp");
>                 if (!inDirectory.exists()) {
>                         inDirectory.mkdirs();
>                 }
>                 if (!pickedUpDirectory.exists()) {
>                         pickedUpDirectory.mkdirs();
>                 }
>
>             Connection connection = createConnection(getURL());
>             Session session = createSession(connection);
>             MessageProducer producer = createProducer(timeToLive, session,
> getDestination(session, getSubject()));
>             //sendLoop(session, producer);
>
>             while (connection != null) {
>                 try {
>
>                         File[] filesFound = inDirectory.listFiles();
>                         Arrays.sort(filesFound, DATE_COMPARE);
>
>                     for (File foundFile : filesFound) {
>                         File pickedUpFile = new File(pickedUpDirectory,
> foundFile.getName());
>                         EDIFile ediFile =
> EDIManager.getEDIFileForImport(getEditingContext(), foundFile.getName());
>                         if (ediFile != null) {
>                                 sendMessage(session, producer, ediFile, new
> LineNumberReader(new FileReader(foundFile)), foundFile.getName());
>                         }
>                         foundFile.renameTo(pickedUpFile);
>                     }
>                 }
>                 catch (Exception e) {
>                     CoreLogger.println("Exception : "+e);
>                     e.printStackTrace();
>                 }
>                 Thread.sleep(500);
>             }
>
>             System.out.println("Done.");
>             close(connection, session);
>         }
>         catch (Exception e) {
>             System.out.println("Caught: " + e);
>             e.printStackTrace();
>         }
>     }x
> --
> View this message in context: http://www.nabble.com/Setting-up-ActiveMQ-using-java1.5-and-ActiveMQ4.0RC2-t1562133.html#a4242459
> Sent from the ActiveMQ - User forum at Nabble.com.
>
>


--

James
-------
http://radio.weblogs.com/0112098/

Mime
View raw message