qpid-commits mailing list archives

From conflue...@apache.org
Subject [CONF] Apache Qpid > Sorted Queue
Date Tue, 15 Nov 2011 11:15:00 GMT
<html>
<head>
    <base href="https://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/2042/9/21/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://cwiki.apache.org/confluence/display/qpid/Sorted+Queue">Sorted
Queue</a></h2>
    <h4>Page  <b>added</b> by             <a href="https://cwiki.apache.org/confluence/display/~alex.rufous">Alex
Rudyy</a>
    </h4>
         <br/>
    <div class="notificationGreySide">
         <h3><a name="SortedQueue-SortedQueue"></a>Sorted Queue </h3>

<p>This article introduces the sorted queue functionality implemented in the
Java Broker.</p>

<p>A sorted queue is a special type of queue that orders messages by the value of a
message property whose name is fixed when the queue is created. </p>

<p>Messages are sorted in ascending alphanumeric string order. This ordering cannot
be altered or overridden. </p>
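<p>Because the sort is a string sort, numeric-looking keys do not order numerically. The
sketch below illustrates this, assuming the broker's comparison behaves like Java's
String.compareTo (the article does not specify the exact comparison). The class
SortKeyOrdering and the helper paddedKey are hypothetical names used only for illustration;
zero-padding is one common way to make string order match numeric order.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical illustration class; not part of the Qpid API.
final class SortKeyOrdering {

    /** Returns a copy of the keys in ascending String.compareTo order (assumed to mirror the broker). */
    static List<String> sortAsStrings(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        return sorted;
    }

    /** Left-pads a non-negative number with zeros so that string order matches numeric order. */
    static String paddedKey(long value, int width) {
        return String.format(Locale.ROOT, "%0" + width + "d", value);
    }
}
```

<p>With this sketch, the keys "9", "10" and "2" sort as "10", "2", "9", whereas the padded
keys "009", "010" and "002" sort in numeric order.</p>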

<p>The property named as the sortKey for a queue should be present in the custom
properties of each message sent, i.e. a value stored by a call to Message.setStringProperty(myProperty,
"abc123") and retrieved with Message.getStringProperty(myProperty). </p>

<p>The sort key value must be of Java type String. JMS pre-defined properties
cannot be used for sorting.</p>

<p>Messages sent to a sorted queue without the sort key property are
added to the end of the queue.</p>
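<p>The placement of messages without a sort key can be modelled in plain Java as a
"nulls last" comparison. This is only a sketch of the documented behaviour, not the broker's
implementation: the class MissingSortKeyModel and its Msg type are hypothetical, and the
assumption that unkeyed messages keep their arrival order among themselves is not stated
in this article.</p>

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of a sorted queue; not the broker's implementation.
final class MissingSortKeyModel {

    static final class Msg {
        final String body;
        final String sortKey; // null models a message sent without the sort key property

        Msg(String body, String sortKey) {
            this.body = body;
            this.sortKey = sortKey;
        }
    }

    /** Returns message bodies in modelled queue order: keyed messages ascending, unkeyed ones last. */
    static List<String> queueOrder(List<Msg> arrivals) {
        List<Msg> queue = new ArrayList<>(arrivals);
        // List.sort is stable, so unkeyed messages keep their arrival order (an assumption of this model).
        queue.sort(Comparator.comparing((Msg m) -> m.sortKey,
                Comparator.nullsLast(Comparator.<String>naturalOrder())));
        List<String> bodies = new ArrayList<>();
        for (Msg m : queue) {
            bodies.add(m.body);
        }
        return bodies;
    }
}
```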

<h3><a name="SortedQueue-DefiningSortedQueues"></a>Defining Sorted Queues</h3>

<p>A sorted queue must be defined before it can be used. An existing queue cannot be
re-declared as, or converted into, a sorted queue without first deleting it.</p>

<p>A sorted queue can be created from an address string by setting the 'qpid.queue_sort_key'
property under 'x-declare' 'arguments' in the node declaration, as in the example below:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
my-sorted-queue;
{
    create: always,
    node:
    {
       durable: <span class="code-keyword">true</span>,
       x-declare:
       {
           arguments:
           {
                'qpid.queue_sort_key': 'my-sort-key'
           }
       }
    }
}
</pre>
</div></div>
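<p>When the queue name or sort key is only known at runtime, the address string above can
be assembled in code. The following is a minimal sketch: the class SortedQueueAddress and
its method are hypothetical helpers, and only the address layout is taken from the example
above.</p>

```java
// Hypothetical helper that builds an address string in the layout shown above.
final class SortedQueueAddress {

    /** Builds a durable, create-always address whose node declares the given sort key. */
    static String of(String queueName, String sortKey) {
        if (queueName == null || queueName.isEmpty()) {
            throw new IllegalArgumentException("queueName must not be empty");
        }
        if (sortKey == null || sortKey.isEmpty() || sortKey.contains("'")) {
            // A quote would break out of the quoted value in the address string.
            throw new IllegalArgumentException("sortKey must be non-empty and must not contain a quote");
        }
        return queueName + "; {create: always, node: {durable: true, "
                + "x-declare: {arguments: {'qpid.queue_sort_key': '" + sortKey + "'}}}}";
    }
}
```

<p>How the resulting string is handed to the client (for example through a JNDI destination
entry) depends on the client version in use and should be checked against its documentation.</p>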

<p>Alternatively, a sorted queue can be defined in the broker virtualhost configuration file.
When defining the queue, add a &lt;sortKey/&gt; element naming the message property whose
value is used to sort the messages put onto the queue.</p>

<div class='panelMacro'><table class='noteMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/warning.gif" width="16" height="16"
align="absmiddle" alt="" border="0"></td><td>A sorted queue cannot be combined with any
other queue type (priority or conflation). If you specify the sortKey for a queue, any
priority or last value parameters will be ignored.</td></tr></table></div>


<h4><a name="SortedQueue-BrokerConfiguration"></a>Broker Configuration</h4>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Sorted Queue Broker Config</b></div><div
class="codeContent panelContent">
<pre class="code-xml">
<span class="code-tag">&lt;queues&gt;</span>
  <span class="code-tag">&lt;exchange&gt;</span>amq.direct<span class="code-tag">&lt;/exchange&gt;</span>
  <span class="code-tag">&lt;queue&gt;</span>
     <span class="code-tag">&lt;name&gt;</span>example-sorted-queue<span
class="code-tag">&lt;/name&gt;</span>
     <span class="code-tag">&lt;example-sorted-queue&gt;</span>
       <span class="code-tag">&lt;durable&gt;</span>true<span class="code-tag">&lt;/durable&gt;</span>
       <span class="code-tag">&lt;sortKey&gt;</span>message-attribute-to-sort-by<span
class="code-tag">&lt;/sortKey&gt;</span>
     <span class="code-tag">&lt;/example-sorted-queue&gt;</span>
  <span class="code-tag">&lt;/queue&gt;</span>
<span class="code-tag">&lt;/queues&gt;</span>
</pre>
</div></div>

<h4><a name="SortedQueue-Declaringofsortedqueueprogrammatically"></a>Declaring
a sorted queue programmatically</h4>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Sorted Queue Creation Code</b></div><div
class="codeContent panelContent">
<pre class="code-java">
Map&lt;<span class="code-object">String</span>,<span class="code-object">Object</span>&gt;
arguments = <span class="code-keyword">new</span> HashMap&lt;<span class="code-object">String</span>,
<span class="code-object">Object</span>&gt;();
arguments.put(<span class="code-quote">"qpid.queue_sort_key"</span>,messageAttributeToSortBy);
((AMQSession) session).createQueue(queueName, autoDelete, durable, exclusive, arguments);
</pre>
</div></div>

<div class='panelMacro'><table class='noteMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/warning.gif" width="16" height="16"
align="absmiddle" alt="" border="0"></td><td>This API is likely to change
in version 1.0 of the Qpid client.</td></tr></table></div>

<h4><a name="SortedQueue-DeclaringofsortedqueueprogrammaticallyviaJMX"></a>Declaring
a sorted queue programmatically via JMX</h4>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Sorted Queue Creation via JMX</b></div><div
class="codeContent panelContent">
<pre class="code-java">
JMXConnector jmxc;
MBeanServerConnection mbsc;
ObjectName queueObjectName;
ManagedBroker amqbmb;
<span class="code-keyword">try</span>
{
  jmxc = JMXConnectorFactory.connect(jmxUrl, env);
  mbsc = jmxc.getMBeanServerConnection();
  queueObjectName = <span class="code-keyword">new</span> ObjectName(mbeanName);
  amqbmb = JMX.newMBeanProxy(mbsc, queueObjectName, ManagedBroker.class,<span class="code-keyword">true</span>);
  Map&lt;<span class="code-object">String</span>,<span class="code-object">Object</span>&gt;
arguments = <span class="code-keyword">new</span> HashMap&lt;<span class="code-object">String</span>,
<span class="code-object">Object</span>&gt;();
  arguments.put(<span class="code-quote">"qpid.queue_sort_key"</span>,messageAttributeToSortBy);
  amqbmb.createNewQueue(queueName, queueOwner, queueDurable, arguments);
  jmxc.close();
} 
<span class="code-keyword">catch</span> (Exception e)
{
  e.printStackTrace();
}
</pre>
</div></div>

<h3><a name="SortedQueue-ReceivingMessages"></a>Sending and Receiving Messages</h3>


<h4><a name="SortedQueue-SettingtheMessageAttribute"></a>Setting the Message
Attribute</h4>

<p>The following example shows how to set the sort attribute on a message being
sent to a sorted queue.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Setting Message Sort Attribute</b></div><div
class="codeContent panelContent">
<pre class="code-java">
<span class="code-keyword">final</span> <span class="code-object">String</span>
messageAttributeToSortBy = <span class="code-quote">"sortKey"</span>;
<span class="code-keyword">final</span> <span class="code-object">String</span>
sortValue = myTradeStep.getID();<span class="code-comment">//alphanumeric like abc123
</span>
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

<span class="code-keyword">final</span> Message msg = session.createTextMessage(messageBodyText);
msg.setStringProperty(messageAttributeToSortBy, sortValue);
producer.send(msg);
</pre>
</div></div>

<h4><a name="SortedQueue-SetLowPreFetch"></a>Set Low Pre-Fetch</h4>

<p>Qpid clients receive messages in batches and buffer them locally. The batch size is set
by the pre-fetch value, which currently defaults to 500. If message order matters to your
application and you want consumers to receive messages in the correct sorted order, you must
use a low pre-fetch setting for your clients. </p>

<p>With a low pre-fetch value the broker has an opportunity to order messages correctly
before sending them to the client. With a larger (or default) pre-fetch value, messages are sent
from the queue as soon as they arrive and are then buffered on the client. This may lead to
cases where messages sorted "higher" are buffered behind "lower" messages within the client.
</p>
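<p>The effect of the pre-fetch value on delivery order can be illustrated with a toy
simulation. This is a sketch under simplifying assumptions, not the broker's dispatch
algorithm: the class PrefetchSimulation is hypothetical, sort keys are assumed unique, the
broker is modelled as pushing messages to the client whenever the client buffer has room,
and the consumer only starts reading after all messages have arrived.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeSet;

// Hypothetical toy model of broker-side sorting combined with client-side buffering.
final class PrefetchSimulation {

    /** Returns the order in which a slow consumer would see the sort keys for a given prefetch. */
    static List<String> deliver(List<String> arrivals, int prefetch) {
        TreeSet<String> brokerQueue = new TreeSet<>();   // sorted queue held on the broker
        Deque<String> clientBuffer = new ArrayDeque<>(); // messages already pushed to the client

        for (String key : arrivals) {
            brokerQueue.add(key);
            fill(brokerQueue, clientBuffer, prefetch);
        }

        List<String> consumed = new ArrayList<>();
        while (!clientBuffer.isEmpty()) {
            consumed.add(clientBuffer.pollFirst());      // consumer takes the oldest buffered message
            fill(brokerQueue, clientBuffer, prefetch);
        }
        return consumed;
    }

    /** Moves the lowest-sorted messages from the broker to the client while the buffer has room. */
    private static void fill(TreeSet<String> brokerQueue, Deque<String> clientBuffer, int prefetch) {
        while (clientBuffer.size() < prefetch && !brokerQueue.isEmpty()) {
            clientBuffer.addLast(brokerQueue.pollFirst());
        }
    }
}
```

<p>In this model, messages arriving as m3, m1, m2, m0 are consumed in arrival order with a
pre-fetch of 4, because each one is buffered on the client as soon as it arrives. With a
pre-fetch of 1, only m3 is buffered early and the remaining messages are consumed in sorted
order: m3, m0, m1, m2.</p>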

<p>You therefore need to lower the prefetch value for your consuming client.
To do this, set the Java system property max_prefetch in the client environment (using -D)
before creating your consumer.</p>

<p>Setting the Qpid pre-fetch to 1 for your client means that the message sort attribute
will be honoured by the Qpid broker as it dispatches messages to your client. </p>

<p>A default for all client connections can be set via a system property:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Max Prefetch System Property</b></div><div
class="codeContent panelContent">
<pre class="code-java">
-Dmax_prefetch=1
</pre>
</div></div>

<p>The prefetch can also be adjusted on a per-connection basis by adding a 'maxprefetch'
value to the connection URL:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Max Prefetch in Connection Url</b></div><div
class="codeContent panelContent">
<pre class="code-java">
amqp://guest:guest@client1/development?maxprefetch='1'&amp;brokerlist='tcp://localhost:5672'
</pre>
</div></div>
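<p>If the connection URL is assembled in code, the 'maxprefetch' option can be added at that
point. The sketch below reproduces only the URL layout shown above. The class
PrefetchConnectionUrl and its parameters are hypothetical, and no escaping of special
characters is attempted.</p>

```java
// Hypothetical helper that builds a connection URL in the layout shown above.
final class PrefetchConnectionUrl {

    /** Builds a connection URL carrying an explicit maxprefetch option. */
    static String build(String user, String password, String clientId,
                        String virtualHost, String brokerUrl, int maxPrefetch) {
        if (maxPrefetch < 0) {
            throw new IllegalArgumentException("maxPrefetch must not be negative");
        }
        return "amqp://" + user + ":" + password + "@" + clientId + "/" + virtualHost
                + "?maxprefetch='" + maxPrefetch + "'"
                + "&brokerlist='" + brokerUrl + "'";
    }
}
```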

<h4><a name="SortedQueue-Singleconsumerpersession"></a>Single consumer per
session</h4>

<p>If you are using the receive() method to consume messages then you <b>must</b>
also use only one consumer per session with sorted queues. If you are using a MessageListener
with onMessage() then this is not a concern.</p>

<h3><a name="SortedQueue-BrowsingandManipulating"></a>Browsing and Manipulating</h3>

<p>Sorted queues can be browsed and manipulated as normal via the Qpid
JMX Management console.</p>

    </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

