qpid-commits mailing list archives

From conflue...@apache.org
Subject [CONF] Apache Qpid > Port server to new interface
Date Tue, 28 Jul 2009 13:02:01 GMT
<html>
<head>
    <base href="http://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/1519/1/1/_/styles/combined.css?spaceKey=qpid&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background-color: white" bgcolor="white">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
     <h2><a href="http://cwiki.apache.org/confluence/display/qpid/Port+server+to+new+interface">Port
server to new interface</a></h2>
     <h4>Page <b>edited</b> by             <a href="http://cwiki.apache.org/confluence/display/~aidan">Aidan
Skinner</a>
    </h4>
     
          <br/>
     <div class="notificationGreySide">
         <div>
<ul>
    <li><a href='#Portservertonewinterface-Overview'>Overview</a></li>
<ul>
<ul>
    <li><a href='#Portservertonewinterface-ImplementationofNetworkDriver'>Implementation
of NetworkDriver</a></li>
</ul>
    <li><a href='#Portservertonewinterface-ImplementationofProtocolEngine'>Implementation
of ProtocolEngine</a></li>
    <li><a href='#Portservertonewinterface-Diagramofnewbrokernetwork%2Fprotocolimplementation'>Diagram
of new broker network / protocol implementation</a></li>
<ul>
    <li><a href='#Portservertonewinterface-Transportlayerselectionino.a.q.server.Main'>Transport
layer selection in o.a.q.server.Main</a></li>
</ul>
    <li><a href='#Portservertonewinterface-Tasks'>Tasks</a></li>
</ul>
</ul></div>

<h2><a name="Portservertonewinterface-Overview"></a>Overview</h2>

<p>This component comprises the following sub-tasks:</p>

<ol>
	<li>Refactor the existing code to remove its dependence on MINA</li>
	<li>Change the existing code to use the interfaces described in <a href="/confluence/display/qpid/Current+and+proposed+network+interfaces"
title="Current and proposed network interfaces">Current and proposed network interfaces</a></li>
	<li>Add configuration to select a transport</li>
	<li>Add QpidByteBuffer</li>
</ol>


<p>These tasks are described in more detail below.</p>

<h4><a name="Portservertonewinterface-ImplementationofNetworkDriver"></a>Implementation
of NetworkDriver</h4>

<style type="text/css">
@import url(/confluence/download/resources/confluence.ext.code:code/shStyles.css);
</style>
<!--[if IE]>
<style type="text/css">
    .code textarea, .code input { padding: 0 !important; }
</style>
<![endif]-->
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shCore.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushCSharp.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushPhp.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushJScript.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushVb.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushSql.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushXml.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushShell.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushDelphi.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushPython.js"></script>
<script class="javascript" src="/confluence/download/resources/confluence.ext.code:code/shBrushJava.js"></script>
<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public class MINANetworkDriver implements Sender&lt;java.nio.ByteBuffer&gt;
{
   /**
    * Creates a concrete NetworkDriver which opens a connection to the specified address
    * on the given port and starts the given ProtocolEngine instance using MINA.
    */
   static NetworkDriver open(int port, InetAddress destination, ProtocolEngine engine,
                  NetworkDriverConfiguration config);

   /**
    * Listens for incoming connections on the specified port and IP addresses. Creates a new
    * concrete NetworkDriver for each incoming connection and creates a ProtocolEngine with
    * the given factory.
    */
   static NetworkDriver bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
                  NetworkDriverConfiguration config);

   SocketAddress getRemoteAddress();

   /**
    * The length of time after which the ProtocolEngine's readerIdle() method should be
    * called if no data has been read.
    */
   void setMaxReadIdle(int idleTime);

   /**
    * The length of time after which the ProtocolEngine's writerIdle() method should be
    * called if no data has been written.
    */
   void setMaxWriteIdle(int idleTime);

   /**
    * Adds the data to an Event through AsynchWritePoolingFilter for writing to MINA.
    */
   void send(java.nio.ByteBuffer data);

}</textarea>
<script class="javascript">
    if(!window.newcodemacro_initialised)
    {
        window.newcodemacro_initialised = true;
        window.oldonloadmethod = window.onload;
        window.onload = function(){
            dp.SyntaxHighlighter.HighlightAll('newcodemacro');
            if(window.oldonloadmethod)
            {
                window.oldonloadmethod();
            }
        }
    }

</script>
</div>
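<p>The idle notifications described above can be sketched without MINA. The following is a minimal illustration, not the proposed implementation: <code>IdleCallbacks</code> and <code>IdleTracker</code> are hypothetical names, the clock is passed in by the caller, and <code>idleTime</code> is assumed to be in seconds because the page does not state the unit.</p>

```java
/** Hypothetical subset of the ProtocolEngine callbacks, declared here so the sketch compiles. */
interface IdleCallbacks {
    void readerIdle();
    void writerIdle();
}

/** Tracks the last read and write times and reports idleness when asked to check. */
class IdleTracker {
    private final IdleCallbacks engine;
    private long maxReadIdleMillis;  // 0 means the check is disabled
    private long maxWriteIdleMillis; // 0 means the check is disabled
    private long lastRead;
    private long lastWrite;

    IdleTracker(IdleCallbacks engine, long nowMillis) {
        this.engine = engine;
        this.lastRead = nowMillis;
        this.lastWrite = nowMillis;
    }

    // Assumption: idleTime is in seconds; the source does not state the unit.
    void setMaxReadIdle(int idleTime) { maxReadIdleMillis = idleTime * 1000L; }

    void setMaxWriteIdle(int idleTime) { maxWriteIdleMillis = idleTime * 1000L; }

    void dataRead(long nowMillis) { lastRead = nowMillis; }

    void dataWritten(long nowMillis) { lastWrite = nowMillis; }

    /** Intended to be called periodically, for example from a timer owned by the driver. */
    void check(long nowMillis) {
        if (maxReadIdleMillis > 0 && nowMillis - lastRead >= maxReadIdleMillis) {
            engine.readerIdle();
            lastRead = nowMillis; // start a new idle window
        }
        if (maxWriteIdleMillis > 0 && nowMillis - lastWrite >= maxWriteIdleMillis) {
            engine.writerIdle();
            lastWrite = nowMillis; // start a new idle window
        }
    }
}
```

<p>Passing the current time in explicitly keeps the sketch deterministic; a real driver would more likely rely on the idle events of its underlying transport.</p>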


<h3><a name="Portservertonewinterface-ImplementationofProtocolEngine"></a>Implementation
of ProtocolEngine</h3>

<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public class AMQProtocolEngine implements ProtocolEngine
{
   /**
    * Processes data. If no concrete AMQProtocolEngine_0_N delegate exists yet, it uses
    * AMQDecoder to decode the AMQ protocol initiation and creates the appropriate delegate.
    * Otherwise it passes the data on to the delegate for processing.
    */
   void received(java.nio.ByteBuffer data);

   void setNetworkDriver(NetworkDriver driver);

   SocketAddress getRemoteAddress();

   long getWrittenBytes();

   long getReadBytes();

   long close();

   // Called when the NetworkDriver has not written data for the specified period of time
   void writerIdle();

   // Called when the NetworkDriver has not read data for the specified period of time
   void readerIdle();

}</textarea>
</div>
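<p>The delegate selection described in the comment above could look roughly like the following. This is a sketch under assumptions: <code>VersionDelegate</code> and <code>DelegatingEngine</code> are hypothetical names, the header is taken to be the eight-byte AMQP protocol initiation ("AMQP" followed by four bytes), and the four trailing bytes are used as an opaque lookup key rather than being interpreted.</p>

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a version-specific engine such as AMQProtocolEngine_0_N. */
interface VersionDelegate {
    void received(ByteBuffer data);
}

class DelegatingEngine {
    private static final int HEADER_LENGTH = 8; // "AMQP" plus four further bytes
    private final Map<String, Supplier<VersionDelegate>> registry = new HashMap<>();
    private final ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
    private VersionDelegate delegate;

    /** Registers a delegate under the four bytes that follow "AMQP", written as "a.b.c.d". */
    void register(String versionBytes, Supplier<VersionDelegate> supplier) {
        registry.put(versionBytes, supplier);
    }

    void received(ByteBuffer data) {
        if (delegate == null) {
            // Accumulate the header; it may arrive split across several reads.
            while (header.hasRemaining() && data.hasRemaining()) {
                header.put(data.get());
            }
            if (header.hasRemaining()) {
                return; // header still incomplete
            }
            delegate = createDelegate(header.array());
        }
        if (data.hasRemaining()) {
            delegate.received(data); // everything after the header goes to the delegate
        }
    }

    private VersionDelegate createDelegate(byte[] h) {
        if (h[0] != 'A' || h[1] != 'M' || h[2] != 'Q' || h[3] != 'P') {
            throw new IllegalArgumentException("Not an AMQP protocol header");
        }
        String key = h[4] + "." + h[5] + "." + h[6] + "." + h[7];
        Supplier<VersionDelegate> supplier = registry.get(key);
        if (supplier == null) {
            throw new IllegalStateException("No delegate registered for " + key);
        }
        return supplier.get();
    }
}
```

<p>A caller would register one supplier per supported protocol version and then feed every incoming buffer to <code>received</code>; the first eight bytes choose the delegate and all later data bypasses the lookup.</p>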


<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public class AMQProtocolEngine_0_N implements ProtocolEngine
{
   /**
    * Decodes the data using AMQBlockDecoder_0_N and adds the Event to the current Job for
    * processing. When the Job is processed, the ReceivedEvent calls frameReceived(), retained
    * from AMQMinaProtocolSession, which either closes the session if the frame is a
    * ChannelCloseOk or calls the frame body's handle() method.
    */
   void received(java.nio.ByteBuffer data);

   void setNetworkDriver(NetworkDriver driver);

   SocketAddress getRemoteAddress();

   long getWrittenBytes();

   long getReadBytes();

   long close();

   // Called when the NetworkDriver has not written data for the specified period of time
   void writerIdle();

   // Called when the NetworkDriver has not read data for the specified period of time
   void readerIdle();

}</textarea>
</div>
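<p>The hand-off from the network thread to a Job can be illustrated with a small queue that schedules itself on an executor. This is only a sketch of the general pattern, assuming events can be modelled as <code>Runnable</code>s; <code>EventJob</code> is a hypothetical name and is not the existing Job class.</p>

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical per-connection job: events are queued by the network thread and run elsewhere. */
class EventJob implements Runnable {
    private final Queue<Runnable> events = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor executor;

    EventJob(Executor executor) {
        this.executor = executor;
    }

    /** Called from the network thread; returns without processing the event itself. */
    void add(Runnable event) {
        events.add(event);
        schedule();
    }

    private void schedule() {
        // Only one run of this job is in flight at a time, which preserves event order.
        if (scheduled.compareAndSet(false, true)) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        Runnable event;
        while ((event = events.poll()) != null) {
            event.run();
        }
        scheduled.set(false);
        // An event may have been added after the last poll but before the flag was cleared.
        if (!events.isEmpty()) {
            schedule();
        }
    }
}
```

<p>Because at most one run is scheduled at any time, the events of a single connection are processed in arrival order even when the executor has many threads.</p>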


<div class="code">
<textarea name="newcodemacro" class="java:nocontrols:nogutter" rows="10" readonly="readonly">public class AMQProtocolEngineFactory implements ProtocolEngineFactory
{

  // Returns a new instance of an AMQProtocolEngine
  ProtocolEngine newProtocolEngine();

}</textarea>
</div>
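<p>To show how a bound driver might use the factory, the following sketch creates one engine per incoming connection. The two interfaces are trimmed-down versions of the ones named on this page, and <code>InMemoryAcceptor</code> with its integer connection ids is a hypothetical stand-in for real socket handling.</p>

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Trimmed-down versions of the interfaces named on this page; only what the sketch needs. */
interface ProtocolEngine {
    void received(ByteBuffer data);
}

interface ProtocolEngineFactory {
    ProtocolEngine newProtocolEngine();
}

/** Hypothetical in-memory stand-in for the accepting side of a bound driver. */
class InMemoryAcceptor {
    private final ProtocolEngineFactory factory;
    private final Map<Integer, ProtocolEngine> engines = new HashMap<>();

    InMemoryAcceptor(ProtocolEngineFactory factory) {
        this.factory = factory;
    }

    /** Each new connection gets its own engine, so protocol state is never shared. */
    void connectionOpened(int connectionId) {
        engines.put(connectionId, factory.newProtocolEngine());
    }

    void dataArrived(int connectionId, ByteBuffer data) {
        ProtocolEngine engine = engines.get(connectionId);
        if (engine == null) {
            throw new IllegalStateException("Unknown connection " + connectionId);
        }
        engine.received(data);
    }

    void connectionClosed(int connectionId) {
        engines.remove(connectionId);
    }

    int openConnections() {
        return engines.size();
    }
}
```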


<h3><a name="Portservertonewinterface-Diagramofnewbrokernetwork%2Fprotocolimplementation"></a>Diagram
of new broker network / protocol implementation</h3>

<p><img src="/confluence/download/attachments/122495/broker-0.N-network.png" align="absmiddle"
border="0" /></p>

<h4><a name="Portservertonewinterface-Transportlayerselectionino.a.q.server.Main"></a>Transport
layer selection in o.a.q.server.Main</h4>

<p>Currently the broker's Main method contains setup code specific to MINA.</p>

<p>Main needs to be modified to remove the processing of MINA-specific options (NIO, MultiIO,
executor pool, etc.). This should be replaced with the creation of a NetworkDriver instance,
with its configuration picked up and applied in the same way as for other subsystems, i.e.
using ServerConfiguration.</p>

<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> bind </td>
<td class='confluenceTd'> Moved to NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> startup </td>
<td class='confluenceTd'> Network configuration moved to NetworkDriver, replaced with
construction of NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> ServerConfiguration </td>
<td class='confluenceTd'> getNetworkConfiguration </td>
<td class='confluenceTd'> Needs to parse the config file and return the networking
configuration </td>
</tr>
</tbody></table>
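<p>Selecting a transport from configuration could be as simple as a lookup by name. The sketch below uses <code>java.util.Properties</code> in place of ServerConfiguration; the <code>Transport</code> interface, the <code>connector.transport</code> key and the <code>mina</code> default are all assumptions made for illustration.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

/** Hypothetical marker for a transport implementation such as a MINA-backed driver. */
interface Transport {
    String name();
}

class TransportSelector {
    // Assumed configuration key and default; real names would come from ServerConfiguration.
    static final String KEY = "connector.transport";
    static final String DEFAULT = "mina";

    private final Map<String, Supplier<Transport>> transports = new LinkedHashMap<>();

    void register(String name, Supplier<Transport> supplier) {
        transports.put(name, supplier);
    }

    /** Returns a new transport for the configured name, or fails with the known names. */
    Transport select(Properties config) {
        String name = config.getProperty(KEY, DEFAULT);
        Supplier<Transport> supplier = transports.get(name);
        if (supplier == null) {
            throw new IllegalArgumentException(
                    "Unknown transport '" + name + "', known: " + transports.keySet());
        }
        return supplier.get();
    }
}
```

<p>With this shape, Main only needs to know the selector and the configuration object; adding another transport means registering one more supplier.</p>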

<h3><a name="Portservertonewinterface-Tasks"></a>Tasks</h3>

<ol>
	<li>Implement MINANetworkDriver
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> createSession </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> exceptionCaught </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> messageReceived </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> messageSent </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> sessionClosed </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> sessionCreated </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> sessionIdle </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'> sessionOpened </td>
<td class='confluenceTd'> Moved to the NetworkDriver implementation after removing its
AMQP-specific functionality </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> Thread pool configuration moved to NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> initHeartbeats </td>
<td class='confluenceTd'> Moved to NetworkDriver.setMaxReadIdle() and
NetworkDriver.setMaxWriteIdle() </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> getRemoteAddress </td>
<td class='confluenceTd'> Moved to NetworkDriver.getRemoteAddress() </td>
</tr>
</tbody></table></li>
	<li>Implement AMQProtocolEngine
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> Whole class </td>
<td class='confluenceTd'> Rename to AMQProtocolEngine. Decouple the AMQP semantics from
the underlying networking, for which it should now use MINANetworkDriver. This class will
implement the ProtocolEngine interface and become the central point for processing AMQP frames.
It will use a Job to hold ReceivedEvents for processing outside of the network thread. </td>
</tr>
<tr>
<td class='confluenceTd'> AMQMinaProtocolSession </td>
<td class='confluenceTd'> received </td>
<td class='confluenceTd'> Implement, using AMQDecoder to decode the byte stream before
placing the result onto the Job for processing </td>
</tr>
<tr>
<td class='confluenceTd'> AMQProtocolSessionMBean </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Needs to be updated to use AMQProtocolSession without relying on
the underlying implementation </td>
</tr>
<tr>
<td class='confluenceTd'> Event, Job </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Created by AMQProtocolEngine.received rather than AsynchReadPoolingFilter
</td>
</tr>
<tr>
<td class='confluenceTd'> AMQDecoder </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Should no longer extend CumulativeProtocolDecoder </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPFastProtocolHandler </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> AMQP related functionality moved to AMQProtocolEngine </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPProtocolProvider </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Removed </td>
</tr>
</tbody></table></li>
	<li>Add QpidByteBuffer
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class  </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> QpidByteBuffer </td>
<td class='confluenceTd'> At several points the code relies on behaviour particular
to MINA ByteBuffers, notably their auto-expanding functionality. To avoid rewriting
those parts at this stage, we should import the MINA ByteBuffer class as QpidByteBuffer and
continue to use it as is. Note that this functionality is not related to the network buffers;
QpidByteBuffers are the output data structure. The ProtocolEngine will convert the MINA or
java.nio.ByteBuffers of the existing implementations to QpidByteBuffers. </td>
</tr>
</tbody></table></li>
	<li>Implement NetworkConfiguration and change Main to use MINANetworkDriver
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Class </th>
<th class='confluenceTh'> Method </th>
<th class='confluenceTh'> Change </th>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> bind </td>
<td class='confluenceTd'> Moved to NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> Main </td>
<td class='confluenceTd'> startup </td>
<td class='confluenceTd'> Network configuration moved to NetworkDriver, replaced with
construction of NetworkDriver </td>
</tr>
<tr>
<td class='confluenceTd'> ServerConfiguration </td>
<td class='confluenceTd'> getNetworkConfiguration </td>
<td class='confluenceTd'> Needs to parse the config file and return the networking
configuration </td>
</tr>
<tr>
<td class='confluenceTd'> AMQPProtocolProvider </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> Removed </td>
</tr>
</tbody></table></li>
</ol>
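<p>The auto-expanding behaviour mentioned in the QpidByteBuffer task can be illustrated on top of <code>java.nio.ByteBuffer</code>. This is a minimal sketch of the idea only, not MINA's implementation and not the proposed QpidByteBuffer; <code>ExpandingBuffer</code> and its doubling growth policy are assumptions.</p>

```java
import java.nio.ByteBuffer;

/** Hypothetical buffer that grows on demand instead of throwing BufferOverflowException. */
class ExpandingBuffer {
    private ByteBuffer buf;

    ExpandingBuffer(int initialCapacity) {
        buf = ByteBuffer.allocate(Math.max(1, initialCapacity));
    }

    ExpandingBuffer put(byte[] src) {
        ensureRemaining(src.length);
        buf.put(src);
        return this;
    }

    private void ensureRemaining(int needed) {
        if (buf.remaining() >= needed) {
            return;
        }
        // Assumed policy: at least double, and always enough for the pending write.
        int newCapacity = Math.max(buf.capacity() * 2, buf.position() + needed);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        buf.flip();      // switch the old buffer to reading what was written so far
        bigger.put(buf); // copy it; bigger's position now equals the old position
        buf = bigger;
    }

    int capacity() {
        return buf.capacity();
    }

    int position() {
        return buf.position();
    }

    /** Returns a read-only view of the bytes written so far, leaving this buffer untouched. */
    ByteBuffer toReadOnlyNio() {
        ByteBuffer view = buf.duplicate();
        view.flip();
        return view.asReadOnlyBuffer();
    }
}
```

<p>Code that currently depends on auto-expansion only needs the <code>put</code> path to grow the buffer transparently, which is why importing the existing class unchanged is the lower-risk option described in the task.</p>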

     </div>
     <div id="commentsSection" class="wiki-content pageSection">
       <div style="float: right;">
            <a href="http://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
       </div>

       <a href="http://cwiki.apache.org/confluence/display/qpid/Port+server+to+new+interface">View
Online</a>
       |
       <a href="http://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=122495&revisedVersion=24&originalVersion=23">View
Change</a>
              |
       <a href="http://cwiki.apache.org/confluence/display/qpid/Port+server+to+new+interface?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

