qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From conflue...@apache.org
Subject [CONF] Apache Qpid > Qpid Design - Queue Implementation
Date Wed, 02 Sep 2009 14:30:00 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Qpid+Design+-+Queue+Implementation">Qpid
Design - Queue Implementation</a></h2>
    <h4>Page  <b>added</b> by             <a href="http://cwiki.apache.org/confluence/display/~aidan">Aidan
Skinner</a>
    </h4>
         <br/>
    <div class="notificationGreySide">
         <div>
<ul>
    <li><a href='#QpidDesign-QueueImplementation-StrictOrdering'>Strict Ordering</a></li>
    <li><a href='#QpidDesign-QueueImplementation-Enqueing'>Enqueing</a></li>
</ul></div>

<h2><a name="QpidDesign-QueueImplementation-StrictOrdering"></a>Strict Ordering</h2>

<p>The fundamental principal of the Queuing model is that the queue provides a strict
order on the messages being enqueued.  Furthermore that order is maintained through the lifetime
of the entries on the queue: thus if a message is returned (e.g. the prefetched messages being
released upon the consumer closing) the order of that message with respect to other messages
on the queue is maintained.  </p>

<p>The strict ordering is enforced by the use of a queue data-structure.  In order for
this to be performant, the data structure uses a lockless thread-safe designed based around
the same algorithm used in the java.util.concurrent.ConcurrentLinkedList (more precisely it
is based on the public domain implementation in the backport util concurrent project).  See
the section on Concurrent List implementations for more details.</p>

<p>Each subscription keeps a "pointer" into the list denoting the point at which that
particular subscription has reached.  A particular subscription will only deliver a message
if it is the next AVAILABLE entry on the queue after the pointer which it maintains which
matches any selection criteria the subscription may have.</p>

<p>Thread safety is maintained by using the thread-safe atomic compare-and-swap operations
for maintaining queue entry state (as described above) and also for updating the pointer on
the subscription.  The queue is written so that many threads may be simultaneously attempting
to perform deliveries simultaneously on the same messages and/or subscriptions.</p>

<h2><a name="QpidDesign-QueueImplementation-Enqueing"></a>Enqueing</h2>

<p>When a message is enqueued (using the enqueue() method on the AMQQueue implementation)
it is first added to the tail of the list.  Then the code iterates over the subscriptions
(starting at the last subscription the queue was known to have delivered for reasons of fairness).
 For each subscription found it attempts delivery (details describe below).  If the message
cannot be delivered to any subscription then the "immediate" flag on the message is inspected.
 If the message required immediate delivery then the message is immediately dequeued, otherwise
an asynchronous job is created to attempt delivery at a later point.</p>

<p>(Note there is a "shortcut" path for queues which have an exclusive subscriber. 
In this case we know there is one and only one subscriber and so we can go directly to trying
to deliver to it without worrying about iterators, etc.)</p>

<p>Potential Issue:  Looking at the code which performs the check of the immediate flag
I believe there is a race condition:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
        <span class="code-keyword">if</span> (entry.immediateAndNotDelivered())
        {
            dequeue(storeContext, entry);
            entry.dispose(storeContext);
        }
</pre>
</div></div>

<p>This does not look to be safe in the case where there is a simultaneous execution
of an asynchronous deliver which may acquire the message between the check of immediateAndDelivered
and dequeue.  Instead of calling dequeue directly we should instead do a safe compare-and-swap
test to make sure the entry state is "AVAILABLE" before setting it to DEQUEUED.  The implementation
of this should probably look much like the implementation of entry.dequeue except for the
different expected starting state.<br/>
Immediate Delivery</p>

<p>For each subscription to the queue, we call the following code:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
    <span class="code-keyword">private</span> void deliverToSubscription(<span
class="code-keyword">final</span> Subscription sub, <span class="code-keyword">final</span>
QueueEntry entry)
            <span class="code-keyword">throws</span> AMQException
    {

        sub.getSendLock();
        <span class="code-keyword">try</span>
        {
            <span class="code-keyword">if</span> (subscriptionReadyAndHasInterest(sub,
entry)
                &amp;&amp; !sub.isSuspended())
            {
                <span class="code-keyword">if</span> (!sub.wouldSuspend(entry))
                {
                    <span class="code-keyword">if</span> (!sub.isBrowser() &amp;&amp;
!entry.acquire(sub))
                    {
                        <span class="code-comment">// restore credit here that would
have been taken away by wouldSuspend since we didn't manage
</span>                        <span class="code-comment">// to acquire the entry
<span class="code-keyword">for</span> <span class="code-keyword">this</span>
subscription
</span>                        sub.restoreCredit(entry);
                    }
                    <span class="code-keyword">else</span>
                    {

                        deliverMessage(sub, entry);

                    }
                }
            }
        }
        <span class="code-keyword">finally</span>
        {
            sub.releaseSendLock();
        }
    }

</pre>
</div></div>

<p>This code first takes a lock on the subscriber (this prevents it being removed while
we are carrying out this operation).  It then tests if the given subscription can take this
message at the moment (see below for more details).  It then further tests if there is enough
flow control credit to send this message to this subscription.  If there is credit (and the
subscription is not a "browser" then is attempts to acquire the entry ( entry.acquire(sub)
).  If the acquisition is successful (or if the subscription is a browser and thus does not
need to acquire the entry) then the entry is delivered to the subscription, else the credit
that would have been used by the message if sent is restored.</p>

<p>The most interesting method called in the above is subscriptionReadyAndHasInterest(sub,
entry):</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
    <span class="code-keyword">private</span> <span class="code-object">boolean</span>
subscriptionReadyAndHasInterest(<span class="code-keyword">final</span> Subscription
sub, <span class="code-keyword">final</span> QueueEntry entry)
    {
        <span class="code-comment">// We need to move <span class="code-keyword">this</span>
subscription on, past entries which are already acquired, or deleted or ones it has no
</span>        <span class="code-comment">// interest in.
</span>        QueueEntry node = sub.getLastSeenEntry();
        <span class="code-keyword">while</span> (node != <span class="code-keyword">null</span>
&amp;&amp; (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
        {

            QueueEntry newNode = _entries.next(node);
            <span class="code-keyword">if</span> (newNode != <span class="code-keyword">null</span>)
            {
                sub.setLastSeenEntry(node, newNode);
                node = sub.getLastSeenEntry();
            }
            <span class="code-keyword">else</span>
            {
                node = <span class="code-keyword">null</span>;
                <span class="code-keyword">break</span>;
            }

        }

        <span class="code-keyword">if</span> (node == entry)
        {
            <span class="code-comment">// If the first entry that subscription can process
is the one we are trying to deliver to it, then we are
</span>            <span class="code-comment">// good
</span>            <span class="code-keyword">return</span> <span class="code-keyword">true</span>;
        }
        <span class="code-keyword">else</span>
        {
            <span class="code-keyword">return</span> <span class="code-keyword">false</span>;
        }

    } 

</pre>
</div></div>

<p>Here we see how the subscription is inspected to see where its pointer into the queue
(the last seen entry) is in respect to the entry we are trying to deliver.  We start from
the subscription's current lastSeenEntry and work our way down the list passing over entries
which are already acquired by other subscriptions, deleted, or which this subscription has
no interest in (e.g. because the node does not meet the subscription's selection criteria);
all the while we can update the lastSeenEntry to take it past the entries this subscription
has now inspected.  Performing this iteration we will eventually arrive at the next entry
the subscription is interested in (or just fall off the end of the list).  At this point either
the next entry that the subscription is interested in is the entry we wish to deliver (success!)
or not.</p>

    </div>
    <div id="commentsSection" class="wiki-content pageSection">
       <div style="float: right;">
            <a href="http://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
       </div>
       <a href="http://cwiki.apache.org/confluence/display/qpid/Qpid+Design+-+Queue+Implementation">View
Online</a>
              |
       <a href="http://cwiki.apache.org/confluence/display/qpid/Qpid+Design+-+Queue+Implementation?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
           </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message