qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From conflue...@apache.org
Subject [CONF] Apache Qpid > Current Architecture
Date Wed, 16 Sep 2009 11:28:00 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
     <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Current+Architecture">Current
Architecture</a></h2>
     <h4>Page <b>edited</b> by             <a href="http://cwiki.apache.org/confluence/display/~aidan">Aidan
Skinner</a>
    </h4>
     
          <br/>
     <div class="notificationGreySide">
         <div>
<ul>
    <li><a href='#CurrentArchitecture-Currentimplementation'>Current implementation</a></li>
    <li><a href='#CurrentArchitecture-Issues'>Issues</a></li>
    <li><a href='#CurrentArchitecture-Currentimplementation'>Current implementation</a></li>
<ul>
    <li><a href='#CurrentArchitecture-Broker'>Broker</a></li>
    <li><a href='#CurrentArchitecture-Clientconnectioncreation'>Client connection
creation</a></li>
    <li><a href='#CurrentArchitecture-Clientprocessing'>Client processing</a></li>
</ul>
</ul></div>

<h1><a name="CurrentArchitecture-Currentimplementation"></a>Current implementation</h1>

<p>Inside Qpid, data is read from a socket and placed in a buffer.  A separate thread
then takes this buffer and attempts to parse it as an AMQP command.  this AMQP command is
then put on a second buffer.  Finally a third thread reads the command and processes it.</p>

<p>Currently the two buffers between these three threads are unbounded.  This means
that data is read from the network as fast as possible with no regard as to whether the broker
has the capacity to process it.</p>

<p>Queues are themselves a kind of buffer between client applications.</p>

<p>From a queue the message can be assigned to be send to a client.  At this point a
delivery command is placed in another buffer awaiting sending on the network.  When received
by the client a similar process to receiving on the broker occurs</p>

<p>The whole process looks something like this</p>

<p> Client App sends message &#45;&gt; (MINA Buffer)<br/>
 &#45;&gt; MINA Thread takes message and sends to TCP &#45;&gt; (TCP Buffer)<br/>
 &#45;&gt; TCP places bytes on wire &#45;&gt;<br/>
~~~~~ Network  ~~~~~<br/>
 &#45;&gt; TCP reads from wire &#45;&gt; (TCP Buffer)<br/>
 &#45;&gt; MINA Reads from TCP &#45;&gt; (MINA Buffer)<br/>
 &#45;&gt; Bytes parsed and converted into AMQP Command &#45;&gt; (Job Queue
Buffer)<br/>
 &#45;&gt; AMQP Command processed, message placed on Queue &#45;&gt; (Queue
- which is a buffer)<br/>
 &#45;&gt; Message taken from queue and delivery command created &#45;&gt;
(MINA Buffer)<br/>
 &#45;&gt; MINA Thread takes message and sends to TCP &#45;&gt; (TCP Buffer)<br/>
 &#45;&gt; TCP places bytes on wire &#45;&gt;<br/>
~~~~~ Network  ~~~~~<br/>
 &#45;&gt; TCP reads from wire &#45;&gt; (TCP Buffer)<br/>
 &#45;&gt; MINA Reads from TCP &#45;&gt; (MINA Buffer)<br/>
 &#45;&gt; Bytes parsed and converted into AMQP Command &#45;&gt; (Job Queue
Buffer)<br/>
 &#45;&gt; AMQP Command processed, message placed on Delivery Queue &#45;&gt;
(Delivery Queue Buffer)<br/>
 &#45;&gt; Message received by client application code</p>

<p>Or, pictorally: <img src="/confluence/download/attachments/122360/Qpid-architecture.png"
align="absmiddle" border="0" /><br/>
&nbsp;</p>

<p>Of all the buffers above, only the TCP buffers are bounded (the Delivery Queue Buffer
in the client is potentially bounded by prefetch, although prefetch is not set on bytes but
on messages which may be of arbitrary size),  every other buffer is a potential source of
out of memory exceptions.</p>

<p>From the above we can see that there are many potential sources of OutOfMemoryExceptions.
 We need to consider where we may get unbounded growth, what scenarios will cause that, and
what other ways we have to mitigate those risks.</p>

<p>In general we get growth of the IO (MINA) buffers when sender and receiver are operating
at mismatched rates (i.e. the Client and Broker).  We will get unbounded growth of the queue
if the sending client is producing at a faster rate than the receiving client can process.</p>

<h1><a name="CurrentArchitecture-Issues"></a>Issues</h1>

<ol>
	<li>The current MINA networking uses unbounded buffers.</li>
	<li>We replace over a dozen MINA classes, none of which have any unit test coverage.
We failed to get our patches upstream and haven't attempted since then.</li>
	<li>Existing unit test coverage is minimal (approx 30%)</li>
	<li>Improving unit test coverage is difficult due to poor encapsulation</li>
	<li>Poor encapsulation has lead to tight coupling of MINA to server</li>
	<li>The current behaviour of send() leaves the potential for message loss when not
using transactions and violates JMS spec. Persistent messages which are held in either the
client or servers buffers before being written to disk can be lost.</li>
	<li>MINA's internal state is currently a black box, leaving no way to determine how
much memory is being used by an individual client connection.</li>
	<li>The way that we use MINA is suboptimal for our purpouses but is difficult to change
due to the tight coupling</li>
	<li>Supporting alternative transport layers is impossible due to tight coupling of
MINA (OSI layer 4) with the AMQP handlers (OSI layer 7).</li>
</ol>


<h1><a name="CurrentArchitecture-Currentimplementation"></a>Current implementation</h1>

<h3><a name="CurrentArchitecture-Broker"></a>Broker</h3>

<p>Currently the broker decodes the incoming network data, adds the frames to a Job
queue which are then processed as Events by AMQPFastProtocolHandler which passes the majority
of the work to AMQMinaProtocolSession. Often this results in a FrameHandler being called.
On the outbound route Frames are written to AMQMinaProtocolSession which calls IoSession.writeFrame
which passes the data to Mina for writing to the wire. </p>

<p><img src="/confluence/download/attachments/122360/broker-0.5-network.png" align="absmiddle"
border="0" /></p>

<h3><a name="CurrentArchitecture-Clientconnectioncreation"></a>Client connection
creation</h3>

<p>When the client creates a connection it creates an AMQConnectionDelegate for the
protocol version it requires and  passes the new protocol handler to TransportConnection which
creates a socket of the requested type (new TCP socket, existing TCP socket or InVM). It then
attaches the socket to the protocol handler which init()s a new ProtocolSession which begins
version negotiation with the broker. </p>

<p><img src="/confluence/download/attachments/122360/client-0.5-connection-creation.png"
align="absmiddle" border="0" /></p>

<h3><a name="CurrentArchitecture-Clientprocessing"></a>Client processing</h3>

<p>Once a socket has been opened the client processes data similarly to the broker,
decoding frames using AMQDecoder and passing them to AMQProtocolHandler which, normally, calls
a frame handler to perform the actual work. If this frame is one which has a listener waiting
for it, those listeners are notified. </p>

<p>Outgoing data is generated in AMQSession or it's delegate and written to AMQProtocolHandler,
optionally with a return frame to wait for. This is passed to Mina directly. </p>

<p><img src="/confluence/download/attachments/122360/client-0.5-network-processing.png"
align="absmiddle" border="0" /></p>

<p>If the frame is a BasicDeliver containing message payload, it adds an UnprocessedMessage
to the session which then waits for the ContentHeaderBody and ContentBody payloads to arrive.
Once all the expected bodies have been recieved, the complete message is given to the AMQSession
for that channel. </p>

<p>The AMQSession instance adds the message to it's internal delivery queue and any
locks waiting on the queue are notified. The Dispatcher thread takes the message and delivers
it to one of the consumers. </p>

<p>The BasicMessageConsumer converst the UnprocessedMessage to an AbstractJMSMessage
and then either delivers it to MessageListener if one has been set or stores it on an queue
which is popped when the application calls the consumers recieve() method.</p>
     </div>
     <div id="commentsSection" class="wiki-content pageSection">
       <div style="float: right;">
            <a href="http://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
       </div>

       <a href="http://cwiki.apache.org/confluence/display/qpid/Current+Architecture">View
Online</a>
       |
       <a href="http://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=122360&revisedVersion=11&originalVersion=10">View
Change</a>
              |
       <a href="http://cwiki.apache.org/confluence/display/qpid/Current+Architecture?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message