qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From conflue...@apache.org
Subject [CONF] Apache Qpid > Qpid Design - Queue Implementation
Date Wed, 02 Sep 2009 15:47:00 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
     <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Qpid+Design+-+Queue+Implementation">Qpid
Design - Queue Implementation</a></h2>
     <h4>Page <b>edited</b> by             <a href="http://cwiki.apache.org/confluence/display/~aidan">Aidan
Skinner</a>
    </h4>
     
          <br/>
     <div class="notificationGreySide">
         <div>
<ul>
    <li><a href='#QpidDesign-QueueImplementation-StrictOrdering'>Strict Ordering</a></li>
    <li><a href='#QpidDesign-QueueImplementation-Enqueing'>Enqueing</a></li>
    <li><a href='#QpidDesign-QueueImplementation-PriorityQueues'>Priority Queues</a></li>
</ul></div>

<h2><a name="QpidDesign-QueueImplementation-StrictOrdering"></a>Strict Ordering</h2>

<p>The fundamental principal of the Queuing model is that the queue provides a strict
order on the messages being enqueued.  Furthermore that order is maintained through the lifetime
of the entries on the queue: thus if a message is returned (e.g. the prefetched messages being
released upon the consumer closing) the order of that message with respect to other messages
on the queue is maintained.</p>

<p>The strict ordering is enforced by the use of a queue data-structure.  In order for
this to be performant, the data structure uses a lockless thread-safe designed based around
the same algorithm used in the java.util.concurrent.ConcurrentLinkedList (more precisely it
is based on the public domain implementation in the backport util concurrent project).  See
the section on Concurrent List implementations for more details.</p>

<p>Each subscription keeps a "pointer" into the list denoting the point at which that
particular subscription has reached.  A particular subscription will only deliver a message
if it is the next AVAILABLE entry on the queue after the pointer which it maintains which
matches any selection criteria the subscription may have.</p>

<p><img src="/confluence/download/attachments/2853062/broker-queue-subscription.png"
align="absmiddle" border="0" /></p>

<p>Thread safety is maintained by using the thread-safe atomic compare-and-swap operations
for maintaining queue entry state (as described above) and also for updating the pointer on
the subscription.  The queue is written so that many threads may be simultaneously attempting
to perform deliveries simultaneously on the same messages and/or subscriptions.</p>

<h2><a name="QpidDesign-QueueImplementation-Enqueing"></a>Enqueing</h2>

<p>When a message is enqueued (using the enqueue() method on the AMQQueue implementation)
it is first added to the tail of the list.  Then the code iterates over the subscriptions
(starting at the last subscription the queue was known to have delivered for reasons of fairness).
 For each subscription found it attempts delivery (details describe below).  If the message
cannot be delivered to any subscription then the "immediate" flag on the message is inspected.
 If the message required immediate delivery then the message is immediately dequeued, otherwise
an asynchronous job is created to attempt delivery at a later point.</p>

<p>(Note there is a "shortcut" path for queues which have an exclusive subscriber. 
In this case we know there is one and only one subscriber and so we can go directly to trying
to deliver to it without worrying about iterators, etc.)</p>

<p>Potential Issue:  Looking at the code which performs the check of the immediate flag
I believe there is a race condition:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
        <span class="code-keyword">if</span> (entry.immediateAndNotDelivered())
        {
            dequeue(storeContext, entry);
            entry.dispose(storeContext);
        }
</pre>
</div></div>

<p>This does not look to be safe in the case where there is a simultaneous execution
of an asynchronous deliver which may acquire the message between the check of immediateAndDelivered
and dequeue.  Instead of calling dequeue directly we should instead do a safe compare-and-swap
test to make sure the entry state is "AVAILABLE" before setting it to DEQUEUED.  The implementation
of this should probably look much like the implementation of entry.dequeue except for the
different expected starting state.<br/>
Immediate Delivery</p>

<p>For each subscription to the queue, we call the following code:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
    <span class="code-keyword">private</span> void deliverToSubscription(<span
class="code-keyword">final</span> Subscription sub, <span class="code-keyword">final</span>
QueueEntry entry)
            <span class="code-keyword">throws</span> AMQException
    {

        sub.getSendLock();
        <span class="code-keyword">try</span>
        {
            <span class="code-keyword">if</span> (subscriptionReadyAndHasInterest(sub,
entry)
                &amp;&amp; !sub.isSuspended())
            {
                <span class="code-keyword">if</span> (!sub.wouldSuspend(entry))
                {
                    <span class="code-keyword">if</span> (!sub.isBrowser() &amp;&amp;
!entry.acquire(sub))
                    {
                        <span class="code-comment">// restore credit here that would
have been taken away by wouldSuspend since we didn't manage
</span>                        <span class="code-comment">// to acquire the entry
<span class="code-keyword">for</span> <span class="code-keyword">this</span>
subscription
</span>                        sub.restoreCredit(entry);
                    }
                    <span class="code-keyword">else</span>
                    {

                        deliverMessage(sub, entry);

                    }
                }
            }
        }
        <span class="code-keyword">finally</span>
        {
            sub.releaseSendLock();
        }
    }

</pre>
</div></div>

<p>This code first takes a lock on the subscriber (this prevents it being removed while
we are carrying out this operation).  It then tests if the given subscription can take this
message at the moment (see below for more details).  It then further tests if there is enough
flow control credit to send this message to this subscription.  If there is credit (and the
subscription is not a "browser" then is attempts to acquire the entry ( entry.acquire(sub)
).  If the acquisition is successful (or if the subscription is a browser and thus does not
need to acquire the entry) then the entry is delivered to the subscription, else the credit
that would have been used by the message if sent is restored.</p>

<p>The most interesting method called in the above is subscriptionReadyAndHasInterest(sub,
entry):</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
    <span class="code-keyword">private</span> <span class="code-object">boolean</span>
subscriptionReadyAndHasInterest(<span class="code-keyword">final</span> Subscription
sub, <span class="code-keyword">final</span> QueueEntry entry)
    {
        <span class="code-comment">// We need to move <span class="code-keyword">this</span>
subscription on, past entries which are already acquired, or deleted or ones it has no
</span>        <span class="code-comment">// interest in.
</span>        QueueEntry node = sub.getLastSeenEntry();
        <span class="code-keyword">while</span> (node != <span class="code-keyword">null</span>
&amp;&amp; (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
        {

            QueueEntry newNode = _entries.next(node);
            <span class="code-keyword">if</span> (newNode != <span class="code-keyword">null</span>)
            {
                sub.setLastSeenEntry(node, newNode);
                node = sub.getLastSeenEntry();
            }
            <span class="code-keyword">else</span>
            {
                node = <span class="code-keyword">null</span>;
                <span class="code-keyword">break</span>;
            }

        }

        <span class="code-keyword">if</span> (node == entry)
        {
            <span class="code-comment">// If the first entry that subscription can process
is the one we are trying to deliver to it, then we are
</span>            <span class="code-comment">// good
</span>            <span class="code-keyword">return</span> <span class="code-keyword">true</span>;
        }
        <span class="code-keyword">else</span>
        {
            <span class="code-keyword">return</span> <span class="code-keyword">false</span>;
        }

    }

</pre>
</div></div>

<p>Here we see how the subscription is inspected to see where its pointer into the queue
(the last seen entry) is in respect to the entry we are trying to deliver.  We start from
the subscription's current lastSeenEntry and work our way down the list passing over entries
which are already acquired by other subscriptions, deleted, or which this subscription has
no interest in (e.g. because the node does not meet the subscription's selection criteria);
all the while we can update the lastSeenEntry to take it past the entries this subscription
has now inspected.  Performing this iteration we will eventually arrive at the next entry
the subscription is interested in (or just fall off the end of the list).  At this point either
the next entry that the subscription is interested in is the entry we wish to deliver (success&#33;)
or not.</p>

<h2><a name="QpidDesign-QueueImplementation-PriorityQueues"></a>Priority
Queues</h2>

<p>The fundamental difference between Priority Queues and other Queues is that the strict
ordering on the queue is not purely FIFO.  Instead the ordering is a combination of FIFO and
the priority assigned to the message.  To provide strict priority ordering (where a message
of higher priority will always be delivered in preference to a message of lower priority)
we can implement a priority queue as an ordered list of standard sub-queues with the ordering
between them defined such the tail of the highest priority sub-queue is followed by the head
of the sub-queue of the next highest priority.</p>

<p>By defining the standard queue implementation such that the methods which determine
the ordering between the nodes can be overridden, the implementation of such a strict priority
queue is almost trivial.</p>

<p><img src="/confluence/download/attachments/2853062/broker-priority-queue-subscription.png"
align="absmiddle" border="0" />  </p>

<p>The interface QueueEntryList provides an extension point for adding a new queue implementation
type:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
<span class="code-keyword">public</span> <span class="code-keyword">interface</span>
QueueEntryList
{
    AMQQueue getQueue();

    QueueEntry add(AMQMessage message);

    QueueEntry next(QueueEntry node);

    QueueEntryIterator iterator();

    QueueEntry getHead();
}
</pre>
</div></div>

<p>The class PriorityQueueList provides the concrete implementation of a strict priority
queue as defined above.  The constructor takes an argument defining how many priority levels
are to be provided.  </p>

<p>When a message is added to the list by calling the add() method, the class first
works out which sub-queue to add the message to.  This is determined by an algorithm identical
to that defined in the AMQP0-10 specification and compliant with the JMS requirements.  The
message is added to the tail of the appropriate sub-queue.</p>

<p>The next() method returns the QueueEntry which logically follows the QueueEntry provided
by the caller.  First we can simply look at the sub-queue in which the passed QueueEntry is
actually in.  If there is a subsequent entry in that sub-queue then we use that.  If there
is no subsequent entry in the sub-queue then we must find the next highest priorty subqueue
and take the head of that (repeating until we find a subqueue which is non-empty).</p>

<p>The getHead() method iterates over the subqueues to find the highest priority sibqueue
which is non-empty and then returns the head of that subqueue.</p>

<p>The iterator() method returns an iterator that respects the ordering defined above.<br/>
The only other difference between a PriortyQueue and the standard queue is that new messages
arriving may be logically "before" messages that have arrived previously (i.e. a high priority
message is always logically prior to a low priority message in the queue).  This means that
on arrival of a message into the queue all subscriptions need to be inspected to make sure
their pointer is not "ahead" of the new arrival.</p>

<p>Thus the entire implementation of AMQPriorityQueue is as follows:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
<span class="code-keyword">public</span> class AMQPriorityQueue <span class="code-keyword">extends</span>
SimpleAMQQueue
{
    <span class="code-keyword">protected</span> AMQPriorityQueue(<span class="code-keyword">final</span>
AMQShortString name,
                               <span class="code-keyword">final</span> <span
class="code-object">boolean</span> durable,
                               <span class="code-keyword">final</span> AMQShortString
owner,
                               <span class="code-keyword">final</span> <span
class="code-object">boolean</span> autoDelete,
                               <span class="code-keyword">final</span> VirtualHost
virtualHost,
                               <span class="code-object">int</span> priorities)
            <span class="code-keyword">throws</span> AMQException
    {
        <span class="code-keyword">super</span>(name, durable, owner, autoDelete,
virtualHost, <span class="code-keyword">new</span> PriorityQueueList.Factory(priorities));
    }

    <span class="code-keyword">public</span> <span class="code-object">int</span>
getPriorities()
    {
        <span class="code-keyword">return</span> ((PriorityQueueList) _entries).getPriorities();
    }

    @Override
    <span class="code-keyword">protected</span> void checkSubscriptionsNotAheadOfDelivery(<span
class="code-keyword">final</span> QueueEntry entry)
    {
        <span class="code-comment">// check that all subscriptions are not in advance
of the entry
</span>        SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
        <span class="code-keyword">while</span>(subIter.advance() &amp;&amp;
!entry.isAcquired())
        {
            <span class="code-keyword">final</span> Subscription subscription
= subIter.getNode().getSubscription();
            QueueEntry subnode = subscription.getLastSeenEntry();
            <span class="code-keyword">while</span>(subnode != <span class="code-keyword">null</span>
&amp;&amp; entry.compareTo(subnode) &lt; 0 &amp;&amp; !entry.isAcquired())
            {
                <span class="code-keyword">if</span>(subscription.setLastSeenEntry(subnode,entry))
                {
                    <span class="code-keyword">break</span>;
                }
                <span class="code-keyword">else</span>
                {
                    subnode = subscription.getLastSeenEntry();
                }
            }

        }
    }

}
</pre>
</div></div>

<p>The constructor merely ensures passes up the machinery to ensure a PriorityQueueList
(as described above) is used for the underlying queueing model.  The getPriorities() method
is overridden by delegating to the PriorityQueueList and then the algorithm for updating the
subscriptions' pointers into the queue is implemented in checkSubscriptionsNotAheadOfDelivery.
 Thread-safe compare-and-swap operations are used to update the pointer in-case other threads
are also trying to move it; and the loop terminates early if the new QueueEntry has already
been acquired.</p>
     </div>
     <div id="commentsSection" class="wiki-content pageSection">
       <div style="float: right;">
            <a href="http://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
       </div>

       <a href="http://cwiki.apache.org/confluence/display/qpid/Qpid+Design+-+Queue+Implementation">View
Online</a>
       |
       <a href="http://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=2853062&revisedVersion=3&originalVersion=2">View
Change</a>
              |
       <a href="http://cwiki.apache.org/confluence/display/qpid/Qpid+Design+-+Queue+Implementation?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message