qpid-commits mailing list archives

From conflue...@apache.org
Subject [CONF] Apache Qpid > Port server to new interface
Date Wed, 29 Jul 2009 09:15:00 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
     <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Port+server+to+new+interface">Port
server to new interface</a></h2>
     <h4>Page <b>edited</b> by             <a href="http://cwiki.apache.org/confluence/display/~aidan">Aidan
Skinner</a>
    </h4>
     
          <br/>
     <div class="notificationGreySide">
         <div>
<ul>
    <li><a href='#Portservertonewinterface-Objectives'>Objectives</a></li>
    <li><a href='#Portservertonewinterface-Overviewofnewimplementation'>Overview
of new implementation</a></li>
    <li><a href='#Portservertonewinterface-Workrequired'>Work required</a></li>
<ul>
    <li><a href='#Portservertonewinterface-AddQpidByteBuffer'>Add QpidByteBuffer</a></li>
    <li><a href='#Portservertonewinterface-ImplementNetworkDriver'>Implement NetworkDriver</a></li>
<ul>
    <li><a href='#Portservertonewinterface-MINANetworkDriverclass'>MINANetworkDriver
class</a></li>
    <li><a href='#Portservertonewinterface-ChangesrequiredtoimplementMINANetworkDriver'>Changes
required to implement MINANetworkDriver</a></li>
</ul>
    <li><a href='#Portservertonewinterface-ImplementationofProtocolEngine'>Implementation
of ProtocolEngine</a></li>
<ul>
    <li><a href='#Portservertonewinterface-ChangesrequiredtoimplementAMQProtocolEngine'>Changes
required to implement AMQProtocolEngine</a></li>
    <li><a href='#Portservertonewinterface-ChangesrequiredtoimplementAMQProtocolEngineFactory'>Changes
required to implement AMQProtocolEngineFactory</a></li>
</ul>
    <li><a href='#Portservertonewinterface-ImplementNetworkConfiguration'>Implement
NetworkConfiguration</a></li>
    <li><a href='#Portservertonewinterface-Transportlayerselectionino.a.q.server.Main'>Transport
layer selection in o.a.q.server.Main</a></li>
</ul>
</ul></div>

<p>Notes:</p>

<ul><li><a href="/confluence/display/qpid/Port+server+to+new+interface+notes"
title="Port server to new interface notes">Port server to new interface notes</a></li></ul>

<h1><a name="Portservertonewinterface-Objectives"></a>Objectives</h1>

<ol>
	<li>Implement ProtocolEngine and NetworkDriver from <a href="/confluence/display/qpid/Current+and+proposed+network+interfaces"
title="Current and proposed network interfaces">Current and proposed network interfaces</a></li>
	<li>Isolate MINA dependencies into MINANetworkDriver</li>
	<li>No changes to threading model</li>
	<li>No changes to configuration file</li>
</ol>


<h1><a name="Portservertonewinterface-Overviewofnewimplementation"></a>Overview
of new implementation</h1>

<p><img src="/confluence/download/attachments/122495/broker-0.N-network-phase-1.png"
align="absmiddle" border="0" /></p>

<p>This first phase confines the MINA dependency behind the NetworkDriver interface.
Compared with the following diagram of the 0.5 implementation, the flow of control is
largely unchanged. The only significant change is that protocol processing is decoupled
from network processing.</p>

<p><img src="/confluence/download/attachments/122495/broker-0.5-network.png" align="absmiddle"
border="0" /></p>


<h1><a name="Portservertonewinterface-Workrequired"></a>Work required</h1>

<h2><a name="Portservertonewinterface-AddQpidByteBuffer"></a>Add QpidByteBuffer</h2>

<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class  </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> org.apache.mina.common.ByteBuffer </td>
<td class='confluenceTd'> all </td>
<td class='confluenceTd'> At a number of points in the code we rely on behaviour that MINA ByteBuffers
provide but java.nio.ByteBuffers do not, in particular automatic expansion.
To avoid rewriting those parts at this stage, we should import the MINA ByteBuffer
class as QpidByteBuffer and continue to use it as is. Note that this functionality is not
related to the network buffers: QpidByteBuffers are the output data structure. The
existing implementations will have their MINA or java.nio.ByteBuffers converted to QpidByteBuffers
by the ProtocolEngine after decoding. </td>
</tr>
</tbody></table>
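<p>The difference in behaviour described above can be illustrated with a minimal sketch. ExpandingBuffer is a hypothetical name used only for illustration; it is neither the MINA ByteBuffer nor the proposed QpidByteBuffer, and its growth policy (doubling) is an assumption. It only shows why code that relies on automatic expansion cannot be switched to a fixed-size java.nio.ByteBuffer without changes.</p>

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

class ExpandingBufferDemo {
    /** Returns true if a fixed-size java.nio.ByteBuffer rejects the oversized write. */
    static boolean nioOverflows(int capacity, byte[] data) {
        try {
            ByteBuffer.allocate(capacity).put(data);
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] data = new byte[10];
        System.out.println("java.nio overflows: " + nioOverflows(4, data));
        ExpandingBuffer b = new ExpandingBuffer(4).put(data);
        System.out.println("expanded capacity: " + b.capacity());
    }
}

/** Hypothetical illustration only: not the MINA class and not QpidByteBuffer. */
final class ExpandingBuffer {
    private ByteBuffer buf;

    ExpandingBuffer(int initialCapacity) {
        buf = ByteBuffer.allocate(initialCapacity);
    }

    /** Appends bytes, replacing the backing buffer with a larger one when needed. */
    ExpandingBuffer put(byte[] src) {
        if (buf.remaining() < src.length) {
            int needed = buf.position() + src.length;
            int newCapacity = Math.max(needed, buf.capacity() * 2);
            ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
            buf.flip();        // switch to reading what has been written so far
            bigger.put(buf);   // copy it into the larger buffer
            buf = bigger;
        }
        buf.put(src);
        return this;
    }

    int position() { return buf.position(); }

    int capacity() { return buf.capacity(); }
}
```

<p>Keeping a buffer type with this behaviour lets the existing encoding code stay unchanged during this phase.</p>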

<h2><a name="Portservertonewinterface-ImplementNetworkDriver"></a>Implement
NetworkDriver</h2>

<h3><a name="Portservertonewinterface-MINANetworkDriverclass"></a>MINANetworkDriver
class</h3>

<style type="text/css">
@import url(/confluence/download/resources/confluence.ext.code:code/shStyles.css);
</style>
<!--[if IE]>
<style type="text/css">
    .code textarea, .code input { padding: 0 !important; }
</style>
<![endif]-->
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shCore.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushCSharp.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushPhp.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushJScript.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushVb.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushSql.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushXml.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushShell.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushDelphi.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushPython.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushJava.js"></script>
<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">/**
 * This class wraps the existing MINA implementation behind the NetworkDriver interface.
 * It uses the AsynchWritePoolingFilter to buffer network writes so that send() is not
 * synchronous.
 */
public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
   /**
    * Creates a concrete NetworkDriver which opens a connection to the specified address
    * and port, and starts the given ProtocolEngine instance using MINA.
    */
   static NetworkDriver open(int port, InetAddress destination, ProtocolEngine engine,
                  NetworkDriverConfiguration config);

   /**
    * Listens for incoming connections on the specified port and IP addresses. Creates a new
    * concrete NetworkDriver for each incoming connection and creates a ProtocolEngine with
    * the given factory.
    */
   static NetworkDriver bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
                  NetworkDriverConfiguration config);

   SocketAddress getRemoteAddress();

   /**
    * The length of time after which the ProtocolEngine's readerIdle() method should be
    * called if no data has been read.
    */
   void setMaxReadIdle(int idleTime);

   /**
    * The length of time after which the ProtocolEngine's writerIdle() method should be
    * called if no data has been written.
    */
   void setMaxWriteIdle(int idleTime);

   /**
    * Adds the data to an Event through AsynchWritePoolingFilter for writing to MINA.
    */
   void send(java.nio.ByteBuffer data);

}</textarea>
<script class="javascript">
    if(!window.newcodemacro_initialised)
    {
        window.newcodemacro_initialised = true;
        window.oldonloadmethod = window.onload;
        window.onload = function(){
            dp.SyntaxHighlighter.HighlightAll('newcodemacro');
            if(window.oldonloadmethod)
            {
                window.oldonloadmethod();
            }
        }
    }

</script>
</div>
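<p>The listing above says that send() should not be synchronous. As a rough sketch of that idea, independent of MINA, the following queues each write on a single worker thread so that the caller returns immediately. QueueingSender and its methods are hypothetical names; the real driver is expected to use the existing AsynchWritePoolingFilter rather than anything like this.</p>

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncSendDemo {
    public static void main(String[] args) {
        QueueingSender sender = new QueueingSender();
        sender.send(ByteBuffer.wrap("AMQP".getBytes(StandardCharsets.US_ASCII)));
        sender.close();   // waits for queued writes to drain
        System.out.println(sender.written());
    }
}

/** Hypothetical stand-in for a driver's write path; not the MINA-based implementation. */
class QueueingSender {
    // A single worker thread keeps writes in the order in which send() was called.
    private final ExecutorService writer = Executors.newSingleThreadExecutor();
    // Stands in for the socket in this sketch.
    private final ByteArrayOutputStream wire = new ByteArrayOutputStream();

    /** Copies the data and queues the write, so the caller never blocks on I/O. */
    void send(ByteBuffer data) {
        byte[] copy = new byte[data.remaining()];
        data.get(copy);
        writer.execute(() -> {
            synchronized (wire) {
                wire.write(copy, 0, copy.length);
            }
        });
    }

    /** Stops accepting writes and waits briefly for the queued ones to complete. */
    void close() {
        writer.shutdown();
        try {
            writer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    String written() {
        synchronized (wire) {
            return new String(wire.toByteArray(), StandardCharsets.US_ASCII);
        }
    }
}
```

<p>Copying the buffer before queuing it is a deliberate choice in this sketch: the caller may reuse its buffer as soon as send() returns.</p>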


<h3><a name="Portservertonewinterface-ChangesrequiredtoimplementMINANetworkDriver"></a>Changes
required to implement MINANetworkDriver</h3>

<p>MINANetworkDriver replaces AMQPFastProtocolHandler, and the majority of its implementation will be
taken from that class. bind() will be taken from the existing org.apache.qpid.server.Main.</p>

<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> general </td>
<td class='confluenceTd'> Renamed to MINANetworkDriver, implements NetworkDriver interface</td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> exceptionCaught </td>
<td class='confluenceTd'> AMQP functionality removed, passes the exception on to the ProtocolEngine
</td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> messageReceived </td>
<td class='confluenceTd'> calls ProtocolEngine.received()</td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> sessionClosed </td>
<td class='confluenceTd'> calls ProtocolEngine.closed() </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> sessionCreated </td>
<td class='confluenceTd'> creates a new ProtocolEngine from ProtocolEngineFactory, other
AMQP functionality moved to AMQProtocolEngine. Configuration taken from NetworkDriverConfiguration
</td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> sessionIdle </td>
<td class='confluenceTd'> calls ProtocolEngine.readerIdle() or ProtocolEngine.writerIdle()
as appropriate </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> Thread pool configuration moved to NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> initHeartbeats </td>
<td class='confluenceTd'> moved to NetworkDriver.setMaxReadIdle() and
NetworkDriver.setMaxWriteIdle() </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> getRemoteAddress </td>
<td class='confluenceTd'> moved to NetworkDriver.getRemoteAddress </td>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> bind </td>
<td class='confluenceTd'> Moved to NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> startup </td>
<td class='confluenceTd'> Network configuration moved to NetworkDriver, replaced with
construction of NetworkDriver </td>
</tr>
</tbody></table>
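<p>The delegation described in the table above can be sketched without any MINA types. Everything in this example is a hypothetical stand-in: DriverCallbacks plays the role of the renamed handler, IdleKind stands in for MINA's idle status, and the exception() method on the engine is an assumption, since the ProtocolEngine listing in this page does not name a method for exceptions.</p>

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class DelegationDemo {
    public static void main(String[] args) {
        RecordingEngine engine = new RecordingEngine();
        DriverCallbacks callbacks = new DriverCallbacks(engine);
        callbacks.messageReceived(ByteBuffer.allocate(8));
        callbacks.sessionIdle(IdleKind.WRITER);
        callbacks.sessionClosed();
        System.out.println(engine.calls);
    }
}

/** Stand-in for the ProtocolEngine interface; exception() is an assumed method. */
interface ProtocolEngine {
    void received(ByteBuffer data);
    void closed();
    void readerIdle();
    void writerIdle();
    void exception(Throwable cause);
}

/** Stand-in for the transport's idle status. */
enum IdleKind { READER, WRITER }

/** Network-side callbacks that contain no AMQP logic and only forward to the engine. */
class DriverCallbacks {
    private final ProtocolEngine engine;

    DriverCallbacks(ProtocolEngine engine) {
        this.engine = engine;
    }

    void messageReceived(ByteBuffer data) { engine.received(data); }

    void sessionClosed() { engine.closed(); }

    void exceptionCaught(Throwable cause) { engine.exception(cause); }

    void sessionIdle(IdleKind kind) {
        if (kind == IdleKind.READER) {
            engine.readerIdle();
        } else {
            engine.writerIdle();
        }
    }
}

/** Test double that records which engine methods were invoked. */
class RecordingEngine implements ProtocolEngine {
    final List<String> calls = new ArrayList<>();

    public void received(ByteBuffer data) { calls.add("received(" + data.remaining() + ")"); }
    public void closed() { calls.add("closed"); }
    public void readerIdle() { calls.add("readerIdle"); }
    public void writerIdle() { calls.add("writerIdle"); }
    public void exception(Throwable cause) { calls.add("exception"); }
}
```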

<h2><a name="Portservertonewinterface-ImplementationofProtocolEngine"></a>Implementation
of ProtocolEngine</h2>

<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">/**
 * This class replaces the existing AMQMinaProtocolSession and implements the ProtocolEngine
 * interface. It is responsible for processing bytes into frames and for processing those
 * frames. It is also responsible for turning frames into bytes and passing them on to the
 * NetworkDriver for writing.
 */
public class AMQProtocolEngine implements ProtocolEngine
{
   /**
    * Processes data. It uses AMQDecoder to decode the frame and place it into a Job for
    * later processing.
    */
   void received(java.nio.ByteBuffer data);

   void setNetworkDriver(NetworkDriver driver);

   /**
    * Calls NetworkDriver.getRemoteAddress().
    */
   SocketAddress getRemoteAddress();

   long getWrittenBytes();

   long getReadBytes();

   /**
    * Closes the protocol engine, aborts in-progress transactions etc.
    */
   void closed();

   /**
    * Generates a heartbeat frame.
    */
   void writerIdle();

   /**
    * Closes the connection.
    */
   void readerIdle();

}</textarea>
</div>
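<p>The idle handling in the listing above (writerIdle() generates a heartbeat, readerIdle() closes the connection) could look roughly like the following sketch. Driver, SketchEngine and FakeDriver are hypothetical names, the close() method on the driver is an assumption that does not appear in the NetworkDriver listing, and the heartbeat bytes use an assumed frame layout (type 8, channel 0, empty body, 0xCE end marker) that should be checked against the protocol version in use.</p>

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class IdleHandlingDemo {
    public static void main(String[] args) {
        FakeDriver driver = new FakeDriver();
        SketchEngine engine = new SketchEngine(driver);
        engine.writerIdle();   // nothing written recently: emit a heartbeat
        engine.readerIdle();   // nothing read recently: give up on the peer
        System.out.println("frames sent: " + driver.sent.size() + ", closed: " + driver.closed);
    }
}

/** Hypothetical subset of a network driver; close() is an assumption. */
interface Driver {
    void send(ByteBuffer data);
    void close();
}

/** Hypothetical engine showing only the two idle callbacks. */
class SketchEngine {
    // Assumed heartbeat layout: type(1)=8, channel(2)=0, size(4)=0, end marker 0xCE.
    static final byte[] HEARTBEAT = {8, 0, 0, 0, 0, 0, 0, (byte) 0xCE};

    private final Driver driver;

    SketchEngine(Driver driver) {
        this.driver = driver;
    }

    /** Called when nothing has been written for the configured interval. */
    void writerIdle() {
        driver.send(ByteBuffer.wrap(HEARTBEAT.clone()));
    }

    /** Called when nothing has been read for the configured interval. */
    void readerIdle() {
        driver.close();
    }
}

/** Test double that records what the engine asked the driver to do. */
class FakeDriver implements Driver {
    final List<ByteBuffer> sent = new ArrayList<>();
    boolean closed;

    public void send(ByteBuffer data) { sent.add(data); }
    public void close() { closed = true; }
}
```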


<h3><a name="Portservertonewinterface-ChangesrequiredtoimplementAMQProtocolEngine"></a>Changes
required to implement AMQProtocolEngine</h3>
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> Whole class </td>
<td class='confluenceTd'> Rename to AMQProtocolEngine. Decouple the AMQP semantics from
the underlying networking, for which it should now use MINANetworkDriver. This class will
implement the ProtocolEngine interface and become the central point for processing AMQP frames.
It will use a Job to hold ReceivedEvents for processing outside of the network thread. </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> received </td>
<td class='confluenceTd'> Implement it, using AMQDecoder to decode the byte stream before
placing the result onto a Job for processing </td>
</tr>
<tr>
<td class='confluenceTd'> AMQProtocolSessionMBean </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Needs to be updated to use AMQProtocolSession without relying on
the underlying implementation </td>
</tr>
<tr>
<td class='confluenceTd'> Event, Job </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Created by AMQProtocolEngine.received rather than AsynchReadPoolingFilter
</td>
</tr>
<tr>
<td class='confluenceTd'> AMQDecoder </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Should no longer extend CumulativeProtocolDecoder </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> AMQP related functionality moved to AMQProtocolEngine </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPProtocolProvider </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Removed </td>
</tr>
</tbody></table>
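<p>Once AMQDecoder no longer extends CumulativeProtocolDecoder, something else has to keep partial frames between calls to received(). The following is a minimal sketch of that accumulation step, not the actual decoder. FrameAccumulator is a hypothetical name, and the frame layout it assumes (1-byte type, 2-byte channel, 4-byte big-endian payload size, payload, 1-byte end marker) is stated here as an assumption.</p>

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class FrameAccumulatorDemo {
    public static void main(String[] args) {
        FrameAccumulator acc = new FrameAccumulator();
        byte[] frame = {1, 0, 0, 0, 0, 0, 2, 'h', 'i', (byte) 0xCE};
        // Deliver one frame in two fragments, as the network might.
        System.out.println("after first fragment: " + acc.received(ByteBuffer.wrap(frame, 0, 4)).size());
        System.out.println("after second fragment: " + acc.received(ByteBuffer.wrap(frame, 4, 6)).size());
    }
}

/** Hypothetical helper that buffers bytes until whole frames are available. */
class FrameAccumulator {
    private static final int HEADER = 7;    // type(1) + channel(2) + size(4), assumed layout
    private static final int TRAILER = 1;   // end marker

    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Appends newly read bytes and returns every frame that is now complete. */
    List<byte[]> received(ByteBuffer data) {
        ByteBuffer joined = ByteBuffer.allocate(pending.remaining() + data.remaining());
        joined.put(pending).put(data);
        joined.flip();

        List<byte[]> frames = new ArrayList<>();
        while (joined.remaining() >= HEADER) {
            // Absolute read of the size field; it does not move the position.
            int size = joined.getInt(joined.position() + 3);
            if (size < 0) {
                throw new IllegalStateException("negative frame size: " + size);
            }
            long total = (long) HEADER + size + TRAILER;
            if (joined.remaining() < total) {
                break;   // frame is incomplete, wait for more data
            }
            byte[] frame = new byte[(int) total];
            joined.get(frame);
            frames.add(frame);
        }
        pending = joined.slice();   // keep leftover bytes for the next call
        return frames;
    }
}
```

<p>A real decoder would also validate the end marker and enforce a maximum frame size; both are omitted here to keep the accumulation logic visible.</p>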

<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public class AMQProtocolEngineFactory implements ProtocolEngineFactory
{

  // Returns a new instance of an AMQProtocolEngine
  ProtocolEngine newProtocolEngine();

}</textarea>
</div>


<h3><a name="Portservertonewinterface-ChangesrequiredtoimplementAMQProtocolEngineFactory"></a>Changes
required to implement AMQProtocolEngineFactory</h3>

<p>No existing code needs to be changed here. The factory will simply create an AMQProtocolEngine
and return it. </p>

<h2><a name="Portservertonewinterface-ImplementNetworkConfiguration"></a>Implement
NetworkConfiguration</h2>
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> ServerConfiguration </td>
<td class='confluenceTd'> getNetworkConfiguration </td>
<td class='confluenceTd'> new method, needs to construct and return network configuration
information </td>
</tr>
</tbody></table>
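<p>As a rough illustration of what getNetworkConfiguration might return, the following sketch builds an immutable settings object from key/value pairs with defaults. NetworkSettings, its fields, the key names and the default values are all hypothetical; the actual NetworkDriverConfiguration and the existing configuration file keys are not shown on this page and are not reproduced here.</p>

```java
import java.util.HashMap;
import java.util.Map;

class NetworkSettingsDemo {
    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("port", "5673");   // override one value, leave the rest at defaults
        NetworkSettings cfg = NetworkSettings.from(settings);
        System.out.println(cfg.port + " " + cfg.tcpNoDelay + " " + cfg.receiveBufferSize);
    }
}

/** Hypothetical value object; field names, keys and defaults are illustrative only. */
final class NetworkSettings {
    final int port;
    final boolean tcpNoDelay;
    final int receiveBufferSize;

    private NetworkSettings(int port, boolean tcpNoDelay, int receiveBufferSize) {
        this.port = port;
        this.tcpNoDelay = tcpNoDelay;
        this.receiveBufferSize = receiveBufferSize;
    }

    /** Reads each setting from the map, falling back to a default when it is absent. */
    static NetworkSettings from(Map<String, String> s) {
        return new NetworkSettings(
                Integer.parseInt(s.getOrDefault("port", "5672")),
                Boolean.parseBoolean(s.getOrDefault("tcpNoDelay", "true")),
                Integer.parseInt(s.getOrDefault("receiveBufferSize", "32768")));
    }
}
```

<p>Handing the driver a single read-only object keeps transport settings out of Main and avoids the driver reading the configuration file itself.</p>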

<h2><a name="Portservertonewinterface-Transportlayerselectionino.a.q.server.Main"></a>Transport
layer selection in o.a.q.server.Main</h2>

<p>Currently the broker's Main method contains setup code specific to MINA.</p>

<p>Main needs to be modified to remove the processing of MINA-specific options (NIO, MultiIO,
executor pool, etc.). This should be replaced with the creation of a NetworkDriver
instance, with configuration picked up and applied in the same way as for other subsystems,
i.e. from ServerConfiguration.</p>

<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> bind </td>
<td class='confluenceTd'> Moved to NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> startup </td>
<td class='confluenceTd'> Network configuration moved to NetworkDriver, replaced with
construction of NetworkDriver and calling MINANetworkDriver.bind </td>
</tr>
</tbody></table>
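<p>The intended shape of the startup code after this change can be sketched as follows. All types here are simplified, hypothetical stand-ins: Driver and EngineFactory are not the real NetworkDriver and ProtocolEngineFactory interfaces, and bind is shown as an instance method so that a test double can be substituted, whereas the listing earlier on this page declares it as a static method on MINANetworkDriver.</p>

```java
import java.net.InetAddress;

class StartupDemo {
    public static void main(String[] args) {
        RecordingDriver driver = new RecordingDriver();
        Startup.startup(driver, 5672, InetAddress.getLoopbackAddress());
        System.out.println("bound port " + driver.boundPort
                + " on " + driver.addressCount + " address(es)");
    }
}

/** Stand-in for ProtocolEngineFactory; returns Object to keep the sketch small. */
interface EngineFactory {
    Object newProtocolEngine();
}

/** Stand-in for the part of the driver that startup needs. */
interface Driver {
    void bind(int port, InetAddress[] addresses, EngineFactory factory);
}

class Startup {
    /** Transport-specific setup stays behind Driver; startup only wires things together. */
    static void startup(Driver driver, int port, InetAddress... addresses) {
        EngineFactory factory = Object::new;   // placeholder for creating a protocol engine
        driver.bind(port, addresses, factory);
    }
}

/** Test double that records the arguments passed to bind. */
class RecordingDriver implements Driver {
    int boundPort = -1;
    int addressCount;
    EngineFactory factory;

    public void bind(int port, InetAddress[] addresses, EngineFactory factory) {
        this.boundPort = port;
        this.addressCount = addresses.length;
        this.factory = factory;
    }
}
```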
     </div>
     <div id="commentsSection" class="wiki-content pageSection">
       <div style="float: right;">
            <a href="http://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
       </div>

       <a href="http://cwiki.apache.org/confluence/display/qpid/Port+server+to+new+interface">View
Online</a>
       |
       <a href="http://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=122495&revisedVersion=29&originalVersion=28">View
Change</a>
              |
       <a href="http://cwiki.apache.org/confluence/display/qpid/Port+server+to+new+interface?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

