qpid-commits mailing list archives

From conflue...@apache.org
Subject [CONF] Apache Qpid > Current and proposed network interfaces
Date Mon, 27 Jul 2009 16:53:00 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
     <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Current+and+proposed+network+interfaces">Current
and proposed network interfaces</a></h2>
     <h4>Page <b>edited</b> by             <a href="http://cwiki.apache.org/confluence/display/~aidan">Aidan
Skinner</a>
    </h4>
     
          <br/>
     <div class="notificationGreySide">
         <h2><a name="Currentandproposednetworkinterfaces-Purpose"></a>Purpose</h2>

<p>This page describes the low-level design of the new interface, which is intended to
encapsulate the network code in both the Java Broker and Client.
</p>

<p>This is the first step in decoupling the existing IO layer from the surrounding
Qpid code and, more specifically, from the current tie-in to MINA.</p>

<p>This document will provide sufficient information for architecture review and also
for input to task breakdown &amp; planning.</p>

<h2><a name="Currentandproposednetworkinterfaces-InterfaceRequirements"></a>Interface
Requirements</h2>

<ol>
	<li>Provide an API which supports pluggable network layers</li>
	<li>Facilitate the replacement of instantiations of MINA classes with an abstraction</li>
	<li>Expose methods to set TCP options (see main design doc for details)</li>
	<li>Provide support for configuration of related properties including buffer size</li>
	<li>Protocol driver should be non-blocking</li>
	<li>Network interface and drivers should be thread model agnostic</li>
	<li>Data will be passed around as byte arrays</li>
	<li>The Protocol driver will not perform any buffering</li>
	<li>Buffering will be the responsibility of the Network Driver</li>
	<li>Initial designs will only support TCP (see main design doc for info)</li>
	<li>The interface will support an SSLEngine</li>
</ol>


<h2><a name="Currentandproposednetworkinterfaces-Currentimplementation"></a>Current
implementation</h2>

<h3><a name="Currentandproposednetworkinterfaces-Broker"></a>Broker</h3>

<p>Currently the broker decodes the incoming network data and adds the resulting frames to a Job
queue. The frames are then processed as Events by AMQPFastProtocolHandler, which passes the majority
of the work to AMQMinaProtocolSession. Often this results in a FrameHandler being called.
On the outbound route, Frames are written to AMQMinaProtocolSession, which calls IoSession.writeFrame
to pass the data to Mina for writing to the wire. </p>

<p><img src="/confluence/download/attachments/2327666/broker-0.5-network.png" align="absmiddle"
border="0" /></p>

<h3><a name="Currentandproposednetworkinterfaces-Clientconnectioncreation"></a>Client
connection creation</h3>

<p>When the client creates a connection, it creates an AMQConnectionDelegate for the
protocol version it requires and passes the new protocol handler to TransportConnection, which
creates a socket of the requested type (new TCP socket, existing TCP socket or InVM). It then
attaches the socket to the protocol handler, which init()s a new ProtocolSession. That session begins
version negotiation with the broker. </p>

<p><img src="/confluence/download/attachments/2327666/client-0.5-connection-creation.png"
align="absmiddle" border="0" /></p>

<h3><a name="Currentandproposednetworkinterfaces-Clientprocessing"></a>Client
processing</h3>

<p>Once a socket has been opened, the client processes data similarly to the broker,
decoding frames using AMQDecoder and passing them to AMQProtocolHandler which, normally, calls
a frame handler to perform the actual work. If any listeners are waiting for this frame,
they are notified. </p>

<p>Outgoing data is generated in AMQSession or its delegate and written to AMQProtocolHandler,
optionally with a return frame to wait for. This is passed to Mina directly. </p>

<p><img src="/confluence/download/attachments/2327666/client-0.5-network-processing.png"
align="absmiddle" border="0" /></p>

<h2><a name="Currentandproposednetworkinterfaces-Newimplementation"></a>New
implementation</h2>

<p><b>NetworkDriver</b> takes bytes from the network and passes them to
the ProtocolEngine. It also accepts bytes from the ProtocolEngine and writes them to the network.</p>

<p><b>ProtocolEngine</b> accepts bytes from the NetworkDriver and turns
them into AMQFrames for processing. </p>

<p>In the new version, a NetworkDriver is created either by a ProtocolEngine (in the case of
outgoing connections) or by the Broker at startup. In the latter case the NetworkDriver creates a
ProtocolEngine whenever a new connection is created. The NetworkDriver passes raw data to the
ProtocolEngine, which is responsible for both decoding the frames and processing them. When the
ProtocolEngine wishes to send data, it does so by calling the NetworkDriver. The existing mechanisms
for frame listeners etc. are retained, but are decoupled from the network processing parts. </p>

<p>At the start of a connection the NetworkDriver will pass data to a ProtocolEngine
which will handle protocol negotiation and either return the ProtocolEngine to use or throw an exception
if no driver is available. The implementation will use the existing Sender and Receiver interfaces
in org.apache.qpid.transport, which will allow the use of the existing alternate transport
layer implementations.</p>
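<p>The negotiation step described above could be sketched roughly as follows. This is only an illustration, not the Qpid implementation: NegotiationSketch, Engine, register, negotiate and engineFor are hypothetical names, and the sketch assumes that all 8 header bytes arrive in one buffer. The only protocol header used is the AMQP 0-9-1 one ("AMQP" followed by 0, 0, 9, 1), chosen purely as an example.</p>

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names come from the Qpid code base.
final class NegotiationSketch {

    // Stand-in for the ProtocolEngine described on this page.
    interface Engine {
        String version();
    }

    // Registry from an 8-byte protocol header to a supplier of the matching engine.
    private final Map<String, Supplier<Engine>> engines = new HashMap<>();

    void register(byte[] header, Supplier<Engine> supplier) {
        engines.put(Arrays.toString(header), supplier);
    }

    // Consumes the first 8 bytes of a connection and returns the engine to use,
    // or throws if no registered engine understands the header.
    Engine negotiate(ByteBuffer firstBytes) {
        if (firstBytes.remaining() < 8) {
            throw new IllegalArgumentException("need 8 header bytes, got " + firstBytes.remaining());
        }
        byte[] header = new byte[8];
        firstBytes.get(header);
        Supplier<Engine> supplier = engines.get(Arrays.toString(header));
        if (supplier == null) {
            throw new IllegalStateException("no engine for header " + Arrays.toString(header));
        }
        return supplier.get();
    }

    // AMQP 0-9-1 protocol header: the literal "AMQP" followed by 0, 0, 9, 1.
    static byte[] amqp091Header() {
        return new byte[] {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    }

    // Builds a trivial engine that only remembers which version it speaks.
    static Engine engineFor(String version) {
        return () -> version;
    }

    public static void main(String[] args) {
        NegotiationSketch negotiation = new NegotiationSketch();
        negotiation.register(amqp091Header(), () -> engineFor("0-9-1"));
        Engine engine = negotiation.negotiate(ByteBuffer.wrap(amqp091Header()));
        System.out.println("selected engine for AMQP " + engine.version());
    }
}
```

<p>Keying the registry on the raw header keeps the negotiating code unaware of any particular protocol version, which is one plausible way to keep the network layer pluggable.</p>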

<p>The thread of control remains with the network driver. </p>

<p>Data comes in from the operating system, is read from the socket by the NetworkDriver
and is given to the ProtocolEngine's received method. The ProtocolEngine is responsible for processing
the bytes and interfacing to the rest of the broker or client. </p>

<p>The ProtocolEngine will write bytes to the wire using the NetworkDriver, which implements
the existing Sender interface from org.apache.qpid.transport.</p>
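<p>As a rough illustration of the data flow described above, the sketch below wires an in-memory driver to a toy engine. It is not Qpid code: DataFlowSketch, InMemoryDriver, UpperCaseEngine, attach, bytesArrived and wireBytes are hypothetical names, and the Sender and Receiver interfaces are trimmed local copies rather than the ones in org.apache.qpid.transport. The driver owns the buffering and the calling thread, while the engine only reacts to received().</p>

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical sketch; none of these classes exist in Qpid.
final class DataFlowSketch {

    // Trimmed local copies of the interfaces shown on this page.
    interface Sender<T> { void send(T msg); void flush(); void close(); }
    interface Receiver<T> { void received(T msg); void exception(Throwable t); void closed(); }

    // In-memory driver: buffers outgoing bytes until flush() moves them to the "wire".
    static final class InMemoryDriver implements Sender<ByteBuffer> {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        private final ByteArrayOutputStream wire = new ByteArrayOutputStream();
        private Receiver<ByteBuffer> engine;

        void attach(Receiver<ByteBuffer> engine) { this.engine = engine; }

        // Simulates bytes read from the socket; the driver's thread calls the engine.
        void bytesArrived(byte[] data) { engine.received(ByteBuffer.wrap(data)); }

        @Override public void send(ByteBuffer msg) {
            byte[] copy = new byte[msg.remaining()];
            msg.get(copy);
            pending.write(copy, 0, copy.length);
        }

        @Override public void flush() {
            byte[] out = pending.toByteArray();
            wire.write(out, 0, out.length);
            pending.reset();
        }

        @Override public void close() { engine.closed(); }

        byte[] wireBytes() { return wire.toByteArray(); }
    }

    // Toy engine: upper-cases whatever it receives and writes it back via the driver.
    static final class UpperCaseEngine implements Receiver<ByteBuffer> {
        private final Sender<ByteBuffer> driver;
        boolean closed;

        UpperCaseEngine(Sender<ByteBuffer> driver) { this.driver = driver; }

        @Override public void received(ByteBuffer msg) {
            String text = StandardCharsets.US_ASCII.decode(msg).toString();
            byte[] reply = text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.US_ASCII);
            driver.send(ByteBuffer.wrap(reply));
            driver.flush();
        }

        @Override public void exception(Throwable t) { closed = true; }
        @Override public void closed() { closed = true; }
    }

    public static void main(String[] args) {
        InMemoryDriver driver = new InMemoryDriver();
        driver.attach(new UpperCaseEngine(driver));
        driver.bytesArrived("ping".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(driver.wireBytes(), StandardCharsets.US_ASCII));
    }
}
```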

<p><img src="/confluence/download/attachments/2327666/broker-0.N-state.png" align="absmiddle"
border="0" /></p>

<p>Sender (already exists): </p>
<style type="text/css">
@import url(/confluence/download/resources/confluence.ext.code:code/shStyles.css);
</style>
<!--[if IE]>
<style type="text/css">
    .code textarea, .code input { padding: 0 !important; }
</style>
<![endif]-->
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shCore.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushCSharp.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushPhp.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushJScript.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushVb.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushSql.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushXml.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushShell.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushDelphi.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushPython.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushJava.js"></script>
<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public
interface Sender&lt;T&gt;
{

    // Sets the TCP  idle time out
    void setIdleTimeout(long l);

    // Accepts the data for sending
    void send(T msg);

    // Flushes all data pending
    void flush();

    // Closes the connection
    void close();

}</textarea>
<script class="javascript">
    if(!window.newcodemacro_initialised)
    {
        window.newcodemacro_initialised = true;
        window.oldonloadmethod = window.onload;
        window.onload = function(){
            dp.SyntaxHighlighter.HighlightAll('newcodemacro');
            if(window.oldonloadmethod)
            {
                window.oldonloadmethod();
            }
        }
    }

</script>
</div>


<p>The ProtocolEngine will implement the Receiver interface to be given bytes by the
NetworkDriver. </p>

<p>Receiver (already exists):</p>
<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public
interface Receiver&lt;T&gt;
{

    // Called when data has been received from the network
    void received(T msg);

    // Called when an exception has occurred
    void exception(Throwable t);

    // Called when the underlying socket has been closed for reading
    void closed();

}</textarea>
</div>


<p>The ProtocolEngine will implement the following interface:</p>

<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">import java.net.SocketAddress;

public interface ProtocolEngine extends Receiver&lt;java.nio.ByteBuffer&gt;
{
   // Sets the network driver providing data for this ProtocolEngine
   void setNetworkDriver(NetworkDriver driver);

   // Returns the remote address of the NetworkDriver
   SocketAddress getRemoteAddress();

   // Returns number of bytes written
   long getWrittenBytes();

   // Returns number of bytes read
   long getReadBytes();

   // Called by the NetworkDriver when the socket has been closed for reading
   void closed();

   // Called when the NetworkDriver has not written data for the specified period of time
   // (will trigger a heartbeat)
   void writerIdle();

   // Called when the NetworkDriver has not read data for the specified period of time
   // (will close the connection)
   void readerIdle();
}</textarea>
</div>
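<p>To make the byte counters and idle callbacks more concrete, here is a minimal sketch of how an engine might behave. It is an assumption-laden illustration, not the planned implementation: IdleAwareEngineSketch, CountingEngine and RecordingDriver are hypothetical names, Sender is a trimmed local copy, and the heartbeat bytes assume AMQP 0-9-1 framing (type 8, channel 0, size 0, frame end 0xCE).</p>

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these classes exist in Qpid.
final class IdleAwareEngineSketch {

    // Trimmed stand-in for the Sender that the NetworkDriver implements.
    interface Sender<T> { void send(T msg); void flush(); void close(); }

    // Engine that counts bytes in both directions and reacts to the idle callbacks.
    static final class CountingEngine {
        // Heartbeat frame, assuming AMQP 0-9-1 framing: type 8, channel 0, size 0, end 0xCE.
        private static final byte[] HEARTBEAT = {8, 0, 0, 0, 0, 0, 0, (byte) 0xCE};

        private final Sender<ByteBuffer> driver;
        private long readBytes;
        private long writtenBytes;

        CountingEngine(Sender<ByteBuffer> driver) { this.driver = driver; }

        // Called by the driver with raw bytes; frame decoding would happen here.
        public void received(ByteBuffer msg) { readBytes += msg.remaining(); }

        // All outgoing data goes through the driver, which owns the buffering.
        private void write(byte[] data) {
            driver.send(ByteBuffer.wrap(data));
            driver.flush();
            writtenBytes += data.length;
        }

        // Nothing written for a while: send a heartbeat.
        public void writerIdle() { write(HEARTBEAT); }

        // Nothing read for a while: give up on the connection.
        public void readerIdle() { driver.close(); }

        public long getReadBytes() { return readBytes; }
        public long getWrittenBytes() { return writtenBytes; }
    }

    // Driver stand-in that only records what the engine asked it to do.
    static final class RecordingDriver implements Sender<ByteBuffer> {
        final List<Integer> sentSizes = new ArrayList<>();
        boolean closed;

        @Override public void send(ByteBuffer msg) { sentSizes.add(msg.remaining()); }
        @Override public void flush() { }
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        RecordingDriver driver = new RecordingDriver();
        CountingEngine engine = new CountingEngine(driver);
        engine.received(ByteBuffer.wrap(new byte[5]));
        engine.writerIdle();
        System.out.println("read=" + engine.getReadBytes() + " written=" + engine.getWrittenBytes());
    }
}
```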


<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public interface ProtocolEngineFactory
{

  // Returns a new instance of a ProtocolEngine
  ProtocolEngine newProtocolEngine();

}</textarea>
</div>


<p>The NetworkDriver will implement the following interface:</p>
<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">import java.net.InetAddress;
import java.net.SocketAddress;
import javax.net.ssl.SSLEngine;

public interface NetworkDriver extends Sender&lt;java.nio.ByteBuffer&gt;
{
   // Creates a NetworkDriver which attempts to connect to destination on port and attaches
   // the ProtocolEngine to it, using the SSLEngine if provided
   NetworkDriver open(int port, InetAddress destination, ProtocolEngine engine,
                      NetworkDriverConfiguration config, SSLEngine sslEngine);

   // Listens for incoming connections on the specified port and addresses and creates a new
   // NetworkDriver which processes incoming connections with ProtocolEngines created from
   // factory, using the SSLEngine if provided
   void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
             NetworkDriverConfiguration config, SSLEngine sslEngine);

   // Returns the remote address of the underlying socket
   SocketAddress getRemoteAddress();

   /**
    * The length of time after which the ProtocolEngine's readerIdle() method should be
    * called if no data has been read
    */
   void setMaxReadIdle(int idleTime);

   /**
    * The length of time after which the ProtocolEngine's writerIdle() method should be
    * called if no data has been written
    */
   void setMaxWriteIdle(int idleTime);

}</textarea>
</div>
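<p>One way a driver might decide when to fire the idle callbacks is sketched below. This is a guess at the mechanics rather than the designed behaviour: IdleTrackerSketch, IdleTracker, IdleListener, dataRead, dataWritten and check are hypothetical names, the clock is passed in explicitly, and idle times are in whatever unit the clock uses, since the page does not specify one. A value of 0 is assumed to disable a check.</p>

```java
// Hypothetical sketch; none of these classes exist in Qpid.
final class IdleTrackerSketch {

    // The two idle callbacks the ProtocolEngine exposes on this page.
    interface IdleListener {
        void readerIdle();
        void writerIdle();
    }

    // Tracks the last read and write times and notifies the listener when a limit is reached.
    static final class IdleTracker {
        private final IdleListener listener;
        private long maxReadIdle;   // 0 means "never report reader idle" (assumption)
        private long maxWriteIdle;  // 0 means "never report writer idle" (assumption)
        private long lastRead;
        private long lastWrite;

        IdleTracker(IdleListener listener, long now) {
            this.listener = listener;
            this.lastRead = now;
            this.lastWrite = now;
        }

        void setMaxReadIdle(int idleTime) { this.maxReadIdle = idleTime; }
        void setMaxWriteIdle(int idleTime) { this.maxWriteIdle = idleTime; }

        // The driver calls these whenever bytes actually move.
        void dataRead(long now) { lastRead = now; }
        void dataWritten(long now) { lastWrite = now; }

        // Called periodically from the driver's own thread.
        void check(long now) {
            if (maxReadIdle > 0 && now - lastRead >= maxReadIdle) {
                lastRead = now; // restart the window so the callback is not repeated every tick
                listener.readerIdle();
            }
            if (maxWriteIdle > 0 && now - lastWrite >= maxWriteIdle) {
                lastWrite = now;
                listener.writerIdle();
            }
        }
    }

    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(new IdleListener() {
            @Override public void readerIdle() { System.out.println("reader idle"); }
            @Override public void writerIdle() { System.out.println("writer idle"); }
        }, 0L);
        tracker.setMaxReadIdle(10);
        tracker.setMaxWriteIdle(5);
        tracker.check(5);
        tracker.check(10);
    }
}
```

<p>Passing the time in as a parameter keeps the tracker independent of any thread model, in line with the requirement that the network interface and drivers be thread model agnostic.</p>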


<p>The NetworkDriverConfiguration interface provides configuration data for the NetworkDriver:</p>

<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public interface NetworkDriverConfiguration
{
   // Taken from Socket
   boolean getKeepAlive();
   boolean getOOBInline();
   boolean getReuseAddress();
   Integer getSoLinger(); // null means off
   int getSoTimeout();
   boolean getTcpNoDelay();
   int getTrafficClass();

   // The amount of memory in bytes to allocate to the incoming buffer
   int getReceiveBufferSize();

   // The amount of memory in bytes to allocate to the outgoing buffer
   int getSendBufferSize();
}</textarea>
</div>
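<p>Since most of these getters mirror options on java.net.Socket, a TCP-based driver could plausibly apply them as in the sketch below. This is an assumption about how the configuration might be consumed, not part of the design: SocketConfigSketch, apply and exampleConfig are hypothetical names, the interface is a local copy in which both buffer-size getters take no arguments, and the values in exampleConfig are arbitrary.</p>

```java
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;

// Hypothetical sketch; none of these classes exist in Qpid.
final class SocketConfigSketch {

    // Local copy of the configuration interface, with plain getters throughout.
    interface NetworkDriverConfiguration {
        boolean getKeepAlive();
        boolean getOOBInline();
        boolean getReuseAddress();
        Integer getSoLinger(); // null means off
        int getSoTimeout();
        boolean getTcpNoDelay();
        int getTrafficClass();
        int getReceiveBufferSize();
        int getSendBufferSize();
    }

    // Copies every configured option onto the socket using the standard Socket setters.
    static void apply(NetworkDriverConfiguration config, Socket socket) throws SocketException {
        socket.setKeepAlive(config.getKeepAlive());
        socket.setOOBInline(config.getOOBInline());
        socket.setReuseAddress(config.getReuseAddress());
        Integer linger = config.getSoLinger();
        if (linger == null) {
            socket.setSoLinger(false, 0);
        } else {
            socket.setSoLinger(true, linger);
        }
        socket.setSoTimeout(config.getSoTimeout());
        socket.setTcpNoDelay(config.getTcpNoDelay());
        socket.setTrafficClass(config.getTrafficClass());
        socket.setReceiveBufferSize(config.getReceiveBufferSize());
        socket.setSendBufferSize(config.getSendBufferSize());
    }

    // Arbitrary example values, chosen only for the demonstration.
    static NetworkDriverConfiguration exampleConfig() {
        return new NetworkDriverConfiguration() {
            @Override public boolean getKeepAlive() { return true; }
            @Override public boolean getOOBInline() { return false; }
            @Override public boolean getReuseAddress() { return true; }
            @Override public Integer getSoLinger() { return null; }
            @Override public int getSoTimeout() { return 30000; }
            @Override public boolean getTcpNoDelay() { return true; }
            @Override public int getTrafficClass() { return 0; }
            @Override public int getReceiveBufferSize() { return 32768; }
            @Override public int getSendBufferSize() { return 32768; }
        };
    }

    public static void main(String[] args) throws IOException {
        // An unconnected socket is enough to show the options being applied.
        try (Socket socket = new Socket()) {
            apply(exampleConfig(), socket);
            System.out.println("tcpNoDelay=" + socket.getTcpNoDelay());
        }
    }
}
```

<p>Buffer sizes are only hints to the operating system, so a driver should not assume the socket ends up with exactly the requested sizes.</p>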


<h3><a name="Currentandproposednetworkinterfaces-Newbrokernetwork%2Fprotocolengineinterface"></a>New
broker network / protocol engine interface</h3>

<p><img src="/confluence/download/attachments/2327666/broker-0.N-network.png" align="absmiddle"
border="0" /></p>

<h3><a name="Currentandproposednetworkinterfaces-Newclientnetwork%2Fprotocolengineinterface"></a>New
client network / protocol engine interface</h3>

<p><img src="/confluence/download/attachments/2327666/client-0.N-network-processing.png"
align="absmiddle" border="0" /></p>
     </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

