activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nielsbaloe <ni...@geoxplore.nl>
Subject 100% CPU
Date Wed, 03 Apr 2013 12:51:04 GMT
Hi all,

We are using ActiveMQ successfully for two projects now, but we accidentally
discovered that both the broker and the worker/consumer machines are hitting
100% CPU continuously. We do not have this issue on our developers' machines
(all Windows 7 laptops). This occurs even when no events are being
processed.

I couldn't find any clues for this issue, except setting the prefetch size.
I've set the prefetch size to 10, as we have 10 consumers at the
worker/consumer machine. We have a broker machine and a worker/consumer
machine, which are both configured like below. In the near future we will
add more worker/consumer machines.

OS: Ubuntu Linux 12.04 (headless)
Memory: 12GB and 2GB
CPU: Intel Xeon 3.06GHz 4core
Java: "1.7.0_03", OpenJDK Runtime Environment (IcedTea7 2.1.1pre)
(7~u3-2.1.1~pre1-1ubuntu3)
Webcontainer: none, java-standalone
ActiveMQ: 5.6.0

The broker uses the internal KahaDB database.

We are using one queue, which the worker/consumer machine both listens to
and posts to, at about 100 messages a day. We also use about 4 scheduled
messages for every 'modem' (our internal subject), which results in about 40
scheduled messages or so, which generate an event once every 30 minutes.
Nothing spectacular, so to speak.

Thanks for any clues in advance. For completeness, I will post our Broker,
Consumer and Producer code (without comments), this might show any wrong
assumptions on our side.

Best,
Niels Baloe


>-----------------------

/**
 * Attaches a {@link MessageListener} to a single queue on a JMS
 * {@link Session} and releases the consumer and the session on
 * {@link #close()}.
 *
 * <p>
 * NOTE(review): this class implements {@link ExceptionListener}, but nothing
 * in this class registers the instance with a connection. Confirm that a
 * caller does so; otherwise {@link #onException(JMSException)} is never
 * invoked.
 */
public class Consumer implements ExceptionListener {

	private static final Logger LOG = Logger.getLogger(Consumer.class.getName());

	private final Session session;
	private final MessageConsumer messageConsumer;

	/**
	 * Creates a consumer on a session obtained from
	 * {@code Broker.getSession(brokerServer)}.
	 *
	 * @param brokerServer    broker address passed to {@code Broker.getSession}
	 * @param queueName       name of the queue to listen on
	 * @param messageListener listener that receives the messages
	 * @throws JMSException          if the session or the consumer cannot be
	 *                               created
	 * @throws FileNotFoundException declared by {@code Broker.getSession}
	 * @throws IOException           declared by {@code Broker.getSession}
	 */
	public Consumer(String brokerServer, String queueName,
			MessageListener messageListener) throws JMSException,
			FileNotFoundException, IOException {
		this(Broker.getSession(brokerServer), queueName, messageListener);
	}

	/**
	 * Creates a consumer on the given session. The session is closed by
	 * {@link #close()}.
	 *
	 * @param session         session used to create the queue and the consumer
	 * @param queueName       name of the queue to listen on
	 * @param messageListener listener that receives the messages
	 * @throws JMSException if the queue or the consumer cannot be created, or
	 *                      the listener cannot be set
	 */
	public Consumer(Session session, String queueName,
			MessageListener messageListener) throws JMSException {
		this.session = session;

		Queue queue = session.createQueue(queueName);
		messageConsumer = session.createConsumer(queue);
		messageConsumer.setMessageListener(messageListener);
	}

	/**
	 * Closes the message consumer and then the session. This is best-effort:
	 * a failure to close one resource is logged and does not prevent the
	 * attempt to close the other, and no exception is propagated.
	 */
	public void close() {
		try {
			messageConsumer.close();
		} catch (JMSException e) {
			// Keep going so the session is still closed, but leave a trace.
			LOG.log(Level.WARNING, "Failed to close message consumer", e);
		}
		try {
			session.close();
		} catch (JMSException e) {
			LOG.log(Level.WARNING, "Failed to close session", e);
		}
	}

	/**
	 * Logs the reported JMS failure at {@code SEVERE} level, including the
	 * stack trace.
	 *
	 * @param je the exception being reported
	 */
	@Override
	public void onException(JMSException je) {
		LOG.log(Level.SEVERE, je.getMessage(), je);
	}

}

/**
 * Sends messages to a single queue through a JMS {@link Session}, using
 * persistent delivery, and releases the producer and the session on
 * {@link #close()}.
 */
public class Producer {

	// Fully qualified so this class needs no additional imports.
	private static final java.util.logging.Logger LOG = java.util.logging.Logger
			.getLogger(Producer.class.getName());

	private final Session session;
	private final MessageProducer producer;

	/**
	 * Creates a producer on a session obtained from
	 * {@code Broker.getSession(brokerUrl)}.
	 *
	 * @param brokerUrl broker address passed to {@code Broker.getSession}
	 * @param queue     name of the destination queue
	 * @throws JMSException          if the session or the producer cannot be
	 *                               created
	 * @throws FileNotFoundException declared by {@code Broker.getSession}
	 * @throws IOException           declared by {@code Broker.getSession}
	 */
	public Producer(String brokerUrl, String queue) throws JMSException,
			FileNotFoundException, IOException {
		this(Broker.getSession(brokerUrl), queue);
	}

	/**
	 * Creates a producer on the given session. The session is closed by
	 * {@link #close()}.
	 *
	 * @param session session used to create the queue, the producer and the
	 *                messages
	 * @param queue   name of the destination queue
	 * @throws JMSException if the queue or the producer cannot be created
	 */
	public Producer(Session session, String queue) throws JMSException {
		this.session = session;
		Destination destination = session.createQueue(queue);
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
	}

	/**
	 * Closes the message producer and then the session. This is best-effort:
	 * a failure to close one resource is logged and does not prevent the
	 * attempt to close the other, and no exception is propagated.
	 */
	public void close() {
		try {
			producer.close();
		} catch (JMSException e) {
			// Keep going so the session is still closed, but leave a trace.
			LOG.log(java.util.logging.Level.WARNING,
					"Failed to close message producer", e);
		}
		try {
			session.close();
		} catch (JMSException je) {
			LOG.log(java.util.logging.Level.WARNING,
					"Failed to close session", je);
		}
	}

	/**
	 * Creates a text message on this producer's session.
	 *
	 * @param text body of the message
	 * @return the new message; it is not sent
	 * @throws JMSException if the session cannot create the message
	 */
	public Message getMessageText(String text) throws JMSException {
		return session.createTextMessage(text);
	}

	/**
	 * Creates an empty object message on this producer's session.
	 *
	 * @return the new message; it is not sent
	 * @throws JMSException if the session cannot create the message
	 */
	public Message getMessageObject() throws JMSException {
		return session.createObjectMessage();
	}

	/**
	 * Sends the message to this producer's queue.
	 *
	 * @param message message to send
	 * @throws JMSException if the send fails
	 */
	public void send(Message message) throws JMSException {
		producer.send(message);
	}

	/**
	 * Sets the {@code ScheduledMessage.AMQ_SCHEDULED_CRON} property on the
	 * given message and sends it. The passed-in message object is modified.
	 *
	 * @param message message to send
	 * @param cron    value stored in the cron scheduling property
	 * @throws JMSException if the property cannot be set or the send fails
	 */
	public void sendScheduled(Message message, String cron) throws JMSException {
		message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, cron);
		producer.send(message);
	}

}

/**
 * Combines two things: an embedded ActiveMQ {@link BrokerService} (instance
 * side) and a process-wide cached client {@link Connection} from which
 * sessions are handed out (static side).
 *
 * <p>
 * The static connection handling is not synchronized.
 */
public class Broker {

	// Fully qualified so this class needs no additional imports.
	private static final java.util.logging.Logger LOG = java.util.logging.Logger
			.getLogger(Broker.class.getName());

	/** Cached client connection; {@code null} until first use or after close. */
	private static Connection connection;

	private final BrokerService broker;

	/**
	 * Configures and starts an embedded broker with JMX and scheduler support
	 * enabled and a TCP connector on {@code host:port}.
	 *
	 * @param host       host part of the TCP connector address
	 * @param port       port part of the TCP connector address
	 * @param brokerName name given to the broker
	 * @throws Exception if the connector cannot be added or the broker cannot
	 *                   be started
	 */
	public Broker(String host, int port, String brokerName) throws Exception {
		broker = new BrokerService();
		broker.setUseJmx(true);
		broker.setBrokerName(brokerName);
		broker.addConnector("tcp://" + host + ":" + port);
		broker.setSchedulerSupport(true);
		broker.start();
	}

	/**
	 * Returns the broker's VM connector URI.
	 *
	 * <p>
	 * NOTE(review): despite the method name, this returns
	 * {@code getVmConnectorURI()}, not the URI of the TCP connector added in
	 * the constructor. Confirm which one callers expect.
	 *
	 * @return the value of {@code broker.getVmConnectorURI()}
	 */
	public URI getNameTCP() {
		return broker.getVmConnectorURI();
	}

	/**
	 * Stops the embedded broker and waits until it has stopped. A failure is
	 * logged and not propagated.
	 */
	public void close() {
		try {
			broker.stop();
			broker.waitUntilStopped();
		} catch (Exception e) {
			LOG.log(java.util.logging.Level.WARNING,
					"Failed to stop broker cleanly", e);
		}
	}

	/**
	 * Closes the cached client connection, if any, and forgets it so that the
	 * next {@link #getSession(String)} call creates a fresh one. A failure to
	 * close is logged and not propagated.
	 */
	public static void closeConnection() {
		if (connection == null) {
			return;
		}
		// Clear the cache before closing: a closed connection must never be
		// reused, even when close() itself fails.
		Connection stale = connection;
		connection = null;
		try {
			stale.close();
		} catch (JMSException e) {
			LOG.log(java.util.logging.Level.WARNING,
					"Failed to close JMS connection", e);
		}
	}

	/**
	 * Creates a non-transacted, auto-acknowledge session on the cached
	 * connection, creating and starting the connection first if none is
	 * cached. The prefetch size for the new connection is read from the
	 * {@code broker.prefetchSize} setting.
	 */
	private static Session getSessionWithoutRetry(String brokerServer)
			throws JMSException, FileNotFoundException, IOException {
		if (connection == null) { // does not work when broker is local
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					brokerServer);
			connectionFactory.setAlwaysSessionAsync(true);

			// Prefetch size
			String prefetch = NoImportUtils.getSettings().getProperty(
					"broker.prefetchSize");

			connectionFactory.getPrefetchPolicy().setAll(
					Integer.parseInt(prefetch));

			// Cache the connection only once it has started, so a failed
			// start never leaves an unusable connection behind.
			Connection created = connectionFactory.createConnection();
			try {
				created.start();
			} catch (JMSException e) {
				try {
					created.close();
				} catch (JMSException closeFailure) {
					LOG.log(java.util.logging.Level.WARNING,
							"Failed to close JMS connection after failed start",
							closeFailure);
				}
				throw e;
			}
			connection = created;
		}
		return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	}

	/**
	 * Returns a new session on the shared client connection. When the first
	 * attempt fails with a {@code ConnectionFailedException}, the cached
	 * connection is discarded and the attempt is repeated once on a new
	 * connection.
	 *
	 * @param brokerServer broker address used when a connection has to be
	 *                     created
	 * @return a new non-transacted, auto-acknowledge session
	 * @throws JMSException          if the session cannot be created, including
	 *                               when the single retry fails
	 * @throws FileNotFoundException declared for reading the settings
	 * @throws IOException           declared for reading the settings
	 */
	public static Session getSession(String brokerServer) throws JMSException,
			FileNotFoundException, IOException {
		try {
			return getSessionWithoutRetry(brokerServer);
		} catch (ConnectionFailedException e) {
			// Retry once when connection failed
			closeConnection();
			return getSessionWithoutRetry(brokerServer);
		}
	}

}











--
View this message in context: http://activemq.2283324.n4.nabble.com/100-CPU-tp4665414.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message