qpid-commits mailing list archives

From conflue...@apache.org
Subject [CONF] Apache Qpid > Java Broker - AMQP0-9 Tactical Producer Flow Control
Date Mon, 17 Aug 2009 14:53:00 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Java+Broker+-+AMQP0-9+Tactical+Producer+Flow+Control">Java
Broker - AMQP0-9 Tactical Producer Flow Control</a></h2>
    <h4>Page  <b>added</b> by             <a href="http://cwiki.apache.org/confluence/display/~godfrer">Rob
Godfrey</a>
    </h4>
         <br/>
    <div class="notificationGreySide">
         <h2><a name="JavaBroker-AMQP0-9TacticalProducerFlowControl-ProblemStatement"></a>Problem
Statement</h2>

<p>The Java Broker currently performs no throttling of producing clients.&nbsp;
Because the Java Broker also holds every transient message in memory until it is
consumed, we can encounter scenarios where the Java Broker runs out of heap space.&nbsp;
For example, if a producer P sends messages at a rate of 100 msg/s to a queue Q, but the only
consumer, C, of queue Q processes messages at a rate of 10 msg/s, then Q will grow at a rate
of 90 msg/s until such time as the broker runs out of heap space.&nbsp;</p>
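<p>The arithmetic in this example can be sketched as below. This is only an illustration:
the class <code>QueueGrowthEstimate</code>, the message size and the heap size are assumptions
made for the sketch and are not part of the broker, and per-message overhead is ignored.</p>

```java
/** Hypothetical helper for illustration only; not part of the Qpid code base. */
final class QueueGrowthEstimate {

    /** Net queue growth in messages per second (never negative). */
    static long netGrowthPerSecond(long producedPerSecond, long consumedPerSecond) {
        return Math.max(0L, producedPerSecond - consumedPerSecond);
    }

    /**
     * Rough number of seconds until the queue alone occupies heapBytes,
     * or -1 if the queue does not grow. assumedMessageBytes is an assumption
     * supplied by the caller; real per-message overhead is ignored.
     */
    static long secondsUntilFull(long producedPerSecond, long consumedPerSecond,
                                 long assumedMessageBytes, long heapBytes) {
        long growth = netGrowthPerSecond(producedPerSecond, consumedPerSecond);
        if (growth == 0L) {
            return -1L;
        }
        return heapBytes / (growth * assumedMessageBytes);
    }
}
```

<p>With the rates above, an assumed 1 KiB message and an assumed 1 GiB of heap,
<code>secondsUntilFull(100, 10, 1024, 1024L * 1024 * 1024)</code> gives 11650 seconds, a little
over three hours.</p>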

<p>Tactically we may attempt to solve the problem of Queues becoming overfull, and thus
causing out-of-memory errors, without attempting to solve the totality of out-of-memory
issues.&nbsp;</p>


<h2><a name="JavaBroker-AMQP0-9TacticalProducerFlowControl-Analysis"></a>Analysis</h2>

<p>AMQP0-8/0-9/0-9-1 provides no mechanism for throttling producers of messages based
on credit (either for a given destination, or even at the granularity of a session).&nbsp;
There are two mechanisms available to throttle a producing client: the use of TCP flow control,
and the use of the AMQP Channel.Flow command.&nbsp;</p>

<p>The use of TCP flow control throttles the producer to the rate at which the Broker
can process the incoming messages, but does not address the throttling of the producer to
the rate at which messages are consumed by a third-party consuming client.</p>

<p>The Channel.Flow command instructs the recipient to either cease or resume sending
messages.&nbsp; The receiver of the command should reply with Channel.Flow-ok once the flow
command has been received.</p>

<p>In AMQP0-9-1&nbsp;and earlier we cannot determine, prior to a producer sending
a message, which queues a producer wishes to send to.&nbsp; Thus we are limited in general
to reactive flow control: that is, when a producer attempts to send to an overfull queue
we can request that the sender send no more messages, by issuing a Channel.Flow.&nbsp;
Further, since many messages may already be "on-the-wire" by the time our Channel.Flow is
received, we cannot guarantee by how much the producer may "overfill" the queue before it
ceases publishing.</p>
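<p>To give a feel for the size of this "overfill", the following sketch estimates how many
bytes may still arrive after the broker decides to issue Channel.Flow. It is a rough model
only: the class <code>OverfillEstimate</code> and its parameters are hypothetical, and it
assumes a steady publish rate, a fixed message size and a fixed round-trip time, ignoring any
buffering in the client or the network stack.</p>

```java
/** Hypothetical estimate for illustration only; not part of the Qpid code base. */
final class OverfillEstimate {

    /**
     * Rough estimate of the bytes that reach the queue after the broker has
     * issued Channel.Flow: messages already on the wire plus messages published
     * before the command reaches the producer, approximated as rate x round trip.
     */
    static long overshootBytes(long publishRatePerSecond, long messageBytes, long roundTripMillis) {
        // Round up so that a partial message counts as a whole one.
        long inFlightMessages = (publishRatePerSecond * roundTripMillis + 999L) / 1000L;
        return inFlightMessages * messageBytes;
    }
}
```

<p>For instance, an assumed 100 msg/s of 1024-byte messages with an assumed 50 ms round trip
gives <code>overshootBytes(100, 1024, 50)</code> = 5120 bytes, so a queue's capacity should be
read as a soft limit rather than a hard one.</p>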

<h2><a name="JavaBroker-AMQP0-9TacticalProducerFlowControl-Proposal"></a>Proposal</h2>

<p>Allow each queue on the Java Broker to be configured with a "full" size.&nbsp;
Implement flow control such that the publisher of a message which is enqueued on a "full"
queue is immediately sent a Channel.Flow command to cease publication.&nbsp; Monitor queue
sizes such that when an "overfull" Queue has available space, then sessions which are blocked
waiting for this event are free to send messages again.&nbsp;</p>

<p>Ensure that the Java Client respects the Channel.Flow command, and causes all attempts
to send Messages to block, until the session is unflowed.</p>
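<p>One way the client-side blocking could work is sketched below. This is not the Java
Client's actual implementation: <code>FlowGate</code> and its method names are hypothetical,
and the timeout is an assumption added so that a caller in the sketch cannot wait forever.</p>

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical client-side gate; not the actual Qpid client class. */
final class FlowGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flowResumed = lock.newCondition();
    private boolean active = true; // true means publishing is allowed

    /** Called when a Channel.Flow command arrives from the broker. */
    void onChannelFlow(boolean newActive) {
        lock.lock();
        try {
            active = newActive;
            if (active) {
                flowResumed.signalAll(); // wake every blocked sender
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Called on the send path. Blocks while the session is flowed off.
     * Returns true once sending is allowed, or false on timeout or interrupt.
     */
    boolean awaitSendAllowed(long timeoutMillis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!active) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                remainingNanos = flowResumed.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

<p>In this sketch a send method would call <code>awaitSendAllowed</code> before writing the
publish frame, and the Channel.Flow handler would call <code>onChannelFlow</code> with the
flag carried by the command.</p>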

<h2><a name="JavaBroker-AMQP0-9TacticalProducerFlowControl-Design"></a>Design</h2>

<h3><a name="JavaBroker-AMQP0-9TacticalProducerFlowControl-BrokerChanges"></a>Broker
Changes</h3>

<p>Add the following configurable properties to Queues:</p>

<p><b>capacity</b>: size in bytes at which the queue is considered to be full
(and thus publishers whose messages take the total queue size above this mark will
be blocked).&nbsp; Default 0 (no maximum)</p>


<p><b>flowResumeCapacity</b>: the queue size in bytes at or below which blocked producers
are unflowed (defaulted to <b>capacity</b>)</p>


<p>Like other such values these may be set on individual queues in the config, or on
a per-virtualhost basis.</p>
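<p>The interaction of the two properties can be sketched as a pair of threshold checks. The
class <code>CapacityThresholds</code> is hypothetical and only mirrors the comparisons
described above: block when the size exceeds <b>capacity</b>, resume when it is at or below
<b>flowResumeCapacity</b>, and do nothing when <b>capacity</b> is 0.</p>

```java
/** Hypothetical illustration of the two thresholds; not part of the Qpid code base. */
final class CapacityThresholds {
    private final long capacity;           // 0 means no maximum
    private final long flowResumeCapacity; // size at or below which producers resume

    CapacityThresholds(long capacity, long flowResumeCapacity) {
        this.capacity = capacity;
        this.flowResumeCapacity = flowResumeCapacity;
    }

    /** flowResumeCapacity defaults to capacity when not configured. */
    CapacityThresholds(long capacity) {
        this(capacity, capacity);
    }

    /** True if a publisher that just enqueued should be sent Channel.Flow to stop. */
    boolean shouldBlock(long queueSizeBytes) {
        return capacity != 0L && queueSizeBytes > capacity;
    }

    /** True if blocked publishers may be told to resume. */
    boolean shouldResume(long queueSizeBytes) {
        return capacity != 0L && queueSizeBytes <= flowResumeCapacity;
    }
}
```

<p>Setting <b>flowResumeCapacity</b> below <b>capacity</b> gives a gap between the two
marks, so a queue hovering around its limit does not cause producers to be flowed and
unflowed on every message.</p>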


<p>Alter the following files in the org.apache.qpid.server.configuration package to
set the queue properties based on this configuration:</p>


<p>VirtualHostConfiguration<br/>
QueueConfiguration<br/>
ServerConfiguration</p>

<p>Alter the AMQQueue.java interface to add the following method</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">    /** Post enqueue check to ensure that the queue is not overfull.
 If the queue is overfull
        then request the channel to begin flow control */

    void checkCapacity(AMQChannel channel);
</pre>
</div></div>

<p>Update the following two classes in package org.apache.qpid.server.txn to call checkCapacity
on the queue after they have enqueued a message</p>

<p>LocalTransactionalContext<br/>
NonTransactionalContext</p>

<p>Add the following code to AMQChannel in package org.apache.qpid.server</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">    /** The set of queues on which the session is currently blocking.
 Only a session blocking on no queues can be unblocked */
    <span class="code-keyword">private</span> <span class="code-keyword">final</span>
ConcurrentMap&lt;AMQQueue, <span class="code-object">Boolean</span>&gt;
_blockingQueues = <span class="code-keyword">new</span> ConcurrentHashMap&lt;AMQQueue,
<span class="code-object">Boolean</span>&gt;();

    /** Toggle to indicate whether the session is currently being blocked by an overfull queue
condition or not */
    <span class="code-keyword">private</span> <span class="code-keyword">final</span>
AtomicBoolean _blocking = <span class="code-keyword">new</span> AtomicBoolean(<span
class="code-keyword">false</span>);


    /** Add the given queue to the set of those which the session is blocking on (ignore <span
class="code-keyword">if</span> we are already blocking on <span class="code-keyword">this</span>
queue)
        <span class="code-keyword">if</span> <span class="code-keyword">this</span>
moves us from being unblocked to blocked, issue a flow command */
    <span class="code-keyword">public</span> void block(AMQQueue queue)
    {
        <span class="code-keyword">if</span>(_blockingQueues.putIfAbsent(queue,
<span class="code-object">Boolean</span>.TRUE) == <span class="code-keyword">null</span>)
        {
            <span class="code-keyword">if</span>(_blocking.compareAndSet(<span
class="code-keyword">false</span>,<span class="code-keyword">true</span>))
            {
                flow(<span class="code-keyword">false</span>);
            }
        }
    }

    /** Remove the given queue from the set of those which the session is blocking on (ignore
<span class="code-keyword">if</span> we are no longer blocking on <span class="code-keyword">this</span>
queue)
        If <span class="code-keyword">this</span> moves us from a blocking to
an unblocked condition, allow client to resume publishing by issuing a flow */
    <span class="code-keyword">public</span> void unblock(AMQQueue queue)
    {
        <span class="code-keyword">if</span>(_blockingQueues.remove(queue) != <span class="code-keyword">null</span> &amp;&amp; _blockingQueues.isEmpty())
        {
            <span class="code-keyword">if</span>(_blocking.compareAndSet(<span
class="code-keyword">true</span>,<span class="code-keyword">false</span>))
            {
                flow(<span class="code-keyword">true</span>);
            }
        }
    }

    /** Send a Channel.Flow command to the client */
    <span class="code-keyword">private</span> void flow(<span class="code-object">boolean</span>
flow)
    {
        MethodRegistry methodRegistry = _session.getMethodRegistry();
        AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
        _session.writeFrame(responseBody.generateFrame(_channelId));
    }

</pre>
</div></div>
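<p>The intended transitions (one Channel.Flow when the first queue starts blocking, one when
the last queue stops blocking, nothing in between) can be checked in isolation with a small
stand-in. This is a sketch under assumptions: <code>ChannelBlockingSketch</code> is
hypothetical, queues are identified by name, and flow commands are recorded in a list
instead of being written to the session.</p>

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the channel's blocking logic; it records flow commands. */
final class ChannelBlockingSketch {
    /** Names of the queues this session is currently blocking on. */
    private final Set<String> blockingQueues = ConcurrentHashMap.newKeySet();
    private final AtomicBoolean blocking = new AtomicBoolean(false);
    /** Recorded flow flags: false = stop publishing, true = resume. */
    final List<Boolean> flowCommands = new CopyOnWriteArrayList<>();

    /** Start blocking on a queue; issue flow(false) only on the unblocked-to-blocked transition. */
    void block(String queueName) {
        if (blockingQueues.add(queueName) && blocking.compareAndSet(false, true)) {
            flowCommands.add(Boolean.FALSE);
        }
    }

    /** Stop blocking on a queue; issue flow(true) only when no blocking queue remains. */
    void unblock(String queueName) {
        if (blockingQueues.remove(queueName)
                && blockingQueues.isEmpty()
                && blocking.compareAndSet(true, false)) {
            flowCommands.add(Boolean.TRUE);
        }
    }
}
```

<p>Blocking on queues "a" and "b" and then unblocking "a" leaves a single recorded
<code>false</code>; only unblocking "b" as well records the matching <code>true</code>.</p>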

<p>Modify SimpleAMQQueue to perform the capacity check, and also to unblock blocked
channels when the queue reduces in size.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">    <span class="code-keyword">private</span> <span
class="code-keyword">final</span> ConcurrentMap&lt;AMQChannel, <span class="code-object">Boolean</span>&gt;
_blockedChannels = <span class="code-keyword">new</span> ConcurrentHashMap&lt;AMQChannel,
<span class="code-object">Boolean</span>&gt;();


    <span class="code-keyword">public</span> void checkCapacity(AMQChannel channel)
    {
        <span class="code-keyword">if</span>(_capacity != 0L &amp;&amp;
_atomicQueueSize.get() &gt; _capacity)
        {
            <span class="code-keyword">if</span>(_blockedChannels.putIfAbsent(channel,
<span class="code-object">Boolean</span>.TRUE)==<span class="code-keyword">null</span>)
            {
                channel.block(<span class="code-keyword">this</span>);
            }

            <span class="code-comment">// guard against race condition where consumer
takes messages, decreasing queue size,
</span>            <span class="code-comment">// but not seeing that the queue
was blocked so not issuing unblock
</span>            <span class="code-keyword">if</span>(_atomicQueueSize.get()
&lt;= _flowResumeCapacity)
            {
               channel.unblock(<span class="code-keyword">this</span>);
               _blockedChannels.remove(channel);

            }

        }
    }

    <span class="code-keyword">private</span> void decrementQueueSize(<span
class="code-keyword">final</span> QueueEntry entry)
    {
        getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
        checkFreeCapacity();
    }

    <span class="code-keyword">private</span> void checkFreeCapacity()
    {
        <span class="code-keyword">if</span>(_capacity != 0L &amp;&amp;
!_blockedChannels.isEmpty() &amp;&amp; _atomicQueueSize.get() &lt;= _flowResumeCapacity)
        {
            <span class="code-keyword">for</span>(AMQChannel c : _blockedChannels.keySet())
            {
                c.unblock(<span class="code-keyword">this</span>);
                _blockedChannels.remove(c);
            }
        }
    }


</pre>
</div></div>


<h3><a name="JavaBroker-AMQP0-9TacticalProducerFlowControl-ClientChanges"></a>Client
Changes</h3>

<p>Hook in the existing handler for ChannelFlow commands by altering the dispatchChannelFlow
method in the ClientMethodDispatcherImpl class</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">    <span class="code-keyword">public</span> <span
class="code-object">boolean</span> dispatchChannelFlow(ChannelFlowBody body, <span
class="code-object">int</span> channelId) <span class="code-keyword">throws</span>
AMQException
    {
        _channelFlowMethodHandler.methodReceived(_session, body, channelId);
        <span class="code-keyword">return</span> <span class="code-keyword">true</span>;
    }
</pre>
</div></div>
    </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

