activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nico <nico.roga...@gmail.com>
Subject Re: Real Priority Message Consuming in ActiveMQ
Date Fri, 04 May 2012 11:41:22 GMT
Thanks for the quick response.
So your saying that activemq does support my use case right?
Well the Junit Test looks okay too, I just read some blog entries and jira
bugs of activemq, that suggested that prioritizedQueues will only be
supported in 6.0, but I guess those posts where outdated.

I had the prefetch down to 0 already, since we only have a small amount of
long lasting tasks promoted by the queue.
So then I guess it must be some configuration issue.
I'm using activemq 5.5.1 as a web application war on JBoss 7.1. and I
configure the queues etc. via Spring in my application.
I see the messages landing in the queue with the correct priorities set,
via the activemq web console, but my application shows me that they are
consumed in order as they arrived the queue.
Switching to hornetQ with only minimal adjustments in Spring to the
ConnectionFactory and the priorized consume works, so I guess the
production of priorized JMS messages works.

So how can I check if the priorization of a queue is really activated?
Cause it seems to make no difference at all including the policyEntry with
prioritizedMessages="true" or not.

Here's my configuration:

activemq.xml

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:osgi="http://www.springframework.org/schema/osgi"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
  http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
  http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd">

  <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker brokerName="web-console" start="false" persistent="true"
useJmx="true" xmlns="http://activemq.apache.org/schema/core">

    <destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">" prioritizedMessages="true"/>
        </policyEntries>
    </policyMap>
    </destinationPolicy>

     <managementContext>
    <managementContext createConnector="false"/>
    </managementContext>

    <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#jms-ds"/>
    </persistenceAdapter>

    <plugins>
    <simpleAuthenticationPlugin anonymousAccessAllowed="false">
        <users>
            <authenticationUser username="****" password="****"
groups="users,admins" />
        </users>
    </simpleAuthenticationPlugin>
    <authorizationPlugin>
        <map>
            <authorizationMap>
                <authorizationEntries>
                    <authorizationEntry queue=">" read="users"
write="users" admin="admins" />
                    <authorizationEntry topic=">" read="users"
write="users" admin="admins" />
                    <authorizationEntry topic="ActiveMQ.Advisory.>"
read="users" write="users" admin="users" />
                </authorizationEntries>
                <tempDestinationAuthorizationEntry>
                    <tempDestinationAuthorizationEntry read="users"
write="users" admin="admins" />
                </tempDestinationAuthorizationEntry>
            </authorizationMap>
        </map>
    </authorizationPlugin>
    </plugins>

    <transportConnectors>
      <transportConnector name="default"
uri="tcp://${activemq.transport.host}:#{${activemq.transport.port} +
${jboss.socket.binding.port-offset}}"/>
    </transportConnectors>
  </broker>

  <bean id="jms-ds" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="java:jboss/datasources/GXLE"/>
  </bean>

</beans>


And the Spring config in my application:

<!-- Pool Connection Factory ActiveMQ-->
    <bean id="jmsConnectionFactory"
class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL">
        <value>${ACTIVEMQ_TRANSPORT}</value>
        </property>
        <property name="userName" value="****" />
        <property name="password" value="****" />
        <property name="prefetchPolicy" >
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <property name="all" value="0"></property>
            </bean>
        </property>
        <property name="messagePrioritySupported" value="true" />
    </bean>

<bean id="highlyPrioritizedPublisherTemplate"
class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="sessionTransacted" value="true" />
        <property name="explicitQosEnabled" value="true" />
        <property name="priority" value="9" />
    </bean>
    <bean id="lowPrioritizedPublisherTemplate"
class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="sessionTransacted" value="true" />
        <property name="explicitQosEnabled" value="true" />
        <property name="priority" value="0" />
    </bean>
<bean id="berechnungJobQueue"
class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0"
            value="gasx.grid.berechnungJobQueue" />
    </bean>


On Fri, May 4, 2012 at 12:34 PM, gtully [via ActiveMQ] <
ml-node+s2283324n4608310h13@n4.nabble.com> wrote:

> hmm. prefetch may be causing you some trouble, use prefetch=0 to
> ensure each receive asks the broker for the next highest priority
> message, otherwise a newly created high priority message could be
> backed up behind prefetched low priority ones.
>
> Note the test for priority: org.apache.activemq.store.MessagePriorityTest
>
> http://svn.apache.org/repos/asf/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
>
> If you can provide some variation of that that that demonstrates a
> problem, we can easily look into it.
>
> Try a recent fuse release[1] or the current apache snapshot[2] or the
> 5.6 release candidate[3], all should suffice
>
> [1]
> http://repo.fusesource.com/nexus/content/repositories/releases/org/apache/activemq/apache-activemq/5.5.1-fuse-04-01/
> [2]
> https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.7-SNAPSHOT/
> [3]
> https://repository.apache.org/content/repositories/orgapacheactivemq-025/org/apache/activemq/apache-activemq/5.6.0/
>
>
> On 4 May 2012 11:02, nico <[hidden email]<http://user/SendEmail.jtp?type=node&node=4608310&i=0>>
> wrote:
>
> > Hi there,
> > we are using activeMQ 5.5.2 as message broker for means of job
> distribution
> > across systems and as safeguard against failures.
> >
> > Now a new reqiurement rouse in the means of consuming messages in order
> of
> > their priority, so that important jobs are processed before unimportant
> > ones.
> >
> > Looking into the documentation and the all so often recited faq about
> "How
> > can I support priority queues?" it seems quite easy to just activate
> > priority message consuming with the boolean prioritizedMessages and a
> little
> > configuration on the publisher side to include priorities in your jms
> > messages.
> > But it doesn't work. Not as I would expect real priority message
> consuming
> > expect to be working, by always pulling the highest prioritized message
> out
> > of the queue. And looking into several blog entries and jira bugs it
> evens
> > shows that it's not even supposed to work. Maybe on a entry buffer level
> of
> > messages being ordered when they enter the queue simultaneously, but not
> > when they are persistet in the queue.
> >
> > So then I looked into the also stated "Alternatives" for priority
> message
> > consuming.
> > The Selector Approach doesn't suite our needs, cause we only have a
> little
> > number of consumers, that all have to process high and low priority
> jobs.
> > The Resequencer Approach sounded promising too, until I realized that it
> is
> > not intended to resequence messages within a queue, but just in the
> > transition from one queue to another. I implemented the 2 queues with
> > producer and consumer being connected by a BatchResequencer approach,
> but
> > that doesn't work very well.
> > The problem is this: say there are a 100 low priority messages and 1
> high
> > priority message coming in in the first minute, the resequencer takes
> those
> > and writes them in the consumer queue with the high priority message
> being
> > first. Fine. But then in the seconde minute 50 more high priority
> messages
> > enter the producer queue. The Resequencer takes those messages and puts
> them
> > behind the low priorized messages in the consumer queue. So no more
> > priorization. One can of course play around with the timeouts and batch
> > sizes, but you will never reach "real priority message consuming" where
> a
> > higher priorized message is always consumed before a lower priorized
> > message.
> >
> > So I have two issues I would like to discuss:
> > 1. Does activeMQ support real prioritized message consuming, where a
> high
> > priority message is always consumed before a low prio message, even if
> they
> > entered the queue minutes apart?
> > 2. If it doesn't, why does the faq "How can I support priority queues?"
> > state that it does? I didn't see one Remark on there like "Oh and if you
> > mean 'high prio before low prio always' with message priority, then
> sorry no
> > we don't support that".
> >
> > btw: I know the JMS specification doesn't require priority messaging
> order,
> > or any order, but as I looked into hornetQ for example, they have a
> perfect
> > implementation of real message priority consuming. So it should be
> possible.
> >
> > Hope this wasn't too long for anyone to answer ;o)
> > Thanks in advance!
> >
> > Nico
> >
> >
> >
> > --
> > View this message in context:
> http://activemq.2283324.n4.nabble.com/Real-Priority-Message-Consuming-in-ActiveMQ-tp4608262.html
> > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
>
> --
> http://fusesource.com
> http://blog.garytully.com
>
>
> ------------------------------
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://activemq.2283324.n4.nabble.com/Real-Priority-Message-Consuming-in-ActiveMQ-tp4608262p4608310.html
>  To unsubscribe from Real Priority Message Consuming in ActiveMQ, click
> here<http://activemq.2283324.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=4608262&code=bmljby5yb2dhc2NoQGdtYWlsLmNvbXw0NjA4MjYyfDc0Mjk4MDE3Nw==>
> .
> NAML<http://activemq.2283324.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>


--
View this message in context: http://activemq.2283324.n4.nabble.com/Real-Priority-Message-Consuming-in-ActiveMQ-tp4608262p4608418.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message