activemq-dev mailing list archives

From "Brad Willard (JIRA)" <j...@apache.org>
Subject [jira] Commented: (AMQ-2745) Deadlock or Performance Bottleneck when reading messages with Correlation
Date Wed, 14 Jul 2010 19:41:52 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-2745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=60662#action_60662 ]

Brad Willard commented on AMQ-2745:
-----------------------------------

I have created two classes, test.PutMessages and test.ReadMessages, that demonstrate this
problem.  Steps to reproduce:

1) Start a 5.3.0 broker

2) Start two message readers for two different correlation IDs on the same queue
java -cp <yourclasspath> test.ReadMessages tcp://localhost:61616 TestQueue ForReader1
java -cp <yourclasspath> test.ReadMessages tcp://localhost:61616 TestQueue ForReader2

3) Start two message producers for the two different correlation IDs
java -cp <yourclasspath> test.PutMessages tcp://localhost:61616 TestQueue ForReader1
java -cp <yourclasspath> test.PutMessages tcp://localhost:61616 TestQueue ForReader2

4) Looking at the output of the readers you started in step 2, you will see both of them reading
the messages for their correlation ID, with a time on the broker of about 1ms.

5) Stop the reader ForReader1.  You will notice that the program ForReader2 is unaffected.
 Messages with correlation ID "ForReader1" back up on the queue, and the program ForReader2
continues reading normally.

6) Stop all classes, and stop the 5.3.0 broker.  Start a 5.3.2 broker.

7) Repeat steps 2-5 against the 5.3.2 broker.  This time you'll notice that once you stop
ForReader1, ForReader2 is affected, which it shouldn't be.  ForReader2 will basically stop
being able to read messages until you start ForReader1 again.  ForReader2 will occasionally
get messages, but incredibly slowly, and performance is ruined.
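
To compare the two broker versions with numbers rather than by eye, the reader output can be
saved and summarized.  The following is only a sketch and is not one of the two attached
classes: WaitTimeSummary is a hypothetical helper name, and it assumes the line format that
test.ReadMessages prints ("Message waited <n>ms : ...").  Lines that do not match are ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the original test classes.
final class WaitTimeSummary {

    // Matches the prefix printed by ReadMessages.onMessage.
    private static final Pattern WAITED = Pattern.compile("^Message waited (-?\\d+)ms : ");

    final long count;
    final long maxMs;
    final double averageMs;

    private WaitTimeSummary(long count, long maxMs, double averageMs) {
        this.count = count;
        this.maxMs = maxMs;
        this.averageMs = averageMs;
    }

    // Collects count, maximum and average broker wait time from reader output lines.
    static WaitTimeSummary of(List<String> lines) {
        long count = 0;
        long max = 0;
        long sum = 0;
        for (String line : lines) {
            Matcher m = WAITED.matcher(line);
            if (m.find()) {
                long waited = Long.parseLong(m.group(1));
                max = (count == 0) ? waited : Math.max(max, waited);
                sum += waited;
                count++;
            }
        }
        double average = (count == 0) ? 0.0 : sum / (double) count;
        return new WaitTimeSummary(count, max, average);
    }

    public static void main(String[] args) {
        // Sample lines in the format ReadMessages prints; the values are made up.
        WaitTimeSummary s = WaitTimeSummary.of(Arrays.asList(
                "Message waited 1ms : Message 1 for consumer ForReader2",
                "Message waited 3ms : Message 2 for consumer ForReader2"));
        System.out.println(s.count + " messages, max " + s.maxMs + "ms, average " + s.averageMs + "ms");
    }
}
```

Running it over the ForReader2 output captured on each broker version gives a maximum and an
average wait that can be compared directly.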

package test;

import java.net.*;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

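/**
 * Sends a persistent ObjectMessage to the given queue every 5 ms, tagging
 * each one with the supplied JMSCorrelationID.
 *
 * @author bwillard
 */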
public class PutMessages extends Thread {

    final private MessageProducer producer;
    final private String correlationID;
    final private Session session;

    public PutMessages(URI uri, String queueName, String correlationID) throws Exception {

        this.correlationID = correlationID;

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = factory.createConnection();
        connection.start();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);

        producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    }

    public void run() {

        ObjectMessage message;
        String text;
        long counter = 0;

        while (true) {

            try {

                // Thread.sleep is static, so call it on the class; pause 5 ms between sends
                Thread.sleep(5);

                counter++;

                message = session.createObjectMessage();
                message.setJMSCorrelationID(correlationID);
                text = "Message " + counter + " for consumer " + correlationID;
                message.setObject(text);

                producer.send(message);

            } catch (Exception exc) {
                System.err.println("Error sending message");
                exc.printStackTrace(System.err);
            }

        }
    }

    public static void main(String[] args) {

        try {

            URI uri = URI.create(args[0]);
            String queueName = args[1];
            String correlationID = args[2];

            new PutMessages(uri, queueName, correlationID).start();

        } catch (Exception exc) {
            exc.printStackTrace();
        }

    }
}







package test;

import java.net.*;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

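/**
 * Consumes messages from the given queue through a message selector on
 * JMSCorrelationID and prints how long each message waited on the broker.
 *
 * @author bwillard
 */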
public class ReadMessages implements MessageListener {

    final private MessageConsumer consumer;
    final private String correlationID;
    final private Session session;

    public ReadMessages(URI uri, String queueName, String correlationID) throws Exception {

        this.correlationID = correlationID;

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = factory.createConnection();
        connection.start();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);

        consumer = session.createConsumer(queue, "JMSCorrelationID='" + correlationID + "'");
        consumer.setMessageListener(this);
    }

    public void onMessage(Message msg) {
        long inTime, outTime, brokerTime;

        try {

            if (msg instanceof ObjectMessage) {
                ObjectMessage txt = (ObjectMessage) msg;
                inTime = txt.getLongProperty("JMSActiveMQBrokerInTime");
                outTime = txt.getLongProperty("JMSActiveMQBrokerOutTime");

                brokerTime = outTime - inTime;

                System.out.println("Message waited " + brokerTime + "ms : " + txt.getObject().toString());
            }

        } catch (Exception exc) {
            System.err.println("Error reading message");
            exc.printStackTrace();
        }
    }

    public static void main(String[] args) {

        try {

            URI uri = URI.create(args[0]);
            String queueName = args[1];
            String correlationID = args[2];

            new ReadMessages(uri, queueName, correlationID);

        } catch (Exception exc) {
            exc.printStackTrace();
        }

    }
}
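
One side note on the selector that ReadMessages builds by string concatenation: it works for
IDs such as ForReader1, but a correlation ID containing a single quote would produce an invalid
selector.  JMS selector string literals escape a single quote by doubling it.  A minimal sketch
of that escaping follows; CorrelationSelectors is a hypothetical helper name and is not part of
the attached classes.

```java
// Hypothetical helper, not part of the original test classes.
final class CorrelationSelectors {

    private CorrelationSelectors() {
    }

    // Builds the same selector string as ReadMessages, doubling any single
    // quote so the value stays a valid JMS selector string literal.
    static String forCorrelationId(String correlationID) {
        String escaped = correlationID.replace("'", "''");
        return "JMSCorrelationID='" + escaped + "'";
    }

    public static void main(String[] args) {
        System.out.println(forCorrelationId("ForReader1")); // JMSCorrelationID='ForReader1'
        System.out.println(forCorrelationId("Bob's"));      // JMSCorrelationID='Bob''s'
    }
}
```

The result could be passed as the second argument to session.createConsumer(queue, ...) in
place of the inline concatenation.  This does not change the behavior being reported here.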






> Deadlock or Performance Bottleneck when reading messages with Correlation
> -------------------------------------------------------------------------
>
>                 Key: AMQ-2745
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2745
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.1
>         Environment: Java 64-bit, Windows 2008 Server
>            Reporter: Brad Willard
>            Priority: Minor
>             Fix For: 5.4.1
>
>
> We have a situation where we are posting messages to a queue with two different correlation
> ids specifically intended to reach two different clients who subscribe with message selectors
> for the appropriate correlation.  The clients are reading with message listeners.  When one
> client stops reading, the expected behavior, and the behavior we saw on 5.3.0, is that the
> messages with the correlation for the stopped client will back up on the queue and will not
> affect the performance of the second client, which is still reading the messages with the other
> correlation.  With our memory config, messages can back up into the hundreds of thousands before
> we notice any performance impact on the active client.
> However, this is not the case in 5.3.1.  With 5.3.1, once enough messages back up for the
> stopped client, the active client's performance suddenly drops drastically, from 20 ms reads to
> 30,000 ms reads.  We see this within a few hundred messages.  I believe there is some
> kind of deadlock or buffering bottleneck that was introduced on the client side.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

