activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From <Juergen.Schumac...@empolis.com>
Subject RE: High message frequency causes ActiveMQ to freeze
Date Thu, 17 Jan 2008 14:06:03 GMT
Hello,

ok, attaching the source code file does apparently not work here.
So here are the relevant code parts from the producer and processor classes,
which create the connections to ActiveMQ and the messages.

Sorry for the trouble...

Regards,
J├╝rgen Schumacher

----------------
Procucer.java

  private void initializeBrokerConnection() throws JMSException {
    LOG.info("Connecting to " + brokerUrl);
    ActiveMQConnectionFactory connectionFactory =
      new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
brokerUrl);
    connection = connectionFactory.createQueueConnection();
    connection.start();
    session = connection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
    destination = session.createQueue("queue1");
    producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  }

  private void startIteration() throws Exception {
    for (int i = 0; i < numberOfMessages; i++) {
      LOG.info("Creating message " + i);
      TextMessage message = session.createTextMessage("Message #" + i);
      LOG.info("Sending message " + i);
      producer.send(message);
      session.commit();
      producedCount++;
      LOG.info("Produced " + producedCount + " messages.");
    }
  }

---------------
Processor.java

  private void initializeBrokerConnection() throws JMSException {
    LOG.info("Connecting to " + brokerUrl);
    ActiveMQConnectionFactory connectionFactory =
      new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD,
brokerUrl);
    sourceConnection = connectionFactory.createQueueConnection();
    destinationConnection = connectionFactory.createQueueConnection();
    sourceConnection.start();
    destinationConnection.start();
    sourceSession = sourceConnection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
    destinationSession = destinationConnection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
    source = sourceSession.createQueue("queue1");
    consumer = sourceSession.createConsumer(source);
    destination = destinationSession.createQueue("queue2");
    producer = destinationSession.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  }

  public void onMessage(Message message) {
    if (message instanceof TextMessage) {
      try {
        String id = ((TextMessage) message).getText();
        LOG.info("Processing id = " + id);
        MapMessage map = destinationSession.createMapMessage();
        map.setString("id", id);
        map.setString("text", messageText);
        LOG.info("Send message");
        producer.send(map);
        LOG.info("Acknowledge message");
        message.acknowledge();
        LOG.info("Commit source");
        sourceSession.commit();
        LOG.info("Commit destination");
        destinationSession.commit();
        producedCount++;
        LOG.info("Produced " + producedCount + " messages.");
      } catch (Exception ex) {
        LOG.error("Error during processing", ex);
      }

    }
  }




Mime
View raw message