qpid-commits mailing list archives

From conflue...@apache.org
Subject [CONF] Apache Qpid > Qpid Design - Message Delivery
Date Wed, 02 Sep 2009 15:34:00 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Qpid+Design+-+Message+Delivery">Qpid
Design - Message Delivery</a></h2>
    <h4>Page  <b>added</b> by             <a href="http://cwiki.apache.org/confluence/display/~aidan">Aidan
Skinner</a>
    </h4>
         <br/>
    <div class="notificationGreySide">
         <div>
<ul>
    <li><a href='#QpidDesign-MessageDelivery-AsynchronousDelivery'>Asynchronous
Delivery</a></li>
    <li><a href='#QpidDesign-MessageDelivery-Subscriptions'>Subscriptions</a></li>
    <li><a href='#QpidDesign-MessageDelivery-Removal'>Removal</a></li>
    <li><a href='#QpidDesign-MessageDelivery-FlowControl'>Flow Control</a></li>
    <li><a href='#QpidDesign-MessageDelivery-Acknowledgement'>Acknowledgement</a></li>
    <li><a href='#QpidDesign-MessageDelivery-RejectandRelease'>Reject and Release</a></li>
</ul></div>

<h2><a name="QpidDesign-MessageDelivery-AsynchronousDelivery"></a>Asynchronous
Delivery</h2>

<p>If there are no subscriptions that can currently take delivery of a message then
we need to schedule an asynchronous delivery.  While the code is thread safe and could cope
with multiple threads performing asynchronous delivery simultaneously, we limit ourselves
to only having one asynchronous delivery job scheduled at any one time, so as not to overwhelm
the broker:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
    <span class="code-keyword">public</span> void deliverAsync()
    {
        _stateChangeCount.incrementAndGet();

        Runner runner = <span class="code-keyword">new</span> Runner();

        <span class="code-keyword">if</span> (_asynchronousRunner.compareAndSet(<span
class="code-keyword">null</span>, runner))
        {
            _asyncDelivery.execute(runner);
        }
    } 
</pre>
</div></div>

<p>Here we first increment our count of "stateChanges".  This provides us with a way
of knowing between loops of the asynchronous delivery thread whether anything else has happened
that makes it worth our while running the asynchronous delivery loop again (in effect it prevents
us having to always add another thread to cope with race conditions where we want to start
the async delivery just as it is ending).  We then create a new instance of the asynchronous
delivery "Runner", and attempt to make this instance the current one by means of the ubiquitous
compare-and-swap operation.  Here we test if we are the thread that moved the queue from having
no asynchronous runner to having one; and if so we need to schedule the runner to execute
by way of calling _asyncDelivery.execute(runner). </p>
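<p>The scheduling pattern described above can be sketched in isolation.  This is a minimal illustration, not the broker's code: the class name SingleRunnerScheduler, its fields and the "work" callback are hypothetical.  For simplicity the runner clears itself when it finishes, whereas the real processQueue method manages the reference itself.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only; all names here are hypothetical, not Qpid's.
class SingleRunnerScheduler {
    private final AtomicLong stateChangeCount = new AtomicLong();
    private final AtomicReference<Runnable> currentRunner = new AtomicReference<>();
    private final Executor executor;
    private final Runnable work;

    SingleRunnerScheduler(Executor executor, Runnable work) {
        this.executor = executor;
        this.work = work;
    }

    void deliverAsync() {
        // Record that something changed, whether or not we schedule a job.
        stateChangeCount.incrementAndGet();
        // Anonymous class (not a lambda) so that 'this' is the runner itself.
        Runnable runner = new Runnable() {
            @Override
            public void run() {
                try {
                    work.run();
                } finally {
                    // Simplification: free the slot once the work has finished.
                    currentRunner.compareAndSet(this, null);
                }
            }
        };
        // Only the caller that swaps null -> runner schedules the job.
        if (currentRunner.compareAndSet(null, runner)) {
            executor.execute(runner);
        }
    }

    long stateChanges() {
        return stateChangeCount.get();
    }

    public static void main(String[] args) {
        List<Runnable> queued = new ArrayList<>();
        SingleRunnerScheduler s = new SingleRunnerScheduler(queued::add, () -> { });
        s.deliverAsync();
        s.deliverAsync();
        System.out.println(queued.size() + " job(s) queued after " + s.stateChanges() + " calls");
    }
}
```

<p>In this sketch the second call still increments the counter but does not queue a second job, which is the behaviour the paragraph above relies on.</p>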

<p>The actual work of the asynchronous delivery is done in the processQueue(Runnable
runner) method.  </p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
    <span class="code-keyword">private</span> void processQueue(<span class="code-object">Runnable</span>
runner) <span class="code-keyword">throws</span> AMQException
    {
        <span class="code-object">long</span> stateChangeCount;
        <span class="code-object">long</span> previousStateChangeCount = <span
class="code-object">Long</span>.MIN_VALUE;
        <span class="code-object">boolean</span> deliveryIncomplete = <span
class="code-keyword">true</span>;

        <span class="code-object">int</span> extraLoops = 1;
        <span class="code-object">int</span> deliveries = MAX_ASYNC_DELIVERIES;

        _asynchronousRunner.compareAndSet(runner, <span class="code-keyword">null</span>);

        <span class="code-keyword">while</span> (deliveries != 0 &amp;&amp;

               ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get()))
|| deliveryIncomplete) 
               &amp;&amp; _asynchronousRunner.compareAndSet(<span class="code-keyword">null</span>,
runner))
        {
            <span class="code-comment">// we want to have one extra loop after every
subscription has reached the point where it cannot move
</span>            <span class="code-comment">// further, just in <span class="code-keyword">case</span>
the advance of one subscription in the last loop allows a different subscription to
</span>            <span class="code-comment">// move forward in the next iteration
</span>
            <span class="code-keyword">if</span> (previousStateChangeCount !=
stateChangeCount)
            {
                extraLoops = 1;
            }

            previousStateChangeCount = stateChangeCount;
            deliveryIncomplete = _subscriptionList.size() != 0;
            <span class="code-object">boolean</span> done = <span class="code-keyword">true</span>;
</pre>
</div></div>

<p>In this first fragment of the method we see the constraint on how long the asynchronous
delivery will keep attempting to deliver more messages.  </p>

<p>The first constraint "deliveries != 0" tests a countdown value "deliveries",
which is initialised to a maximum (currently set to 10); every successful delivery
the thread makes decrements this counter.  This limits how long the processQueue
method is allowed to run, stopping this queue from starving other queues of processor
time.  If this countdown was what caused the loop to terminate, the asynchronous
delivery is scheduled to run again at the end of the method.</p>

<p>The second constraint "((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get()))
|| deliveryIncomplete) " is testing whether there is provably nothing left to do on this queue.
 The first half tests if there have been any changes since the last iteration that have incremented
that state change count (and thus require another loop), the second half says, "even if there
haven't been any changes keep looping if last time round we thought there was still more to
do".</p>
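<p>The second constraint can be illustrated with a stripped-down loop.  This is a sketch under assumptions: the class StateChangeLoop and the "pass" callback (which returns whether work is still outstanding) are hypothetical stand-ins for one iteration over the subscriptions.</p>

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Illustrative sketch only; names are hypothetical.
class StateChangeLoop {
    final AtomicLong stateChangeCount = new AtomicLong();
    int passes = 0;

    // Repeats 'pass' while the counter moved since the previous iteration,
    // or while the previous pass reported that work was still outstanding.
    void run(BooleanSupplier pass) {
        long previous = Long.MIN_VALUE;
        long current;
        boolean incomplete = true;
        while (previous != (current = stateChangeCount.get()) || incomplete) {
            previous = current;
            incomplete = pass.getAsBoolean();
            passes++;
        }
    }

    public static void main(String[] args) {
        StateChangeLoop loop = new StateChangeLoop();
        // The first pass bumps the counter, which forces one further pass.
        loop.run(() -> {
            if (loop.passes == 0) {
                loop.stateChangeCount.incrementAndGet();
            }
            return false;
        });
        System.out.println("passes = " + loop.passes);
    }
}
```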

<p>The final constraint "_asynchronousRunner.compareAndSet(null, runner)" is our familiar
compare-and-swap operation, ensuring that this is the designated instance of the asynchronous
processor running.  </p>
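<p>The delivery countdown (the first constraint) amounts to a simple time-slicing budget.  The following is a minimal sketch assuming a plain in-memory queue of strings; the class TimeSlicedDrain and its members are hypothetical, and only the limit of 10 is taken from the text above.</p>

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only; names are hypothetical.
class TimeSlicedDrain {
    static final int MAX_DELIVERIES = 10;

    final Queue<String> pending = new ArrayDeque<>();
    int delivered = 0;

    // Delivers at most MAX_DELIVERIES messages.  Returns true when the budget
    // was the limiting factor, meaning the caller should schedule another run.
    boolean runOnce() {
        int deliveries = MAX_DELIVERIES;
        while (deliveries != 0 && !pending.isEmpty()) {
            pending.poll();
            delivered++;
            deliveries--;
        }
        return deliveries == 0;
    }

    public static void main(String[] args) {
        TimeSlicedDrain drain = new TimeSlicedDrain();
        for (int i = 0; i < 25; i++) {
            drain.pending.add("message-" + i);
        }
        int runs = 1;
        while (drain.runOnce()) {
            runs++;
        }
        System.out.println("delivered " + drain.delivered + " messages in " + runs + " runs");
    }
}
```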

<p>This loop runs, attempting to deliver one message in each iteration:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
            SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
            <span class="code-comment">//iterate over the subscribers and <span class="code-keyword">try</span>
to advance their pointer
</span>            <span class="code-keyword">while</span> (subscriptionIter.advance())
            {
                <span class="code-object">boolean</span> closeConsumer = <span
class="code-keyword">false</span>;
                Subscription sub = subscriptionIter.getNode().getSubscription();
                <span class="code-keyword">if</span> (sub != <span class="code-keyword">null</span>)
                {
</pre>
</div></div>

<p>Iterate over the subscriptions on the queue...</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
                    sub.getSendLock();
</pre>
</div></div>
<p>Lock the subscription so it does not get deleted while we attempt to deliver to it.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
                    <span class="code-keyword">try</span>
                    {
                        QueueEntry node = moveSubscriptionToNextNode(sub);
</pre>
</div></div>
<p>Find the next node which the subscription should try to deliver by skipping over
already acquired entries.  If it is null then this subscription is at the tail of the queue.</p>
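<p>The skipping behaviour can be sketched as a walk along a singly linked list.  This is an assumption-laden simplification: the Entry type and nextAvailable method are hypothetical, and the real moveSubscriptionToNextNode also updates the subscription's stored pointer, which is omitted here.</p>

```java
// Illustrative sketch only; names are hypothetical.
class EntryCursor {
    static final class Entry {
        final String body;
        boolean acquired;
        Entry next;

        Entry(String body) {
            this.body = body;
        }
    }

    // Returns the first entry at or after 'from' that has not been acquired,
    // or null if the walk runs off the tail of the list.
    static Entry nextAvailable(Entry from) {
        Entry e = from;
        while (e != null && e.acquired) {
            e = e.next;
        }
        return e;
    }

    public static void main(String[] args) {
        Entry a = new Entry("a");
        Entry b = new Entry("b");
        Entry c = new Entry("c");
        a.next = b;
        b.next = c;
        a.acquired = true;
        b.acquired = true;
        Entry found = nextAvailable(a);
        System.out.println(found == null ? "at tail" : "next available: " + found.body);
    }
}
```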

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
                        <span class="code-keyword">if</span> (node != <span
class="code-keyword">null</span> &amp;&amp; sub.isActive())
                        {
                            <span class="code-comment">// Keep track of whether this subscription is really active and whether we
</span>                            <span class="code-comment">// managed to advance the pointer on this subscription in this loop (these
</span>                            <span class="code-comment">// values go into determining if there is anything left to do in a new loop).
</span>

                            <span class="code-object">boolean</span> advanced
= <span class="code-keyword">false</span>;
                            <span class="code-object">boolean</span> subActive
= <span class="code-keyword">false</span>;

                            <span class="code-keyword">if</span> (!(node.isAcquired()
|| node.isDeleted()))
                            {
                                <span class="code-keyword">if</span> (!sub.isSuspended())
                                {
                                    <span class="code-comment">// The node is not yet acquired or deleted, and we can now be sure the subscription is active.</span>
                                    subActive = <span class="code-keyword">true</span>;
                                    <span class="code-keyword">if</span> (sub.hasInterest(node))
                                    {
                                        <span class="code-comment">// The following code is similar to that in the deliverToSubscription method</span>
                                        <span class="code-comment">// described previously.  It should be possible to factor this out.</span>
                                        <span class="code-comment">// The primary difference is the behaviour with a browser, where we need to</span>
                                        <span class="code-comment">// explicitly note that we have advanced.</span>

                                        <span class="code-keyword">if</span> (!sub.wouldSuspend(node))
                                        {
                                            <span class="code-keyword">if</span>
(!sub.isBrowser() &amp;&amp; !node.acquire(sub))
                                            {
                                                sub.restoreCredit(node);

                                            }
                                            <span class="code-keyword">else</span>
                                            {
                                                deliverMessage(sub, node);
                                                deliveries--;

                                                <span class="code-keyword">if</span>
(sub.isBrowser())
                                                {
                                                    QueueEntry newNode = _entries.next(node);

                                                    <span class="code-keyword">if</span>
(newNode != <span class="code-keyword">null</span>)
                                                    {
                                                        sub.setLastSeenEntry(node, newNode);
                                                        node = sub.getLastSeenEntry();
                                                        advanced = <span class="code-keyword">true</span>;
                                                    }

                                                }
                                            }
                                            done = <span class="code-keyword">false</span>;
                                        }
                                        <span class="code-keyword">else</span>
<span class="code-comment">// Not enough Credit <span class="code-keyword">for</span>
message and wouldSuspend
</span>                                        {
</pre>
</div></div>

<p>This case covers the scenario where we are using byte-based flow control, and the
currently available credit is less than the size of the next message.  We need to wait either
for the credit to be increased (which will cause a state change event) or for the entry to be
picked off by another subscription (which we capture with the state change listener).</p>
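<p>A byte-based credit check of this kind might look like the sketch below.  The class ByteCredit is hypothetical.  It assumes that the check itself reserves credit when there is enough, which is suggested by the restoreCredit call in the listing above but is not confirmed by this page.</p>

```java
// Illustrative sketch only; the class and its behaviour are assumptions.
class ByteCredit {
    private long available;

    ByteCredit(long initialBytes) {
        this.available = initialBytes;
    }

    // Returns true if sending a message of this size would exhaust the credit.
    // Otherwise the credit is reserved and false is returned.
    synchronized boolean wouldSuspend(long messageSize) {
        if (messageSize > available) {
            return true;
        }
        available -= messageSize;
        return false;
    }

    // Hands back credit reserved for a message that was not delivered after all.
    synchronized void restoreCredit(long messageSize) {
        available += messageSize;
    }

    synchronized long available() {
        return available;
    }

    public static void main(String[] args) {
        ByteCredit credit = new ByteCredit(100);
        System.out.println("60 bytes suspends: " + credit.wouldSuspend(60));
        System.out.println("another 60 bytes suspends: " + credit.wouldSuspend(60));
        System.out.println("bytes left: " + credit.available());
    }
}
```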

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
                                            <span class="code-comment">//QPID-1187 -
Treat the subscription as suspended <span class="code-keyword">for</span> <span
class="code-keyword">this</span> message
</span>                                            <span class="code-comment">//
and wait <span class="code-keyword">for</span> the message to be removed to <span
class="code-keyword">continue</span> delivery.
</span>                                            subActive = <span class="code-keyword">false</span>;

                                            node.addStateChangeListener(<span class="code-keyword">new</span>
QueueEntryListener(sub, node));
                                        }
                                    }
                                    <span class="code-keyword">else</span>
                                    {
                                        <span class="code-comment">// <span class="code-keyword">this</span>
subscription is not interested in <span class="code-keyword">this</span> node
so we can skip over it
</span>                                        QueueEntry newNode = _entries.next(node);
                                        <span class="code-keyword">if</span> (newNode
!= <span class="code-keyword">null</span>)
                                        {
                                            sub.setLastSeenEntry(node, newNode);
                                        }
                                    }
                                }
                            }
</pre>
</div></div>

<p>Here we calculate if there is anything left to do on this particular subscription.
 If the subscription is at the tail of the queue, or is no longer active, then
this subscription can be considered done.  If all subscriptions are done then we are truly
finished.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
                            <span class="code-keyword">final</span> <span class="code-object">boolean</span>
atTail = (_entries.next(node) == <span class="code-keyword">null</span>);
                            done = done &amp;&amp; (!subActive || atTail);
</pre>
</div></div>

<p>Here we calculate if we need to auto-close the subscription.  We do this if we are at
the tail of the queue, we didn't advance in this iteration, and this is an auto-close subscription.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
                            closeConsumer = (atTail &amp;&amp; !advanced &amp;&amp;
sub.isAutoClose());
                        }
                    }
                    <span class="code-keyword">finally</span>
                    {
                        sub.releaseSendLock();
                    }

                    <span class="code-keyword">if</span> (closeConsumer)
                    {
                        unregisterSubscription(sub);

                        ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
                        converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(),
sub.getConsumerTag());
                    }

                } 
</pre>
</div></div>

<p>This ends the iteration over subscriptions.  Now we decide whether
we should try iterating over the subscriptions again.  We use the value of "done" we calculated
while iterating over the subscriptions to determine if we need to loop again.  If we believe
we are done and we have already used our "extra" loop then we can stop.  If we are "done"
but we have not yet used the extra loop, then we decrement the extra loop counter (setting
it to 0 might be clearer since 0 and 1 are the only valid values) and go round one more time.
 If we are not done then we restore extraLoops to 1.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
                <span class="code-keyword">if</span> (done)
                {
                    <span class="code-keyword">if</span> (extraLoops == 0)
                    {
                        deliveryIncomplete = <span class="code-keyword">false</span>;
                    }
                    <span class="code-keyword">else</span>
                    {
                        extraLoops--;
                    }
                }
                <span class="code-keyword">else</span>
                {
                    extraLoops = 1;
                }
            }
            _asynchronousRunner.set(<span class="code-keyword">null</span>);
        } 
</pre>
</div></div>
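<p>The extra-loop bookkeeping can be viewed as a small state machine.  The sketch below is hypothetical (the class ExtraLoopTracker is not part of the broker) and sets the counter to 0 directly, as the text above suggests might be clearer.  It omits the per-iteration reset of deliveryIncomplete that the real loop performs.</p>

```java
// Illustrative sketch only; names are hypothetical.
class ExtraLoopTracker {
    private int extraLoops = 1;
    private boolean deliveryIncomplete = true;

    // Feed in the 'done' flag computed by one pass over the subscriptions.
    void endOfPass(boolean done) {
        if (done) {
            if (extraLoops == 0) {
                // Done twice in a row: nothing more to do.
                deliveryIncomplete = false;
            } else {
                // Done once: use up the single extra loop.
                extraLoops = 0;
            }
        } else {
            // Progress was made, so the extra loop is available again.
            extraLoops = 1;
        }
    }

    boolean deliveryIncomplete() {
        return deliveryIncomplete;
    }

    public static void main(String[] args) {
        ExtraLoopTracker tracker = new ExtraLoopTracker();
        boolean[] doneFlags = {true, false, true, true};
        for (boolean done : doneFlags) {
            tracker.endOfPass(done);
            System.out.println("done=" + done + " -> incomplete=" + tracker.deliveryIncomplete());
        }
    }
}
```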

<p>This ends the "while(..." loop.  The final action in the asynchronous process
is to determine if we need to schedule ourselves for another execution:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
        <span class="code-comment">// If deliveries == 0 then the limiting factor was the time-slicing rather than available messages or credit
</span>        <span class="code-comment">// therefore we should schedule <span
class="code-keyword">this</span> runner again (unless someone beats us to it :-)
).
</span>        <span class="code-keyword">if</span> (deliveries == 0 &amp;&amp;
_asynchronousRunner.compareAndSet(<span class="code-keyword">null</span>, runner))
        {
            _asyncDelivery.execute(runner);
        }
    }
</pre>
</div></div>

<h2><a name="QpidDesign-MessageDelivery-Subscriptions"></a>Subscriptions</h2>

<p>Subscriptions model the entities created on receipt of a "Basic.Consume" event
in AMQP0-8/0-9.  That is, they represent a relationship between an AMQP Channel (equivalent
to a Java JMS Session) and a queue.  As messages are placed on the queue, the queue takes
responsibility for finding, as quickly as possible, a subscriber which is willing to take the
message.  The subscriber is responsible for delivering the message to the receiving client.
 As outlined above, a significant change introduced by the refactoring is that the Subscriptions
now maintain state representing a pointer into the queue.  This pointer marks the
position before which the subscription can guarantee that no message is of interest
to it.  Generally this pointer only ever moves forward through the queue (see the section
on reject and release for the exception to this rule).  This is the only dynamic state maintained
directly by the subscription.</p>

<p>Different subclasses of SubscriptionImpl are used to model the different behaviour
associated with different acknowledgement modes.  The subclasses used are AckSubscription,
NoAckSubscription, BrowserSubscription and GetNoAckSubscription.  The last of these is a special
implementation which is used to model a Basic.Get command as a temporary subscription that
can only ever receive one message.  Modelling Get in this way mirrors how the same semantics
are implemented in 0-10 and avoids having two separate ways to dequeue messages from the
queue.</p>

<p><b>Adding</b></p>

<p>When a "Basic.Consume" event is processed the subscription is added to the list of
subscriptions on the queue, the "pointer" in the subscription is set to point at the head
of the list of queue entries, and then an asynchronous job is kicked off to deliver to that
subscription as many messages as can be delivered starting at the head.  This uses an algorithm
almost identical to that described above for asynchronous message delivery, except that it only
considers the one subscription.  This is found within the "flushSubscription" method of the
queue (flushing a subscription is a 0-10 concept where you attempt to send as much as possible
to a given subscription and then signal completion when either the subscription's credit runs
out or there are no more messages on the queue).</p>

<p>Future Improvement: Factor out the common code between flushSubscription and processQueue.</p>

<h2><a name="QpidDesign-MessageDelivery-Removal"></a>Removal</h2>

<p>A consumer is removed either through the reception of a "Basic.Cancel" event or through
the closure of the encapsulating channel.  For thread safety, the first action is to remove
the subscription from the list of subscriptions that the asynchronous delivery task iterates
over.  Next the subscription's close() method is called.  This takes out a lock on the subscription
(to avoid conflicting with any attempt to concurrently send to the subscription) and changes
the subscription's state to "closed".  The combination of these steps allows us to assert that
after that point in time the subscription will not be used by any other threads to attempt
to deliver messages.  Next the subscription's pointer into the queue is nulled out in a thread-safe
way.  This is done to prevent memory leaks due to references being held to points in the queue
(due to the way that the concurrent-safe queues work, "deleted" elements may not be eligible
for garbage collection for some time).</p>
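<p>The order of the removal steps can be sketched as follows.  This is a minimal illustration under assumptions: the RemovalSketch and Sub classes, the use of ReentrantLock and CopyOnWriteArrayList, and the field names are all hypothetical choices, not the broker's actual types.</p>

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; all types and names are hypothetical.
class RemovalSketch {
    static final class Sub {
        final ReentrantLock sendLock = new ReentrantLock();
        final AtomicReference<Object> lastSeenEntry = new AtomicReference<>(new Object());
        volatile boolean closed;

        // Takes the send lock so that closing cannot overlap with a delivery.
        void close() {
            sendLock.lock();
            try {
                closed = true;
            } finally {
                sendLock.unlock();
            }
        }
    }

    final List<Sub> subscriptions = new CopyOnWriteArrayList<>();

    void unregister(Sub sub) {
        subscriptions.remove(sub);    // 1. hide it from the async delivery task
        sub.close();                  // 2. lock, then mark as closed
        sub.lastSeenEntry.set(null);  // 3. drop the reference into the queue
    }

    public static void main(String[] args) {
        RemovalSketch queue = new RemovalSketch();
        Sub sub = new Sub();
        queue.subscriptions.add(sub);
        queue.unregister(sub);
        System.out.println("closed=" + sub.closed + ", remaining=" + queue.subscriptions.size());
    }
}
```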

<p>Finally, if the queue is of "auto-delete" type and the subscription being removed
is the last subscription attached to the queue, then the queue needs to be deleted.</p>

<h2><a name="QpidDesign-MessageDelivery-FlowControl"></a>Flow Control</h2>

<p>There are now concrete classes modeling the behaviour of the flow control algorithm.
 These flow control managers are set at the subscription level. For AMQP 0-8 and 0-9 flow
control still happens at a per-channel level, so the same instance of the flow control manager
is shared between all subscriptions on a channel.  For 0-10 implementations we will be able
to use the same code to implement the per-subscription flow-control model that it utilizes.</p>
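<p>The difference between per-channel and per-subscription flow control then comes down to which subscriptions share a manager instance.  The sketch below uses a hypothetical message-count CreditManager to show both arrangements; none of these names come from the broker.</p>

```java
// Illustrative sketch only; all types and names are hypothetical.
class SharedCreditSketch {
    static final class CreditManager {
        private int credit;

        CreditManager(int credit) {
            this.credit = credit;
        }

        // Consumes one unit of credit if any is left.
        synchronized boolean useCredit() {
            if (credit == 0) {
                return false;
            }
            credit--;
            return true;
        }
    }

    static final class Sub {
        private final CreditManager creditManager;

        Sub(CreditManager creditManager) {
            this.creditManager = creditManager;
        }

        boolean trySend() {
            return creditManager.useCredit();
        }
    }

    public static void main(String[] args) {
        // Per-channel style: two subscriptions draw on one shared manager.
        CreditManager channelCredit = new CreditManager(2);
        Sub a = new Sub(channelCredit);
        Sub b = new Sub(channelCredit);
        System.out.println(a.trySend() + " " + b.trySend() + " " + a.trySend());

        // Per-subscription style: each subscription has its own manager.
        Sub c = new Sub(new CreditManager(1));
        Sub d = new Sub(new CreditManager(1));
        System.out.println(c.trySend() + " " + d.trySend());
    }
}
```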

<h2><a name="QpidDesign-MessageDelivery-Acknowledgement"></a>Acknowledgement</h2>

<h2><a name="QpidDesign-MessageDelivery-RejectandRelease"></a>Reject and
Release</h2>

<p>Messages delivered to a subscription may subsequently be returned to the queue either
explicitly (by use of a reject command) or implicitly (by the closure of the channel).  In
this case the message must be made available again to subscribers to the queue.  The issue
here is that the pointers held by the subscriptions are likely to be in advance of the point
to which the message is being returned.  Thus for each message that is returned we must iterate
over all subscribers to the queue, and if their current pointer is in advance of the returned
message it must be moved back such that the next entry that that subscriber sees is the returned
message.  We do not reset the pointer for browsing consumers, however, as doing so would lead
to all the browsed messages that are after the returned message in the queue being redelivered
to the browsing subscription.</p>
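<p>The pointer reset can be sketched as below.  This is a simplification under a stated assumption: queue entries carry monotonically increasing ids, and a subscription's pointer is stored as the id of the last entry it has seen.  The ReleaseSketch class and its members are hypothetical.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; all types and names are hypothetical.
class ReleaseSketch {
    static final class Sub {
        final boolean browser;
        long lastSeenId; // every entry with an id up to this one has been seen

        Sub(boolean browser, long lastSeenId) {
            this.browser = browser;
            this.lastSeenId = lastSeenId;
        }
    }

    // Moves back every non-browsing subscription whose pointer is at or past
    // the released entry, so that the released entry is the next one it sees.
    static void onRelease(long releasedId, List<Sub> subs) {
        for (Sub s : subs) {
            if (!s.browser && s.lastSeenId >= releasedId) {
                s.lastSeenId = releasedId - 1;
            }
        }
    }

    public static void main(String[] args) {
        List<Sub> subs = new ArrayList<>();
        subs.add(new Sub(false, 10)); // consumer ahead of the released entry
        subs.add(new Sub(true, 10));  // browser: left untouched
        subs.add(new Sub(false, 3));  // consumer still behind the released entry
        onRelease(5, subs);
        for (Sub s : subs) {
            System.out.println("browser=" + s.browser + " lastSeenId=" + s.lastSeenId);
        }
    }
}
```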
    </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

