activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: Method setMessagePrioritySupported(true) called on ActiveMQConnectionFactory doesn't work
Date Fri, 20 Aug 2010 10:24:34 GMT
priority support needs to be enabled for a destination with a policy
entry, prioritizedMessages
see: http://activemq.apache.org/per-destination-policies.html
The new (5.4) attributes are in red.

To see the programatic setup check out the source of the one of the
priority tests:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?view=markup

Note, with the kahaDB store, there are three levels of priority
supported (low, normal, high). With the JDBCStore, the full
complement, 0-9 are supported.

On 20 August 2010 10:16, sonicBasher <sonicBasher@gmail.com> wrote:
>
> Hi,
>
> According to activeMQ website, the latest, 5.4 version supports message
> priority handling:
> http://activemq.apache.org/new-features-in-54.html
>
> What's more, according to this website
> https://issues.apache.org/activemq/browse/AMQ-2790
> handling priorities takes place by calling the
> setMessagePrioritySupported(true) on an ActiveMQConnectionFactory object.
> Besides, it's the default setting anyways.
>
> Now, I tested this using the latest 5.4 version, but the messages don't seem
> to get ordered by the priority.
> Here's how I do it (it's really straightforward):
>
>
> --------------------------------------------------------
> PRODUCER CLASS:
>
> import java.util.LinkedList;
> import java.util.List;
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.MapMessage;
> import javax.jms.Queue;
> import javax.jms.Session;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import javax.jms.DeliveryMode;
>
> public final class Main {
>
>    public static void main(String[] args) throws JMSException {
>        String brokerURL = "tcp://localhost:61616";
>        String pendingQueueName = "pendingQ";
>        String configID = "12350";
>        int transactionID = 2;
>        final int repetitions = 10;
>
>        ActiveMQConnectionFactory cf = new
> ActiveMQConnectionFactory(brokerURL);
>        Connection connection = cf.createConnection();
>        connection.start();
>        Session session = connection.createSession(true,
> Session.SESSION_TRANSACTED);
>        Queue queue = session.createQueue(pendingQueueName);
>        javax.jms.MessageProducer producer = session.createProducer(queue);
>
>        for (int i = 0; i < repetitions; i++) {
>            MapMessage mapMessage = session.createMapMessage();
>            mapMessage.setStringProperty("configID", configID);
>            mapMessage.setIntProperty("transactionID", transactionID);
>            List<String> ls2 = new LinkedList<String>();
>            ls2.add("abc" + i);
>            mapMessage.setObject("customerBeanList", ls2);
>            producer.send(mapMessage, DeliveryMode.PERSISTENT, i, 0);
>        }
>        session.commit();
>        connection.stop();
>        connection.close();
>    }
> }
>
>
> --------------------------------------------------------
> CONSUMER CLASS:
>
> import java.util.LinkedList;
> import java.util.List;
>
>
> public class Main {
>
>    public static void main(String[] args) {
>        AccountMessageListener aml = null;
>        String queueName = "pendingQ";
>        String brokerURL = "tcp://localhost:61616";
>        String configID = "12350";
>        int transactionID = 2;
>        try {
>            List<String> completeListOfReceivedStrings = new
> LinkedList<String>();
>            List<String> tempReceivedStrings = null;
>            aml = new AccountMessageListener(brokerURL, queueName);
>            while ((tempReceivedStrings = aml.receiveMessage(configID,
> transactionID, 0)) != null) {
>                completeListOfReceivedStrings.addAll(tempReceivedStrings);
>            }
>            aml.closeConnection();
>            String[] arrayOfStrings = null;
>            if (completeListOfReceivedStrings != null) {
>                arrayOfStrings = new
> String[completeListOfReceivedStrings.size()];
>                for (int i = 0; i < completeListOfReceivedStrings.size();
> i++) {
>                    arrayOfStrings[i] =
> completeListOfReceivedStrings.get(i);
>                }
>            }
>            for (int i = 0; i < arrayOfStrings.length; i++) {
>                System.out.println(arrayOfStrings[i]);
>            }
>        } catch (Exception e) {
>            e.printStackTrace();
>            aml.closeConnection();
>        }
>    }
> }
>
>
> --------------------------------------------------------
> CONSUMER HELPER CLASS:
>
> import java.util.List;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.MapMessage;
> import javax.jms.MessageConsumer;
> import javax.jms.Session;
> import org.apache.activemq.ActiveMQConnectionFactory;
>
> public class AccountMessageListener {
>
>    private ActiveMQConnectionFactory connectionFactory = null;
>    private Connection connection = null;
>    private Session session = null;
>    private Destination destination = null;
>    private String brokerURL = null;
>    private String queueName = null;
>    private MessageConsumer confirmConsumer = null;
>
>    /**
>     * Constructor.
>     * @param brokerURL broker url
>     * @param queueName queue name
>     */
>    public AccountMessageListener(String brokerURL, String queueName) {
>        try {
>            this.brokerURL = brokerURL;
>            this.queueName = queueName;
>            createConnection();
>            session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>            destination = session.createQueue(queueName);
>        } catch (JMSException e) {
>            e.printStackTrace();
>        } catch (Exception e) {
>            e.printStackTrace();
>        }
>    }
>
>    /**
>     * Connects to message queue.
>     */
>    private void createConnection() {
>        try {
>            connectionFactory = new ActiveMQConnectionFactory(brokerURL);
>            connectionFactory.setMessagePrioritySupported(true);
>            connection = connectionFactory.createConnection();
>            connection.start();
>        } catch (JMSException e) {
>            closeConnection();
>            e.printStackTrace();
>        } catch (Exception e) {
>            e.printStackTrace();
>        }
>    }
>
>    /**
>     * Receives message from queue.
>     * @param configID config id
>     * @param transactionID transaction id
>     * @return message
>     */
>    public List<String> receiveMessage(String configID, int transactionID) {
>        try {
>            String messageSelector = "configID = '" + configID + "' AND
> transactionID = " + transactionID;
>            confirmConsumer = session.createConsumer(destination,
> messageSelector);
>            MapMessage mapMessage = null;
>            List<String> ls = null;
>            mapMessage = (MapMessage) confirmConsumer.receive(250);
>            if (mapMessage != null) {
>                if (mapMessage.getObject("customerBeanList") != null &&
> mapMessage.getObject("customerBeanList") instanceof List) {
>                    ls = (List<String>)
> mapMessage.getObject("customerBeanList");
>                }
>            }
>            confirmConsumer.close();
>            return ls;
>        } catch (JMSException ex) {
>            ex.printStackTrace();
>            closeConnection();
>            return null;
>        } catch (Exception e) {
>            e.printStackTrace();
>            closeConnection();
>            return null;
>        }
>    }
>
>    /**
>     * Closes connection.
>     */
>    public void closeConnection() {
>        try {
>            session.close();
>            connection.close();
>
>        } catch (Exception e) {
>            e.printStackTrace();
>        }
>    }
>
>    /**
>     * Queue name getter.
>     * @return queue name
>     */
>    public String getQueueName() {
>        return queueName;
>    }
> }
>
> ----------------------------------------------------------------
>
> Obviously I use ActiveMQ 5.4 on localhost as a message broker. I'll be
> grateful for any hints.
> Regards
>
>
>
> --
> View this message in context: http://old.nabble.com/Method-setMessagePrioritySupported%28true%29-called-on-ActiveMQConnectionFactory-doesn%27t-work-tp29490144p29490144.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>



-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Mime
View raw message