activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kondro <wayne.robin...@gmail.com>
Subject Re: Method setMessagePrioritySupported(true) called on ActiveMQConnectionFactory doesn't work
Date Wed, 25 Aug 2010 13:13:47 GMT

If we can set the priority by using setJMSPriority to 0 - 9 for the
JDBCStore, how do we set the priority to low/normal/high for kahaDB? Are
these constants somewhere?


Gary Tully wrote:
> 
> priority support needs to be enabled for a destination with a policy
> entry, prioritizedMessages
> see: http://activemq.apache.org/per-destination-policies.html
> The new (5.4) attributes are in red.
> 
> To see the programatic setup check out the source of the one of the
> priority tests:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?view=markup
> 
> Note, with the kahaDB store, there are three levels of priority
> supported (low, normal, high). With the JDBCStore, the full
> complement, 0-9 are supported.
> 
> On 20 August 2010 10:16, sonicBasher <sonicBasher@gmail.com> wrote:
>>
>> Hi,
>>
>> According to activeMQ website, the latest, 5.4 version supports message
>> priority handling:
>> http://activemq.apache.org/new-features-in-54.html
>>
>> What's more, according to this website
>> https://issues.apache.org/activemq/browse/AMQ-2790
>> handling priorities takes place by calling the
>> setMessagePrioritySupported(true) on an ActiveMQConnectionFactory object.
>> Besides, it's the default setting anyways.
>>
>> Now, I tested this using the latest 5.4 version, but the messages don't
>> seem
>> to get ordered by the priority.
>> Here's how I do it (it's really straightforward):
>>
>>
>> --------------------------------------------------------
>> PRODUCER CLASS:
>>
>> import java.util.LinkedList;
>> import java.util.List;
>> import javax.jms.Connection;
>> import javax.jms.JMSException;
>> import javax.jms.MapMessage;
>> import javax.jms.Queue;
>> import javax.jms.Session;
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import javax.jms.DeliveryMode;
>>
>> public final class Main {
>>
>>    public static void main(String[] args) throws JMSException {
>>        String brokerURL = "tcp://localhost:61616";
>>        String pendingQueueName = "pendingQ";
>>        String configID = "12350";
>>        int transactionID = 2;
>>        final int repetitions = 10;
>>
>>        ActiveMQConnectionFactory cf = new
>> ActiveMQConnectionFactory(brokerURL);
>>        Connection connection = cf.createConnection();
>>        connection.start();
>>        Session session = connection.createSession(true,
>> Session.SESSION_TRANSACTED);
>>        Queue queue = session.createQueue(pendingQueueName);
>>        javax.jms.MessageProducer producer =
>> session.createProducer(queue);
>>
>>        for (int i = 0; i < repetitions; i++) {
>>            MapMessage mapMessage = session.createMapMessage();
>>            mapMessage.setStringProperty("configID", configID);
>>            mapMessage.setIntProperty("transactionID", transactionID);
>>            List<String> ls2 = new LinkedList<String>();
>>            ls2.add("abc" + i);
>>            mapMessage.setObject("customerBeanList", ls2);
>>            producer.send(mapMessage, DeliveryMode.PERSISTENT, i, 0);
>>        }
>>        session.commit();
>>        connection.stop();
>>        connection.close();
>>    }
>> }
>>
>>
>> --------------------------------------------------------
>> CONSUMER CLASS:
>>
>> import java.util.LinkedList;
>> import java.util.List;
>>
>>
>> public class Main {
>>
>>    public static void main(String[] args) {
>>        AccountMessageListener aml = null;
>>        String queueName = "pendingQ";
>>        String brokerURL = "tcp://localhost:61616";
>>        String configID = "12350";
>>        int transactionID = 2;
>>        try {
>>            List<String> completeListOfReceivedStrings = new
>> LinkedList<String>();
>>            List<String> tempReceivedStrings = null;
>>            aml = new AccountMessageListener(brokerURL, queueName);
>>            while ((tempReceivedStrings = aml.receiveMessage(configID,
>> transactionID, 0)) != null) {
>>                completeListOfReceivedStrings.addAll(tempReceivedStrings);
>>            }
>>            aml.closeConnection();
>>            String[] arrayOfStrings = null;
>>            if (completeListOfReceivedStrings != null) {
>>                arrayOfStrings = new
>> String[completeListOfReceivedStrings.size()];
>>                for (int i = 0; i < completeListOfReceivedStrings.size();
>> i++) {
>>                    arrayOfStrings[i] =
>> completeListOfReceivedStrings.get(i);
>>                }
>>            }
>>            for (int i = 0; i < arrayOfStrings.length; i++) {
>>                System.out.println(arrayOfStrings[i]);
>>            }
>>        } catch (Exception e) {
>>            e.printStackTrace();
>>            aml.closeConnection();
>>        }
>>    }
>> }
>>
>>
>> --------------------------------------------------------
>> CONSUMER HELPER CLASS:
>>
>> import java.util.List;
>> import javax.jms.Connection;
>> import javax.jms.Destination;
>> import javax.jms.JMSException;
>> import javax.jms.MapMessage;
>> import javax.jms.MessageConsumer;
>> import javax.jms.Session;
>> import org.apache.activemq.ActiveMQConnectionFactory;
>>
>> public class AccountMessageListener {
>>
>>    private ActiveMQConnectionFactory connectionFactory = null;
>>    private Connection connection = null;
>>    private Session session = null;
>>    private Destination destination = null;
>>    private String brokerURL = null;
>>    private String queueName = null;
>>    private MessageConsumer confirmConsumer = null;
>>
>>    /**
>>     * Constructor.
>>     * @param brokerURL broker url
>>     * @param queueName queue name
>>     */
>>    public AccountMessageListener(String brokerURL, String queueName) {
>>        try {
>>            this.brokerURL = brokerURL;
>>            this.queueName = queueName;
>>            createConnection();
>>            session = connection.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>>            destination = session.createQueue(queueName);
>>        } catch (JMSException e) {
>>            e.printStackTrace();
>>        } catch (Exception e) {
>>            e.printStackTrace();
>>        }
>>    }
>>
>>    /**
>>     * Connects to message queue.
>>     */
>>    private void createConnection() {
>>        try {
>>            connectionFactory = new ActiveMQConnectionFactory(brokerURL);
>>            connectionFactory.setMessagePrioritySupported(true);
>>            connection = connectionFactory.createConnection();
>>            connection.start();
>>        } catch (JMSException e) {
>>            closeConnection();
>>            e.printStackTrace();
>>        } catch (Exception e) {
>>            e.printStackTrace();
>>        }
>>    }
>>
>>    /**
>>     * Receives message from queue.
>>     * @param configID config id
>>     * @param transactionID transaction id
>>     * @return message
>>     */
>>    public List<String> receiveMessage(String configID, int transactionID)
>> {
>>        try {
>>            String messageSelector = "configID = '" + configID + "' AND
>> transactionID = " + transactionID;
>>            confirmConsumer = session.createConsumer(destination,
>> messageSelector);
>>            MapMessage mapMessage = null;
>>            List<String> ls = null;
>>            mapMessage = (MapMessage) confirmConsumer.receive(250);
>>            if (mapMessage != null) {
>>                if (mapMessage.getObject("customerBeanList") != null &&
>> mapMessage.getObject("customerBeanList") instanceof List) {
>>                    ls = (List<String>)
>> mapMessage.getObject("customerBeanList");
>>                }
>>            }
>>            confirmConsumer.close();
>>            return ls;
>>        } catch (JMSException ex) {
>>            ex.printStackTrace();
>>            closeConnection();
>>            return null;
>>        } catch (Exception e) {
>>            e.printStackTrace();
>>            closeConnection();
>>            return null;
>>        }
>>    }
>>
>>    /**
>>     * Closes connection.
>>     */
>>    public void closeConnection() {
>>        try {
>>            session.close();
>>            connection.close();
>>
>>        } catch (Exception e) {
>>            e.printStackTrace();
>>        }
>>    }
>>
>>    /**
>>     * Queue name getter.
>>     * @return queue name
>>     */
>>    public String getQueueName() {
>>        return queueName;
>>    }
>> }
>>
>> ----------------------------------------------------------------
>>
>> Obviously I use ActiveMQ 5.4 on localhost as a message broker. I'll be
>> grateful for any hints.
>> Regards
>>
>>
>>
>> --
>> View this message in context:
>> http://old.nabble.com/Method-setMessagePrioritySupported%28true%29-called-on-ActiveMQConnectionFactory-doesn%27t-work-tp29490144p29490144.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> 
> -- 
> http://blog.garytully.com
> 
> Open Source Integration
> http://fusesource.com
> 
> 

-- 
View this message in context: http://old.nabble.com/Method-setMessagePrioritySupported%28true%29-called-on-ActiveMQConnectionFactory-doesn%27t-work-tp29490144p29532113.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message