activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sonicBasher <sonicBas...@gmail.com>
Subject Method setMessagePrioritySupported(true) called on ActiveMQConnectionFactory doesn't work
Date Fri, 20 Aug 2010 09:16:15 GMT

Hi,

According to the ActiveMQ website, the latest version (5.4) supports message
priority handling:
http://activemq.apache.org/new-features-in-54.html

What's more, according to this issue
https://issues.apache.org/activemq/browse/AMQ-2790
priority handling is enabled by calling
setMessagePrioritySupported(true) on an ActiveMQConnectionFactory object.
Besides, it's the default setting anyway.

Now, I tested this using the latest 5.4 version, but the messages don't seem
to get ordered by the priority.
Here's how I do it (it's really straightforward):


--------------------------------------------------------
PRODUCER CLASS:

import java.util.LinkedList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.DeliveryMode;

/**
 * Producer side of the priority test.
 *
 * <p>Sends a fixed number of persistent {@code MapMessage}s to the queue {@code pendingQ} inside
 * one transacted session. Every message carries the same {@code configID} / {@code transactionID}
 * properties (used by the consumer's selector) and uses the loop index as its JMS priority.
 */
public final class Main {

    /**
     * Connects to the local broker, sends the test messages and commits them as one transaction.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or committing fails; the connection is still
     *     closed in that case
     */
    public static void main(String[] args) throws JMSException {
        String brokerURL = "tcp://localhost:61616";
        String pendingQueueName = "pendingQ";
        String configID = "12350";
        int transactionID = 2;
        final int repetitions = 10;

        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = cf.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(pendingQueueName);
            javax.jms.MessageProducer producer = session.createProducer(queue);

            for (int i = 0; i < repetitions; i++) {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setStringProperty("configID", configID);
                mapMessage.setIntProperty("transactionID", transactionID);
                List<String> ls2 = new LinkedList<String>();
                ls2.add("abc" + i);
                mapMessage.setObject("customerBeanList", ls2);
                // The loop index doubles as the message priority, so "abc9" is the most urgent.
                producer.send(mapMessage, DeliveryMode.PERSISTENT, i, 0);
            }
            session.commit();
        } finally {
            // Always release the connection, including when send/commit throws.
            // NOTE(review): per the JMS Connection.close() contract this also rolls back an
            // uncommitted transaction and closes the session/producer — confirm against the
            // JMS version in use.
            connection.close();
        }
    }
}


--------------------------------------------------------
CONSUMER CLASS:

import java.util.LinkedList;
import java.util.List;


/**
 * Consumer side of the priority test.
 *
 * <p>Drains the queue {@code pendingQ} through an {@code AccountMessageListener} and prints the
 * received strings in arrival order, which makes the delivery order visible.
 */
public class Main {

    /**
     * Receives messages until the listener reports that nothing more is available, then prints
     * every received string on its own line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AccountMessageListener aml = null;
        String queueName = "pendingQ";
        String brokerURL = "tcp://localhost:61616";
        String configID = "12350";
        int transactionID = 2;
        try {
            List<String> completeListOfReceivedStrings = new LinkedList<String>();
            List<String> tempReceivedStrings = null;
            aml = new AccountMessageListener(brokerURL, queueName);
            // receiveMessage takes exactly (configID, transactionID); it returns null once no
            // usable message is delivered, which ends the loop.
            while ((tempReceivedStrings = aml.receiveMessage(configID, transactionID)) != null) {
                completeListOfReceivedStrings.addAll(tempReceivedStrings);
            }
            for (String received : completeListOfReceivedStrings) {
                System.out.println(received);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // aml stays null if the constructor itself threw, so guard before closing.
            if (aml != null) {
                aml.closeConnection();
            }
        }
    }
}


--------------------------------------------------------
CONSUMER HELPER CLASS:

import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class AccountMessageListener {

    private ActiveMQConnectionFactory connectionFactory = null;
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;
    private String brokerURL = null;
    private String queueName = null;
    private MessageConsumer confirmConsumer = null;

    /**
     * Constructor.
     * @param brokerURL broker url
     * @param queueName queue name
     */
    public AccountMessageListener(String brokerURL, String queueName) {
        try {
            this.brokerURL = brokerURL;
            this.queueName = queueName;
            createConnection();
            session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects to message queue.
     */
    private void createConnection() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            connectionFactory.setMessagePrioritySupported(true);
            connection = connectionFactory.createConnection();
            connection.start();
        } catch (JMSException e) {
            closeConnection();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Receives message from queue.
     * @param configID config id
     * @param transactionID transaction id
     * @return message
     */
    public List<String> receiveMessage(String configID, int transactionID) {
        try {
            String messageSelector = "configID = '" + configID + "' AND
transactionID = " + transactionID;
            confirmConsumer = session.createConsumer(destination,
messageSelector);
            MapMessage mapMessage = null;
            List<String> ls = null;
            mapMessage = (MapMessage) confirmConsumer.receive(250);
            if (mapMessage != null) {
                if (mapMessage.getObject("customerBeanList") != null &&   
mapMessage.getObject("customerBeanList") instanceof List) {
                    ls = (List<String>)
mapMessage.getObject("customerBeanList");
                }
            }
            confirmConsumer.close();
            return ls;
        } catch (JMSException ex) {
            ex.printStackTrace();
            closeConnection();
            return null;
        } catch (Exception e) {
            e.printStackTrace();
            closeConnection();
            return null;
        }
    }

    /**
     * Closes connection.
     */
    public void closeConnection() {
        try {
            session.close();
            connection.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Queue name getter.
     * @return queue name
     */
    public String getQueueName() {
        return queueName;
    }
}

----------------------------------------------------------------

I am running ActiveMQ 5.4 on localhost as the message broker. I'd be
grateful for any hints.
Regards



-- 
View this message in context: http://old.nabble.com/Method-setMessagePrioritySupported%28true%29-called-on-ActiveMQConnectionFactory-doesn%27t-work-tp29490144p29490144.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message