qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r825362 [2/2] - in /qpid/trunk/qpid/java: broker/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/connection/ broker/src/main/java/org/apach...
Date Thu, 15 Oct 2009 01:06:26 GMT
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Oct 15 01:06:23 2009
@@ -20,11 +20,6 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +28,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@
 
     protected static final String SASL_CLIENT = "SASLClient";
 
-    protected final IoSession _minaProtocolSession;
-
-    protected WriteFuture _lastWriteFuture;
-
     /**
      * The handler from which this session was created and which is used to handle protocol events. We send failover
      * events to the handler.
@@ -102,28 +94,15 @@
 
     protected final AMQConnection _connection;
 
-    private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+    private ConnectionTuneParameters _connectionTuneParameters;
 
-    public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
-    {
-        _protocolHandler = protocolHandler;
-        _minaProtocolSession = protocolSession;
-        _minaProtocolSession.setAttachment(this);
-        // properties of the connection are made available to the event handlers
-        _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
-        // fixme - real value needed
-        _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        _protocolVersion = connection.getProtocolVersion();
-        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
-                                                                           this);
-        _connection = connection;
+    private SaslClient _saslClient;
 
-    }
+    private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
     {
-        _protocolHandler = protocolHandler;
-        _minaProtocolSession = null;
+        _protocolHandler = protocolHandler;        
         _protocolVersion = connection.getProtocolVersion();
         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
                                                                            this);
@@ -134,7 +113,7 @@
     {
         // start the process of setting up the connection. This is the first place that
         // data is written to the server.
-        _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+        _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
     }
 
     public String getClientID()
@@ -175,14 +154,9 @@
         return getAMQConnection().getPassword();
     }
 
-    public IoSession getIoSession()
-    {
-        return _minaProtocolSession;
-    }
-
     public SaslClient getSaslClient()
     {
-        return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);    
+        return _saslClient;    
     }
 
     /**
@@ -192,28 +166,21 @@
      */
     public void setSaslClient(SaslClient client)
     {
-        if (client == null)
-        {
-            _minaProtocolSession.removeAttribute(SASL_CLIENT);
-        }
-        else
-        {
-            _minaProtocolSession.setAttribute(SASL_CLIENT, client);
-        }
+        _saslClient = client;
     }
 
     public ConnectionTuneParameters getConnectionTuneParameters()
     {
-        return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+        return _connectionTuneParameters;
     }
 
     public void setConnectionTuneParameters(ConnectionTuneParameters params)
     {
-        _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+        _connectionTuneParameters = params;
         AMQConnection con = getAMQConnection();
         con.setMaximumChannelCount(params.getChannelMax());
         con.setMaximumFrameSize(params.getFrameMax());
-        initHeartbeats((int) params.getHeartbeat());
+        _protocolHandler.initHeartbeats((int) params.getHeartbeat());
     }
 
     /**
@@ -335,21 +302,12 @@
      */
     public void writeFrame(AMQDataBlock frame)
     {
-        writeFrame(frame, false);
+        _protocolHandler.writeFrame(frame);
     }
 
     public void writeFrame(AMQDataBlock frame, boolean wait)
     {
-        WriteFuture f = _minaProtocolSession.write(frame);
-        if (wait)
-        {
-            // fixme -- time out?
-            f.join();
-        }
-        else
-        {
-            _lastWriteFuture = f;
-        }
+        _protocolHandler.writeFrame(frame, wait);
     }
 
     /**
@@ -407,33 +365,12 @@
 
     public AMQConnection getAMQConnection()
     {
-        return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+        return _connection;
     }
 
-    public void closeProtocolSession()
+    public void closeProtocolSession() throws AMQException
     {
-        closeProtocolSession(true);
-    }
-
-    public void closeProtocolSession(boolean waitLast)
-    {
-        _logger.debug("Waiting for last write to join.");
-        if (waitLast && (_lastWriteFuture != null))
-        {
-            _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-        }
-
-        _logger.debug("Closing protocol session");
-        
-        final CloseFuture future = _minaProtocolSession.close();
-
-        // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
-        // then wait for the connection to close.
-        // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
-        // error now shouldn't matter.
-
-        _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
-        future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+        _protocolHandler.closeConnection(0);
     }
 
     public void failover(String host, int port)
@@ -449,22 +386,11 @@
             id = _queueId++;
         }
         // get rid of / and : and ; from address for spec conformance
-        String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+        String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
 
         return new AMQShortString("tmp_" + localAddress + "_" + id);
     }
 
-    /** @param delay delay in seconds (not ms) */
-    void initHeartbeats(int delay)
-    {
-        if (delay > 0)
-        {
-            _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
-            _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
-            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
-        }
-    }
-
     public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
     {
         final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@
 
     public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
     {
-        _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+        _protocolHandler.methodBodyReceived(channel, amqMethodBody);
     }
 
     public void notifyError(Exception error)

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Thu Oct 15 01:06:23 2009
@@ -20,27 +20,20 @@
  */
 package org.apache.qpid.client.transport;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
+import org.apache.qpid.client.SSLConfiguration;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class SocketTransportConnection implements ITransportConnection
 {
     private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
@@ -71,61 +64,27 @@
         }
 
         final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
-        SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
-        // if we do not use our own thread model we get the MINA default which is to use
-        // its own leader-follower model
-        boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
-        if (readWriteThreading)
-        {
-            cfg.setThreadModel(ReadWriteThreadModel.getInstance());
-        }
-
-        SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
-        scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
-        scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
-        _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
-        scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
-        _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-
         final InetSocketAddress address;
 
         if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
         {
             address = null;
-
-            Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
-
-            if (socket != null)
-            {
-                _logger.info("Using existing Socket:" + socket);
-
-                ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
-            }
-            else
-            {
-                throw new IllegalArgumentException("Active Socket must be provided for broker " +
-                                                   "with 'socket://<SocketID>' transport:" + brokerDetail);
-            }
         }
         else
         {
             address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
             _logger.info("Attempting connection to " + address);
         }
-
-
-        ConnectFuture future = ioConnector.connect(address, protocolHandler);
-
-        // wait for connection to complete
-        if (future.join(brokerDetail.getTimeout()))
-        {
-            // we call getSession which throws an IOException if there has been an error connecting
-            future.getSession();
-        }
-        else
-        {
-            throw new IOException("Timeout waiting for connection.");
-        }
+        
+        SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
+        SSLContextFactory sslFactory = null;
+        if (sslConfig != null)
+        {
+            sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+        }
+        
+        MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
+        driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
+        protocolHandler.setNetworkDriver(driver);
     }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Thu Oct 15 01:06:23 2009
@@ -20,6 +20,12 @@
  */
 package org.apache.qpid.client.transport;
 
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoServiceConfig;
@@ -30,16 +36,12 @@
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
 /**
  * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
  * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -61,7 +63,7 @@
 
     private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
 
-    private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+    private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory";
 
     private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
 
@@ -75,7 +77,7 @@
         return _openSocketRegister.remove(socketID);
     }
 
-    public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+    public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
     {
         int transport = getTransport(details.getTransport());
 
@@ -91,7 +93,22 @@
                 {
                     public IoConnector newSocketConnector()
                     {
-                        return new ExistingSocketConnector(1,new QpidThreadExecutor());
+                        ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
+
+                        Socket socket = TransportConnection.removeOpenSocket(details.getHost());
+
+                        if (socket != null)
+                        {
+                            _logger.info("Using existing Socket:" + socket);
+
+                            ((ExistingSocketConnector) connector).setOpenSocket(socket);
+                        }
+                        else
+                        {
+                            throw new IllegalArgumentException("Active Socket must be provided for broker " +
+                                                               "with 'socket://<SocketID>' transport:" + details);
+                        }
+                        return connector;
                     }
                 });
             case TCP:
@@ -189,8 +206,6 @@
             _acceptor = new VmPipeAcceptor();
 
             IoServiceConfig config = _acceptor.getDefaultConfig();
-
-            config.setThreadModel(ReadWriteThreadModel.getInstance());
         }
         synchronized (_inVmPipeAddress)
         {
@@ -275,7 +290,10 @@
         {
             Class[] cnstr = {Integer.class};
             Object[] params = {port};
-            provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+            
+            provider = new MINANetworkDriver();
+            ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+            ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true);
             // Give the broker a second to create
             _logger.info("Created VMBroker Instance:" + port);
         }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Thu Oct 15 01:06:23 2009
@@ -20,25 +20,26 @@
  */
 package org.apache.qpid.client.transport;
 
+import java.io.IOException;
+
 import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class VmPipeTransportConnection implements ITransportConnection
 {
     private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);
 
     private static int _port;
 
+    private MINANetworkDriver _networkDriver;
+
     public VmPipeTransportConnection(int port)
     {
         _port = port;
@@ -47,16 +48,16 @@
     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
     {
         final VmPipeConnector ioConnector = new QpidVmPipeConnector();
-        final IoServiceConfig cfg = ioConnector.getDefaultConfig();
-
-        cfg.setThreadModel(ReadWriteThreadModel.getInstance());
 
         final VmPipeAddress address = new VmPipeAddress(_port);
         _logger.info("Attempting connection to " + address);
-        ConnectFuture future = ioConnector.connect(address, protocolHandler);
+        _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler);
+        protocolHandler.setNetworkDriver(_networkDriver);
+        ConnectFuture future = ioConnector.connect(address, _networkDriver);
         // wait for connection to complete
         future.join();
         // we call getSession which throws an IOException if there has been an error connecting
         future.getSession();
+        _networkDriver.setProtocolEngine(protocolHandler);
     }
 }

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Thu Oct 15 01:06:23 2009
@@ -27,6 +27,7 @@
 import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.TestNetworkDriver;
 import org.apache.qpid.client.MockAMQConnection;
 import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.state.AMQState;
@@ -72,9 +73,7 @@
     {
         //Create a new ProtocolHandler with a fake connection.
         _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
-
-        _handler.sessionCreated(new MockIoSession());
-
+        _handler.setNetworkDriver(new TestNetworkDriver());
          AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
         _blockFrame = new AMQFrame(0, body);
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Thu Oct 15 01:06:23 2009
@@ -23,6 +23,7 @@
 import org.apache.mina.filter.codec.ProtocolCodecFactory;
 import org.apache.mina.filter.codec.ProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
  * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
@@ -50,9 +51,9 @@
      * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation
      *                                 frame, <tt>false</tt> if it is going to be a standard AMQ data block.
      */
-    public AMQCodecFactory(boolean expectProtocolInitiation)
+    public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
     {
-        _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+        _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
     }
 
     /**
@@ -70,7 +71,7 @@
      *
      * @return The AMQP decoder.
      */
-    public ProtocolDecoder getDecoder()
+    public AMQDecoder getDecoder()
     {
         return _frameDecoder;
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Thu Oct 15 01:06:23 2009
@@ -20,14 +20,21 @@
  */
 package org.apache.qpid.codec;
 
+import java.util.ArrayList;
+
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.SimpleByteBufferAllocator;
 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 
+import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
  * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -62,14 +69,19 @@
     private boolean _expectProtocolInitiation;
     private boolean firstDecode = true;
 
+    private AMQMethodBodyFactory _bodyFactory;
+
+    private ByteBuffer _remainingBuf;
+    
     /**
      * Creates a new AMQP decoder.
      *
      * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
      */
-    public AMQDecoder(boolean expectProtocolInitiation)
+    public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
     {
         _expectProtocolInitiation = expectProtocolInitiation;
+        _bodyFactory = new AMQMethodBodyFactory(session);
     }
 
     /**
@@ -120,7 +132,7 @@
     protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
     {
         int pos = in.position();
-        boolean enoughData = _dataBlockDecoder.decodable(session, in);
+        boolean enoughData = _dataBlockDecoder.decodable(in.buf());
         in.position(pos);
         if (!enoughData)
         {
@@ -149,7 +161,7 @@
      */
     private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
     {
-        boolean enoughData = _piDecoder.decodable(session, in);
+        boolean enoughData = _piDecoder.decodable(in.buf());
         if (!enoughData)
         {
             // returning false means it will leave the contents in the buffer and
@@ -158,7 +170,8 @@
         }
         else
         {
-            _piDecoder.decode(session, in, out);
+            ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+            out.write(pi);
 
             return true;
         }
@@ -177,7 +190,7 @@
     }
 
 
- /**
+    /**
      * Cumulates content of <tt>in</tt> into internal buffer and forwards
      * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
      * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
@@ -268,4 +281,60 @@
         session.setAttribute( BUFFER, remainingBuf );
     }
 
+    public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
+    {
+
+        // get prior remaining data from accumulator
+        ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
+        ByteBuffer msg;
+        // if we have a session buffer, append data to that otherwise
+        // use the buffer read from the network directly
+        if( _remainingBuf != null )
+        {
+            _remainingBuf.put(buf);
+            _remainingBuf.flip();
+            msg = _remainingBuf;
+        }
+        else
+        {
+            msg = ByteBuffer.wrap(buf);
+        }
+        
+        if (_expectProtocolInitiation  
+            || (firstDecode
+                && (msg.remaining() > 0)
+                && (msg.get(msg.position()) == (byte)'A')))
+        {
+            if (_piDecoder.decodable(msg.buf()))
+            {
+                dataBlocks.add(new ProtocolInitiation(msg.buf()));
+            }
+        }
+        else
+        {
+            boolean enoughData = true;
+            while (enoughData)
+            {
+                int pos = msg.position();
+
+                enoughData = _dataBlockDecoder.decodable(msg);
+                msg.position(pos);
+                if (enoughData)
+                {
+                    dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
+                }
+                else
+                {
+                    _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+                    _remainingBuf.setAutoExpand(true);
+                    _remainingBuf.put(msg);
+                }
+            }
+        }
+        if(firstDecode && dataBlocks.size() > 0)
+        {
+            firstDecode = false;
+        }
+        return dataBlocks;
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Thu Oct 15 01:06:23 2009
@@ -47,7 +47,7 @@
     public AMQDataBlockDecoder()
     { }
 
-    public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+    public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
     {
         final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
         // type, channel, body length and end byte
@@ -56,14 +56,15 @@
             return false;
         }
 
-        in.skip(1 + 2);
-        final long bodySize = in.getUnsignedInt();
+        in.position(in.position() + 1 + 2);
+        // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() 
+        final long bodySize = in.getInt() & 0xffffffffL; 
 
         return (remainingAfterAttributes >= bodySize);
 
     }
 
-    protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
         throws AMQFrameDecodingException, AMQProtocolVersionException
     {
         final byte type = in.get();
@@ -71,15 +72,7 @@
         BodyFactory bodyFactory;
         if (type == AMQMethodBody.TYPE)
         {
-            bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
-            if (bodyFactory == null)
-            {
-                AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
-                bodyFactory = new AMQMethodBodyFactory(protocolSession);
-                session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
-
-            }
-
+            bodyFactory = methodBodyFactory;
         }
         else
         {
@@ -115,6 +108,24 @@
 
     public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
     {
-        out.write(createAndPopulateFrame(session, in));
+        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+        if (bodyFactory == null)
+        {
+            AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+            bodyFactory = new AMQMethodBodyFactory(protocolSession);
+            session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+        }
+        
+        out.write(createAndPopulateFrame(bodyFactory, in));
+    }
+
+    public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+    {
+        return decodable(msg.buf());
+    }
+
+    public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+    {
+        return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
     }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Thu Oct 15 01:06:23 2009
@@ -20,12 +20,10 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 import org.apache.qpid.AMQException;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 
 public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -53,13 +51,12 @@
         _protocolMajor = protocolMajor;
         _protocolMinor = protocolMinor;
     }
-
+    
     public ProtocolInitiation(ProtocolVersion pv)
     {
         this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
     }
 
-
     public ProtocolInitiation(ByteBuffer in)
     {
         _protocolHeader = new byte[4];
@@ -71,6 +68,11 @@
         _protocolMinor = in.get();
     }
 
+    public void writePayload(org.apache.mina.common.ByteBuffer buffer)
+    {
+        writePayload(buffer.buf());
+    }
+    
     public long getSize()
     {
         return 4 + 1 + 1 + 1 + 1;
@@ -127,16 +129,11 @@
          * @return true if we have enough data to decode the PI frame fully, false if more
          * data is required
          */
-        public boolean decodable(IoSession session, ByteBuffer in)
+        public boolean decodable(ByteBuffer in)
         {
             return (in.remaining() >= 8);
         }
 
-        public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
-        {
-            ProtocolInitiation pi = new ProtocolInitiation(in);
-            out.write(pi);
-        }
     }
 
     public ProtocolVersion checkVersion() throws AMQException
@@ -192,4 +189,5 @@
         buffer.append(Integer.toHexString(_protocolMinor));
         return buffer.toString();
     }
+
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Oct 15 01:06:23 2009
@@ -21,9 +21,12 @@
 package org.apache.qpid.pool;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -52,35 +55,28 @@
  */
 public class Job implements ReadWriteRunnable
 {
+    
+    /** Defines the maximum number of events that will be batched into a single job. */
+    public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
     /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
     private final int _maxEvents;
 
-    /** The Mina session. */
-    private final IoSession _session;
-
     /** Holds the queue of events that make up the job. */
-    private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+    private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
 
     /** Holds a status flag, that indicates when the job is actively running. */
     private final AtomicBoolean _active = new AtomicBoolean();
 
-    /** Holds the completion continuation, called upon completion of a run of the job. */
-    private final JobCompletionHandler _completionHandler;
-
     private final boolean _readJob;
 
-    /**
-     * Creates a new job that aggregates many continuations together.
-     *
-     * @param session           The Mina session.
-     * @param completionHandler The per job run, terminal continuation.
-     * @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
-     * @param readJob
-     */
-    Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
+    private ReferenceCountingExecutorService _poolReference;
+
+    private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+    
+    public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
     {
-        _session = session;
-        _completionHandler = completionHandler;
+        _poolReference = poolReference;
         _maxEvents = maxEvents;
         _readJob = readJob;
     }
@@ -90,7 +86,7 @@
      *
      * @param evt The continuation to enqueue.
      */
-    void add(Event evt)
+    public void add(Runnable evt)
     {
         _eventQueue.add(evt);
     }
@@ -104,14 +100,14 @@
         int i = _maxEvents;
         while( --i != 0 )
         {
-            Event e = _eventQueue.poll();
+            Runnable e = _eventQueue.poll();
             if (e == null)
             {
                 return true;
             }
             else
             {
-                e.process(_session);
+                e.run();
             }
         }
         return false;
@@ -153,40 +149,105 @@
         if(processAll())
         {
             deactivate();
-            _completionHandler.completed(_session, this);
+            completed();
         }
         else
         {
-            _completionHandler.notCompleted(_session, this);
+            notCompleted();
         }
     }
 
-    public boolean isReadJob()
-    {
-        return _readJob;
-    }
-
     public boolean isRead()
     {
         return _readJob;
     }
-
-    public boolean isWrite()
+    
+    /**
+     * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+     *
+     * @param job The job.
+     * @param event   The event to hand off asynchronously.
+     */
+    public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
     {
-        return !_readJob;
-    }
 
+        job.add(event);
+
+
+        if(pool == null)
+        {
+            return;
+        }
+
+        // rather than perform additional checks on pool to check that it hasn't shutdown.
+        // catch the RejectedExecutionException that will result from executing on a shutdown pool
+        if (job.activate())
+        {
+            try
+            {
+                pool.execute(job);
+            }
+            catch(RejectedExecutionException e)
+            {
+                _logger.warn("Thread pool shutdown while tasks still outstanding");
+            }
+        }
 
+    }
+    
     /**
-     * Another interface for a continuation.
+     * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+     * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
      *
-     * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
-     *       Runnable or a custom Continuation interface.
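+     * Takes no parameters: if {@link #isComplete()} is false, the job re-activates itself and resubmits
+     * itself to the pool obtained from its pool reference; it does nothing when that pool is null.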
      */
-    static interface JobCompletionHandler
+    public void completed()
+    {
+        if (!isComplete())
+        {
+            final ExecutorService pool = _poolReference.getPool();
+
+            if(pool == null)
+            {
+                return;
+            }
+
+
+            // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
+            // Can the pool be shutdown at this point?
+            if (activate())
+            {
+                try
+                {
+                    pool.execute(this);
+                }
+                catch(RejectedExecutionException e)
+                {
+                    _logger.warn("Thread pool shutdown while tasks still outstanding");
+                }
+
+            }
+        }
+    }
+
+    public void notCompleted()
     {
-        public void completed(IoSession session, Job job);
+        final ExecutorService pool = _poolReference.getPool();
+
+        if(pool == null)
+        {
+            return;
+        }
 
-        public void notCompleted(final IoSession session, final Job job);
+        try
+        {
+            pool.execute(this);
+        }
+        catch(RejectedExecutionException e)
+        {
+            _logger.warn("Thread pool shutdown while tasks still outstanding");
+        }
     }
+    
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Thu Oct 15 01:06:23 2009
@@ -23,5 +23,4 @@
 public interface ReadWriteRunnable extends Runnable
 {
     boolean isRead();
-    boolean isWrite();
 }

Modified: qpid/trunk/qpid/java/systests/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/build.xml?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/build.xml (original)
+++ qpid/trunk/qpid/java/systests/build.xml Thu Oct 15 01:06:23 2009
@@ -20,7 +20,7 @@
  -->
 <project name="System Tests" default="build">
 
-    <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common junit-toolkit"/>
+    <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common common/test nt junit-toolkit"/>
     <property name="module.test.src" location="src/main/java"/>
     <property name="module.test.excludes"
               value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/>

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java Thu Oct 15 01:06:23 2009
@@ -195,18 +195,12 @@
 
         // Send IO Exception - causing failover
         _connection.getProtocolHandler().
-                exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(),
-                                new WriteTimeoutException("WriteTimeoutException to cause failover."));
+                exception(new WriteTimeoutException("WriteTimeoutException to cause failover."));
 
         // Verify Failover occured through ConnectionListener
         assertTrue("Failover did not occur",
                    _failoverOccured.await(4000, TimeUnit.MILLISECONDS));
 
-        //Verify new protocolSession is not the same as the original
-        assertNotSame("Protocol Session has not changed",
-                      protocolSession,
-                      _connection.getProtocolHandler().getProtocolSession());
-
         /***********************************/
         // This verifies that the bug has been resolved
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java Thu Oct 15 01:06:23 2009
@@ -630,9 +630,9 @@
         //around the connection close race during tearDown() causing sporadic failures
     	final CountDownLatch exceptionReceived = new CountDownLatch(1);
 
+        Connection conn = getConnection("server", "guest");
         try
         {
-            Connection conn = getConnection("server", "guest");
 
             conn.setExceptionListener(new ExceptionListener()
             {
@@ -649,7 +649,6 @@
             session.createTemporaryQueue();
 
             fail("Test failed as creation succeded.");
-            //conn will be automatically closed
         }
         catch (JMSException e)
         {
@@ -664,6 +663,17 @@
             assertTrue("Timed out waiting for conneciton to report close",
             		exceptionReceived.await(2, TimeUnit.SECONDS));
         }
+        finally
+        {
+        	try
+        	{
+        		conn.close();
+        	}
+        	catch (Exception e)
+        	{
+        		// This normally fails because we are denied
+        	}
+        }
     }
 
     public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Thu Oct 15 01:06:23 2009
@@ -20,27 +20,27 @@
  */
 package org.apache.qpid.test.unit.client.protocol;
 
-import org.apache.mina.common.IoSession;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.protocol.TestIoSession;
+import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.NetworkDriver;
 
 public class AMQProtocolSessionTest extends QpidTestCase
 {
     private static class AMQProtSession extends AMQProtocolSession
     {
 
-        public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+        public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
         {
-            super(protocolHandler,protocolSession,connection);
+            super(protocolHandler,connection);
         }
 
-        public TestIoSession getMinaProtocolSession()
+        public TestNetworkDriver getNetworkDriver()
         {
-            return (TestIoSession) _minaProtocolSession;
+            return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
         }
 
         public AMQShortString genQueueName()
@@ -63,8 +63,11 @@
     {
         super.setUp();
 
+        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+        AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
+        protocolHandler.setNetworkDriver(new TestNetworkDriver());
         //don't care about the values set here apart from the dummy IoSession
-        _testSession = new AMQProtSession(null,new TestIoSession(), (AMQConnection) getConnection("guest", "guest"));
+        _testSession = new AMQProtSession(protocolHandler , con);
 
         //initialise addresses for test and expected results
         _port = 123;
@@ -81,20 +84,20 @@
         AMQShortString testAddress;
 
         //test address with / and ; chars which generateQueueName should removeKey
-        _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
-        _testSession.getMinaProtocolSession().setLocalPort(_port);
+        _testSession.getNetworkDriver().setLocalAddress(_brokenAddress);
+        _testSession.getNetworkDriver().setPort(_port);
 
         testAddress = _testSession.genQueueName();
         assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
 
         //test empty address
-        _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
+        _testSession.getNetworkDriver().setLocalAddress(_emptyAddress);
 
         testAddress = _testSession.genQueueName();
         assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
 
         //test address with no special chars
-        _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
+        _testSession.getNetworkDriver().setLocalAddress(_validAddress);
 
         testAddress = _testSession.genQueueName();
         assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=825362&r1=825361&r2=825362&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Thu Oct 15 01:06:23 2009
@@ -960,7 +960,7 @@
         return (AMQConnectionFactory) getInitialContext().lookup(factoryName);
     }
 
-    public Connection getConnection() throws Exception
+    public Connection getConnection() throws JMSException, NamingException
     {
         return getConnection("guest", "guest");
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message