qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1172657 [17/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/csha...
Date Mon, 19 Sep 2011 15:13:38 GMT
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Sep 19 15:13:18 2011
@@ -30,6 +30,8 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Session.State.NEW;
 import static org.apache.qpid.transport.Session.State.OPEN;
 import static org.apache.qpid.transport.Session.State.RESUMING;
+
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.transport.network.Frame;
 import static org.apache.qpid.transport.util.Functions.mod;
 import org.apache.qpid.transport.util.Logger;
@@ -42,7 +44,6 @@ import static org.apache.qpid.util.Seria
 import static org.apache.qpid.util.Strings.toUTF8;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -58,7 +59,6 @@ import java.util.concurrent.TimeUnit;
 
 public class Session extends SessionInvoker
 {
-
     private static final Logger log = Logger.get(Session.class);
 
     public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
@@ -92,7 +92,9 @@ public class Session extends SessionInvo
     private int channel;
     private SessionDelegate delegate;
     private SessionListener listener = new DefaultSessionListener();
-    private long timeout = 60000;
+    private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+                                        Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+                                                     ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
     private boolean autoSync = false;
 
     private boolean incomingInit;
@@ -120,7 +122,9 @@ public class Session extends SessionInvo
 
     private Thread resumer = null;
     private boolean transacted = false;
-    
+    private SessionDetachCode detachCode;
+    private final Object stateLock = new Object();
+
     protected Session(Connection connection, Binary name, long expiry)
     {
         this(connection, new SessionDelegate(), name, expiry);
@@ -962,16 +966,29 @@ public class Session extends SessionInvo
 
     public void close()
     {
+        if (log.isDebugEnabled())
+        {
+            log.debug("Closing [%s] in state [%s]", this, state);
+        }
         synchronized (commands)
         {
-            state = CLOSING;
-            setClose(true);
-            sessionRequestTimeout(0);
-            sessionDetach(name.getBytes());
-
-            awaitClose();
- 
-
+            switch(state)
+            {
+                case DETACHED:
+                    state = CLOSED;
+                    delegate.closed(this);
+                    connection.removeSession(this);
+                    listener.closed(this);
+                    break;
+                case CLOSED:
+                    break;
+                default:
+                    state = CLOSING;
+                    setClose(true);
+                    sessionRequestTimeout(0);
+                    sessionDetach(name.getBytes());
+                    awaitClose();
+            }
         }
     }
 
@@ -1045,13 +1062,55 @@ public class Session extends SessionInvo
     {
         return String.format("ssn:%s", name);
     }
-    
+
     public void setTransacted(boolean b) {
         this.transacted = b;
     }
-    
+
     public boolean isTransacted(){
         return transacted;
     }
-    
+
+    public void setDetachCode(SessionDetachCode dtc)
+    {
+        this.detachCode = dtc;
+    }
+
+    public SessionDetachCode getDetachCode()
+    {
+        return this.detachCode;
+    }
+
+    public void awaitOpen()
+    {
+        switch (state)
+        {
+        case NEW:
+            synchronized(stateLock)
+            {
+                Waiter w = new Waiter(stateLock, timeout);
+                while (w.hasTime() && state == NEW)
+                {
+                    w.await();
+                }
+            }
+
+            if (state != OPEN)
+            {
+                throw new SessionException("Timed out waiting for Session to open");
+            }
+            break;
+        case DETACHED:
+        case CLOSING:
+        case CLOSED:
+            throw new SessionException("Session closed");
+        default :
+            break;
+        }
+    }
+
+    public Object getStateLock()
+    {
+        return stateLock;
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Mon Sep 19 15:13:18 2011
@@ -76,6 +76,10 @@ public class SessionDelegate
     @Override public void sessionAttached(Session ssn, SessionAttached atc)
     {
         ssn.setState(Session.State.OPEN);
+        synchronized (ssn.getStateLock())
+        {
+            ssn.getStateLock().notifyAll();
+        }
     }
 
     @Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -203,10 +207,18 @@ public class SessionDelegate
     public void closed(Session session)
     {
         log.debug("CLOSED: [%s]", session);
+        synchronized (session.getStateLock())
+        {
+            session.getStateLock().notifyAll();
+        }
     }
 
     public void detached(Session session)
     {
         log.debug("DETACHED: [%s]", session);
+        synchronized (session.getStateLock())
+        {
+            session.getStateLock().notifyAll();
+        }
     }
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.transport.network;
 
+import javax.net.ssl.SSLContext;
+
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 
 public interface IncomingNetworkTransport extends NetworkTransport
 {
-    public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory);
+    public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext);
 }
\ No newline at end of file

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java Mon Sep 19 15:13:18 2011
@@ -29,6 +29,8 @@ public interface NetworkConnection
 {
     Sender<ByteBuffer> getSender();
 
+    void start();
+
     void close();
 
     /**

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -20,9 +20,11 @@
  */
 package org.apache.qpid.transport.network;
 
+/**
+ * A network transport is responsible for the establishment of network connections.
+ * NetworkTransport implementations are pluggable via the {@link Transport} class.
+ */
 public interface NetworkTransport
 {
     public void close();
-
-    public NetworkConnection getConnection();
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -22,11 +22,14 @@ package org.apache.qpid.transport.networ
 
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.ssl.SSLContextFactory;
+import javax.net.ssl.SSLContext;
+
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.Receiver;
 
 public interface OutgoingNetworkTransport extends NetworkTransport
 {
-    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory);
+    public NetworkConnection getConnection();
+
+    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext);
 }
\ No newline at end of file

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,115 @@
  */
 package org.apache.qpid.transport.network;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.TransportException;
+
 public class Transport
 {
+    public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport";
+    public static final String QPID_TRANSPORT_V0_8_PROPNAME = "qpid.transport.v0_8";
+    public static final String QPID_TRANSPORT_V0_9_PROPNAME = "qpid.transport.v0_9";
+    public static final String QPID_TRANSPORT_V0_9_1_PROPNAME = "qpid.transport.v0_9_1";
+    public static final String QPID_TRANSPORT_V0_10_PROPNAME = "qpid.transport.v0_10";
+    public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport";
+
+    // Can't reference the class directly here, as this would preclude the ability to bundle transports separately.
+    private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport";
+
     public static final String TCP = "tcp";
+
+    private final static Map<ProtocolVersion,String> OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP;
+
+    static
+    {
+        final Map<ProtocolVersion,String> map = new HashMap<ProtocolVersion, String>();
+        map.put(ProtocolVersion.v8_0, IO_TRANSPORT_CLASSNAME);
+        map.put(ProtocolVersion.v0_9, IO_TRANSPORT_CLASSNAME);
+        map.put(ProtocolVersion.v0_91, IO_TRANSPORT_CLASSNAME);
+        map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME);
+
+        OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
+    }
+
+    public static IncomingNetworkTransport getIncomingTransportInstance()
+    {
+        return (IncomingNetworkTransport) loadTransportClass(
+                System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, IO_TRANSPORT_CLASSNAME));
+    }
+
+    public static OutgoingNetworkTransport getOutgoingTransportInstance(
+            final ProtocolVersion protocolVersion)
+    {
+
+        final String overrride = getOverrideClassNameFromSystemProperty(protocolVersion);
+        final String networkTransportClassName;
+        if (overrride != null)
+        {
+            networkTransportClassName = overrride;
+        }
+        else
+        {
+            networkTransportClassName = OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP.get(protocolVersion);
+        }
+
+        return (OutgoingNetworkTransport) loadTransportClass(networkTransportClassName);
+    }
+
+    private static NetworkTransport loadTransportClass(final String networkTransportClassName)
+    {
+        if (networkTransportClassName == null)
+        {
+            throw new IllegalArgumentException("transport class name must not be null");
+        }
+
+        try
+        {
+            final Class<?> clazz = Class.forName(networkTransportClassName);
+            return (NetworkTransport) clazz.newInstance();
+        }
+        catch (InstantiationException e)
+        {
+            throw new TransportException("Unable to instantiate transport class " + networkTransportClassName, e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new TransportException("Access exception " + networkTransportClassName, e);
+        }
+        catch (ClassNotFoundException e)
+        {
+            throw new TransportException("Unable to load transport class " + networkTransportClassName, e);
+        }
+    }
+
+    private static String getOverrideClassNameFromSystemProperty(final ProtocolVersion protocolVersion)
+    {
+        final String protocolSpecificSystemProperty;
+
+        if (ProtocolVersion.v0_10.equals(protocolVersion))
+        {
+            protocolSpecificSystemProperty = QPID_TRANSPORT_V0_10_PROPNAME;
+        }
+        else if (ProtocolVersion.v0_91.equals(protocolVersion))
+        {
+            protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_1_PROPNAME;
+        }
+        else if (ProtocolVersion.v0_9.equals(protocolVersion))
+        {
+            protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_PROPNAME;
+        }
+        else if (ProtocolVersion.v8_0.equals(protocolVersion))
+        {
+            protocolSpecificSystemProperty = QPID_TRANSPORT_V0_8_PROPNAME;
+        }
+        else
+        {
+            throw new IllegalArgumentException("Unknown ProtocolVersion " + protocolVersion);
+        }
+
+        return System.getProperty(protocolSpecificSystemProperty, System.getProperty(QPID_TRANSPORT_PROPNAME));
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java Mon Sep 19 15:13:18 2011
@@ -37,7 +37,7 @@ public class IoNetworkConnection impleme
     private final long _timeout;
     private final IoSender _ioSender;
     private final IoReceiver _ioReceiver;
-    
+
     public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> delegate,
             int sendBufferSize, int receiveBufferSize, long timeout)
     {
@@ -45,9 +45,15 @@ public class IoNetworkConnection impleme
         _timeout = timeout;
 
         _ioReceiver = new IoReceiver(_socket, delegate, receiveBufferSize,_timeout);
+
         _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
+
         _ioSender.registerCloseListener(_ioReceiver);
 
+    }
+
+    public void start()
+    {
         _ioReceiver.initiate();
         _ioSender.initiate();
     }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Mon Sep 19 15:13:18 2011
@@ -21,41 +21,35 @@
 package org.apache.qpid.transport.network.io;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
+import java.net.*;
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.OutgoingNetworkTransport;
 import org.apache.qpid.transport.util.Logger;
 
-public class IoNetworkTransport implements OutgoingNetworkTransport
+public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
 {
-    static
-    {
-        org.apache.mina.common.ByteBuffer.setAllocator
-            (new org.apache.mina.common.SimpleByteBufferAllocator());
-        org.apache.mina.common.ByteBuffer.setUseDirectBuffers
-            (Boolean.getBoolean("amqj.enableDirectBuffers"));
-    }
 
     private static final Logger LOGGER = Logger.get(IoNetworkTransport.class);
 
     private Socket _socket;
     private IoNetworkConnection _connection;
     private long _timeout = 60000;
-    
-    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
+    private AcceptingThread _acceptor;
+
+    public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContext sslContext)
     {
         int sendBufferSize = settings.getWriteBufferSize();
         int receiveBufferSize = settings.getReadBufferSize();
-        
+
         try
         {
             _socket = new Socket();
@@ -83,6 +77,7 @@ public class IoNetworkTransport implemen
         try
         {
             _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout);
+            _connection.start();
         }
         catch(Exception e)
         {
@@ -103,11 +98,134 @@ public class IoNetworkTransport implemen
 
     public void close()
     {
-        _connection.close();
+        if(_connection != null)
+        {
+            _connection.close();
+        }
+        if(_acceptor != null)
+        {
+            _acceptor.close();
+        }
     }
 
     public NetworkConnection getConnection()
     {
         return _connection;
     }
+
+    public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContext sslContext)
+    {
+
+        try
+        {
+            _acceptor = new AcceptingThread(config, factory, sslContext);
+
+            _acceptor.start();
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Unable to start server socket", e);
+        }
+
+
+    }
+
+    private class AcceptingThread extends Thread
+    {
+        private NetworkTransportConfiguration _config;
+        private ProtocolEngineFactory _factory;
+        private SSLContext _sslContent;
+        private ServerSocket _serverSocket;
+
+        private AcceptingThread(NetworkTransportConfiguration config,
+                                ProtocolEngineFactory factory,
+                                SSLContext sslContext)
+                throws IOException
+        {
+            _config = config;
+            _factory = factory;
+            _sslContent = sslContext;
+
+            InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort());
+
+            if(sslContext == null)
+            {
+                _serverSocket = new ServerSocket();
+            }
+            else
+            {
+                SSLServerSocketFactory socketFactory = sslContext.getServerSocketFactory();
+                _serverSocket = socketFactory.createServerSocket();
+            }
+
+            _serverSocket.bind(address);
+            _serverSocket.setReuseAddress(true);
+
+
+        }
+
+
+        /**
+            Close the underlying ServerSocket if it has not already been closed.
+         */
+        public void close()
+        {
+            if (!_serverSocket.isClosed())
+            {
+                try
+                {
+                    _serverSocket.close();
+                }
+                catch (IOException e)
+                {
+                    throw new TransportException(e);
+                }
+            }
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                while (true)
+                {
+                    try
+                    {
+                        Socket socket = _serverSocket.accept();
+                        socket.setTcpNoDelay(_config.getTcpNoDelay());
+
+                        final Integer sendBufferSize = _config.getSendBufferSize();
+                        final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+                        socket.setSendBufferSize(sendBufferSize);
+                        socket.setReceiveBufferSize(receiveBufferSize);
+
+                        ProtocolEngine engine = _factory.newProtocolEngine();
+
+                        NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout);
+
+
+                        engine.setNetworkConnection(connection, connection.getSender());
+
+                        connection.start();
+
+
+                    }
+                    catch(RuntimeException e)
+                    {
+                        LOGGER.error(e, "Error in Acceptor thread " + _config.getPort());
+                    }
+                }
+            }
+            catch (IOException e)
+            {
+                LOGGER.debug(e, "SocketException - no new connections will be accepted on port "
+                        + _config.getPort());
+            }
+        }
+
+
+    }
+
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Mon Sep 19 15:13:18 2011
@@ -92,13 +92,23 @@ final class IoReceiver implements Runnab
         {
             try
             {
-                if (shutdownBroken)
+                try
                 {
-                   socket.close();
+                    if (shutdownBroken)
+                    {
+                       socket.close();
+                    }
+                    else
+                    {
+                        socket.shutdownInput();
+                    }
                 }
-                else
+                catch(SocketException se)
                 {
-                    socket.shutdownInput();
+                    if(!socket.isClosed() && !socket.isInputShutdown())
+                    {
+                        throw se;
+                    }
                 }
                 if (block && Thread.currentThread() != receiverThread)
                 {
@@ -117,6 +127,7 @@ final class IoReceiver implements Runnab
             {
                 throw new TransportException(e);
             }
+
         }
     }
 

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Mon Sep 19 15:13:18 2011
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.util.Logger;
@@ -59,7 +60,7 @@ public final class IoSender implements R
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Thread senderThread;
     private final List<Closeable> _listeners = new ArrayList<Closeable>();
-    
+
     private volatile Throwable exception = null;
 
     public IoSender(Socket socket, int bufferSize, long timeout)
@@ -80,13 +81,13 @@ public final class IoSender implements R
         try
         {
             //Create but deliberately don't start the thread.
-            senderThread = Threading.getThreadFactory().createThread(this);                      
+            senderThread = Threading.getThreadFactory().createThread(this);
         }
         catch(Exception e)
         {
             throw new Error("Error creating IOSender thread",e);
         }
-        
+
         senderThread.setDaemon(true);
         senderThread.setName(String.format("IoSender - %s", socket.getRemoteSocketAddress()));
     }
@@ -110,7 +111,7 @@ public final class IoSender implements R
     {
         if (closed.get())
         {
-            throw new SenderException("sender is closed", exception);
+            throw new SenderClosedException("sender is closed", exception);
         }
 
         final int size = buffer.length;
@@ -143,7 +144,7 @@ public final class IoSender implements R
 
                     if (closed.get())
                     {
-                        throw new SenderException("sender is closed", exception);
+                        throw new SenderClosedException("sender is closed", exception);
                     }
 
                     if (head - tail >= size)
@@ -255,7 +256,7 @@ public final class IoSender implements R
 
     public void run()
     {
-        final int size = buffer.length;       
+        final int size = buffer.length;
         while (true)
         {
             final int hd = head;

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java Mon Sep 19 15:13:18 2011
@@ -25,8 +25,8 @@ import java.nio.ByteBuffer;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
+import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionListener;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
@@ -37,149 +37,12 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.security.ssl.SSLSender;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
-public class SecurityLayer
+public interface SecurityLayer
 {
-    ConnectionSettings settings;
-    Connection con;
-    SSLSecurityLayer sslLayer;
-    SASLSecurityLayer saslLayer;
-    
-    public void init(Connection con) throws TransportException
-    {
-        this.con = con;
-        this.settings = con.getConnectionSettings();
-        if (settings.isUseSSL())
-        {
-            sslLayer = new SSLSecurityLayer();
-        }
-        if (settings.isUseSASLEncryption())
-        {
-            saslLayer = new SASLSecurityLayer();
-        }        
-        
-    }
-    
-    public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
-    {
-        Sender<ByteBuffer> sender = delegate;
-        
-        if (settings.isUseSSL())
-        {
-            sender = sslLayer.sender(sender);
-        }     
-        
-        if (settings.isUseSASLEncryption())
-        {
-            sender = saslLayer.sender(sender);
-        }
-        
-        return sender;
-    }
-    
-    public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate)
-    {
-        Receiver<ByteBuffer> receiver = delegate;
-        
-        if (settings.isUseSSL())
-        {
-            receiver = sslLayer.receiver(receiver);
-        }        
-        
-        if (settings.isUseSASLEncryption())
-        {
-            receiver = saslLayer.receiver(receiver);
-        }
-        
-        return receiver;
-    }
-    
-    public String getUserID()
-    {
-        if (settings.isUseSSL())
-        {
-            return sslLayer.getUserID();
-        }
-        else
-        {
-            return null;
-        }
-    }
-    
-    class SSLSecurityLayer
-    {
-        SSLEngine engine;
-        SSLSender sender;
-                
-        public SSLSecurityLayer() 
-        {
-            SSLContext sslCtx;
-            try
-            {
-                sslCtx = SSLUtil.createSSLContext(settings);
-            }
-            catch (Exception e)
-            {
-                throw new TransportException("Error creating SSL Context", e);
-            }
-            
-            try
-            {
-                engine = sslCtx.createSSLEngine();
-                engine.setUseClientMode(true);
-            }
-            catch(Exception e)
-            {
-                throw new TransportException("Error creating SSL Engine", e);
-            }
-        }
-        
-        public SSLSender sender(Sender<ByteBuffer> delegate)
-        {
-            sender = new SSLSender(engine,delegate);
-            sender.setConnectionSettings(settings);
-            return sender;
-        }
-        
-        public SSLReceiver receiver(Receiver<ByteBuffer> delegate)
-        {
-            if (sender == null)
-            {
-                throw new  
-                IllegalStateException("SecurityLayer.sender method should be " +
-                		"invoked before SecurityLayer.receiver");
-            }
-            
-            SSLReceiver receiver = new SSLReceiver(engine,delegate,sender);
-            receiver.setConnectionSettings(settings);
-            return receiver;
-        }
-        
-        public String getUserID()
-        {
-            return SSLUtil.retriveIdentity(engine);
-        }
-        
-    }
-    
-    class SASLSecurityLayer
-    {
-        public SASLSecurityLayer() 
-        {
-        }
-        
-        public SASLSender sender(Sender<ByteBuffer> delegate)
-        {
-            SASLSender sender = new SASLSender(delegate);
-            con.addConnectionListener((ConnectionListener)sender);
-            return sender;
-        }
-        
-        public SASLReceiver receiver(Receiver<ByteBuffer> delegate)
-        {
-            SASLReceiver receiver = new SASLReceiver(delegate);
-            con.addConnectionListener((ConnectionListener)receiver);
-            return receiver;
-        }
-        
-    }
+
+    public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate);
+    public Receiver<ByteBuffer> receiver(Receiver<ByteBuffer> delegate);
+    public String getUserID();
+
 }
+

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.transport.network.security.ssl;
 
+import java.io.IOException;
 import java.net.Socket;
+import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.Principal;
 import java.security.PrivateKey;
@@ -40,7 +42,7 @@ public class QpidClientX509KeyManager ex
     String alias;
     
     public QpidClientX509KeyManager(String alias, String keyStorePath,
-                           String keyStorePassword,String keyStoreCertType) throws Exception
+                           String keyStorePassword,String keyStoreCertType) throws GeneralSecurityException, IOException
     {
         this.alias = alias;    
         KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath,keyStorePassword);

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java Mon Sep 19 15:13:18 2011
@@ -24,43 +24,43 @@ import java.nio.ByteBuffer;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
 
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.security.SSLStatus;
 import org.apache.qpid.transport.util.Logger;
 
 public class SSLReceiver implements Receiver<ByteBuffer>
 {
-    private Receiver<ByteBuffer> delegate;
-    private SSLEngine engine;
-    private SSLSender sender;
-    private int sslBufSize;
+    private static final Logger log = Logger.get(SSLReceiver.class);
+
+    private final Receiver<ByteBuffer> delegate;
+    private final SSLEngine engine;
+    private final int sslBufSize;
+    private final ByteBuffer localBuffer;
+    private final SSLStatus _sslStatus;
     private ByteBuffer appData;
-    private ByteBuffer localBuffer;
     private boolean dataCached = false;
-    private final Object notificationToken;
-    private ConnectionSettings settings;
-    
-    private static final Logger log = Logger.get(SSLReceiver.class);
 
-    public SSLReceiver(SSLEngine engine, Receiver<ByteBuffer> delegate,SSLSender sender)
+    private String _hostname;
+
+    public SSLReceiver(final SSLEngine engine, final Receiver<ByteBuffer> delegate, final SSLStatus sslStatus)
     {
         this.engine = engine;
         this.delegate = delegate;
-        this.sender = sender;
         this.sslBufSize = engine.getSession().getApplicationBufferSize();
         appData = ByteBuffer.allocate(sslBufSize);
         localBuffer = ByteBuffer.allocate(sslBufSize);
-        notificationToken = sender.getNotificationToken();
+        _sslStatus = sslStatus;
     }
 
-    public void setConnectionSettings(ConnectionSettings settings)
+    public void setHostname(String hostname)
     {
-        this.settings = settings;
+        _hostname = hostname;
     }
     
     public void closed()
@@ -102,9 +102,9 @@ public class SSLReceiver implements Rece
             try
             {
                 SSLEngineResult result = engine.unwrap(netData, appData);
-                synchronized (notificationToken)
+                synchronized (_sslStatus.getSslLock())
                 {
-                    notificationToken.notifyAll();
+                    _sslStatus.getSslLock().notifyAll();
                 }
 
                 int read = result.bytesProduced();
@@ -129,9 +129,9 @@ public class SSLReceiver implements Rece
                 switch(status)
                 {
                     case CLOSED:
-                        synchronized(notificationToken)
+                        synchronized(_sslStatus.getSslLock())
                         {
-                            notificationToken.notifyAll();
+                            _sslStatus.getSslLock().notifyAll();
                         }
                         return;
 
@@ -163,20 +163,20 @@ public class SSLReceiver implements Rece
                         break;
 
                     case NEED_TASK:
-                        sender.doTasks();
+                        doTasks();
                         handshakeStatus = engine.getHandshakeStatus();
 
                     case FINISHED:
-                        if (this.settings != null && this.settings.isVerifyHostname() )
+                        if (_hostname != null)
                         {
-                            SSLUtil.verifyHostname(engine, this.settings.getHost());
+                            SSLUtil.verifyHostname(engine, _hostname);
                         }
                             
                     case NEED_WRAP:                        
                     case NOT_HANDSHAKING:
-                        synchronized(notificationToken)
+                        synchronized(_sslStatus.getSslLock())
                         {
-                            notificationToken.notifyAll();
+                            _sslStatus.getSslLock().notifyAll();
                         }
                         break;
 
@@ -189,14 +189,23 @@ public class SSLReceiver implements Rece
             catch(SSLException e)
             {
                 log.error(e, "Error caught in SSLReceiver");
-                sender.setErrorFlag();
-                synchronized(notificationToken)
+                _sslStatus.setSslErrorFlag();
+                synchronized(_sslStatus.getSslLock())
                 {
-                    notificationToken.notifyAll();
+                    _sslStatus.getSslLock().notifyAll();
                 }                
                 exception(new TransportException("Error in SSLReceiver",e));
             }
 
         }
     }
+
+    private void doTasks()
+    {
+        Runnable runnable;
+        while ((runnable = engine.getDelegatedTask()) != null) {
+            runnable.run();
+        }
+    }
+
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java Mon Sep 19 15:13:18 2011
@@ -31,35 +31,38 @@ import javax.net.ssl.SSLEngineResult.Sta
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
 import org.apache.qpid.transport.util.Logger;
 
 public class SSLSender implements Sender<ByteBuffer>
 {
-    private Sender<ByteBuffer> delegate;
-    private SSLEngine engine;
-    private int sslBufSize;
-    private ByteBuffer netData;
-    private long timeout = 30000;
-    private ConnectionSettings settings;
-    
-    private final Object engineState = new Object();
+    private static final Logger log = Logger.get(SSLSender.class);
+
+    private final Sender<ByteBuffer> delegate;
+    private final SSLEngine engine;
+    private final int sslBufSize;
+    private final ByteBuffer netData;
+    private final long timeout;
+    private final SSLStatus _sslStatus;
+
+    private String _hostname;
+
     private final AtomicBoolean closed = new AtomicBoolean(false);
-    private final AtomicBoolean error = new AtomicBoolean(false);
 
-    private static final Logger log = Logger.get(SSLSender.class);
 
-    public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate)
+    public SSLSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
     {
         this.engine = engine;
         this.delegate = delegate;
         sslBufSize = engine.getSession().getPacketBufferSize();
         netData = ByteBuffer.allocate(sslBufSize);
         timeout = Long.getLong("qpid.ssl_timeout", 60000);
+        _sslStatus = sslStatus;
     }
     
-    public void setConnectionSettings(ConnectionSettings settings)
+    public void setHostname(String hostname)
     {
-        this.settings = settings;
+        _hostname = hostname;
     }
 
     public void close()
@@ -83,13 +86,13 @@ public class SSLSender implements Sender
             }
 
 
-            synchronized(engineState)
+            synchronized(_sslStatus.getSslLock())
             {
                 while (!engine.isOutboundDone())
                 {
                     try
                     {
-                        engineState.wait();
+                        _sslStatus.getSslLock().wait();
                     }
                     catch(InterruptedException e)
                     {
@@ -148,7 +151,7 @@ public class SSLSender implements Sender
         HandshakeStatus handshakeStatus;
         Status status;
 
-        while(appData.hasRemaining() && !error.get())
+        while(appData.hasRemaining() && !_sslStatus.getSslErrorFlag())
         {
             int read = 0;
             try
@@ -160,6 +163,7 @@ public class SSLSender implements Sender
             }
             catch(SSLException e)
             {
+                // Should this set _sslError??
                 throw new SenderException("SSL, Error occurred while encrypting data",e);
             }
 
@@ -207,7 +211,7 @@ public class SSLSender implements Sender
 
                 case NEED_UNWRAP:
                     flush();
-                    synchronized(engineState)
+                    synchronized(_sslStatus.getSslLock())
                     {
                         switch (engine.getHandshakeStatus())
                         {
@@ -215,7 +219,7 @@ public class SSLSender implements Sender
                             long start = System.currentTimeMillis();
                             try
                             {
-                                engineState.wait(timeout);
+                                _sslStatus.getSslLock().wait(timeout);
                             }
                             catch(InterruptedException e)
                             {
@@ -234,9 +238,9 @@ public class SSLSender implements Sender
                     break;
 
                 case FINISHED:
-                    if (this.settings != null && this.settings.isVerifyHostname() )
+                    if (_hostname != null)
                     {
-                        SSLUtil.verifyHostname(engine, this.settings.getHost());
+                        SSLUtil.verifyHostname(engine, _hostname);
                     }
                     
                 case NOT_HANDSHAKING:
@@ -249,7 +253,7 @@ public class SSLSender implements Sender
         }
     }
 
-    public void doTasks()
+    private void doTasks()
     {
         Runnable runnable;
         while ((runnable = engine.getDelegatedTask()) != null) {
@@ -257,16 +261,6 @@ public class SSLSender implements Sender
         }
     }
 
-    public Object getNotificationToken()
-    {
-        return engineState;
-    }
-    
-    public void setErrorFlag()
-    {
-        error.set(true);
-    }
-
     public void setIdleTimeout(int i)
     {
         delegate.setIdleTimeout(i);

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java Mon Sep 19 15:13:18 2011
@@ -125,38 +125,6 @@ public class SSLUtil
         return id.toString();
     }
     
-    public static SSLContext createSSLContext(ConnectionSettings settings) throws Exception
-    {
-        SSLContextFactory sslContextFactory;
-        
-        if (settings.getCertAlias() == null)
-        {
-            sslContextFactory = 
-                new SSLContextFactory(settings.getTrustStorePath(),
-                                      settings.getTrustStorePassword(),
-                                      settings.getTrustStoreCertType(),
-                                      settings.getKeyStorePath(),
-                                      settings.getKeyStorePassword(),
-                                      settings.getKeyStoreCertType());
-
-        } else
-        {
-            sslContextFactory = 
-                new SSLContextFactory(settings.getTrustStorePath(),
-                                      settings.getTrustStorePassword(),
-                                      settings.getTrustStoreCertType(),
-                    new QpidClientX509KeyManager(settings.getCertAlias(),
-                                                     settings.getKeyStorePath(),
-                                                     settings.getKeyStorePassword(),
-                                                     settings.getKeyStoreCertType()));
-            
-            log.debug("Using custom key manager");
-        }
-
-        return sslContextFactory.buildServerContext();
-        
-    }
-    
     public static KeyStore getInitializedKeyStore(String storePath, String storePassword) throws GeneralSecurityException, IOException
     {
         KeyStore ks = KeyStore.getInstance("JKS");
@@ -176,7 +144,10 @@ public class SSLUtil
             {
                 throw new IOException("Unable to load keystore resource: " + storePath);
             }
-            ks.load(in, storePassword.toCharArray());
+
+            char[] storeCharPassword = storePassword == null ? null : storePassword.toCharArray();
+
+            ks.load(in, storeCharPassword);
         }
         finally
         {

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java Mon Sep 19 15:13:18 2011
@@ -143,8 +143,9 @@ public class FileUtils
     }
 
     /**
-     * Either opens the specified filename as an input stream, or uses the default resource loaded using the
-     * specified class loader, if opening the file fails or no file name is specified.
+     * Either opens the specified filename as an input stream or either the filesystem or classpath,
+     * or uses the default resource loaded using the specified class loader, if opening the file fails
+     * or no file name is specified.
      *
      * @param filename        The name of the file to open.
      * @param defaultResource The name of the default resource on the classpath if the file cannot be opened.
@@ -156,28 +157,28 @@ public class FileUtils
     {
         InputStream is = null;
 
-        // Flag to indicate whether the default resource should be used. By default this is true, so that the default
-        // is used when opening the file fails.
-        boolean useDefault = true;
-
         // Try to open the file if one was specified.
         if (filename != null)
         {
+            // try on filesystem
             try
             {
                 is = new BufferedInputStream(new FileInputStream(new File(filename)));
-
-                // Clear the default flag because the file was succesfully opened.
-                useDefault = false;
             }
             catch (FileNotFoundException e)
             {
-                // Ignore this exception, the default will be used instead.
+                is = null;
+            }
+
+            if (is == null)
+            {
+                // failed on filesystem, so try on classpath
+                is = cl.getResourceAsStream(filename);
             }
         }
 
         // Load the default resource if a file was not specified, or if opening the file failed.
-        if (useDefault)
+        if (is == null)
         {
             is = cl.getResourceAsStream(defaultResource);
         }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Mon Sep 19 15:13:18 2011
@@ -21,6 +21,9 @@ package org.apache.qpid.codec;
  */
 
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -46,9 +49,16 @@ public class AMQDecoderTest extends Test
     }
    
     
-    public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    private ByteBuffer getHeartbeatBodyBuffer() throws IOException
     {
-        ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        HeartbeatBody.FRAME.writePayload(new DataOutputStream(baos));
+        return ByteBuffer.wrap(baos.toByteArray());
+    }
+    
+    public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
+    {
+        ByteBuffer msg = getHeartbeatBodyBuffer();
         ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
         if (frames.get(0) instanceof AMQFrame)
         {
@@ -60,9 +70,9 @@ public class AMQDecoderTest extends Test
         }
     }
     
-    public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
     {
-        ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msg = getHeartbeatBodyBuffer();
         ByteBuffer msgA = msg.slice();
         int msgbPos = msg.remaining() / 2;
         int msgaLimit = msg.remaining() - msgbPos;
@@ -83,10 +93,10 @@ public class AMQDecoderTest extends Test
         }
     }
     
-    public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
     {
-        ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
-        ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msgA = getHeartbeatBodyBuffer();
+        ByteBuffer msgB = getHeartbeatBodyBuffer();
         ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining());
         msg.put(msgA);
         msg.put(msgB);
@@ -106,11 +116,11 @@ public class AMQDecoderTest extends Test
         }
     }
     
-    public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+    public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException
     {
-        ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
-        ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
-        ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer();
+        ByteBuffer msgA = getHeartbeatBodyBuffer();
+        ByteBuffer msgB = getHeartbeatBodyBuffer();
+        ByteBuffer msgC = getHeartbeatBodyBuffer();
         
         ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2);
         sliceA.put(msgA);

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Mon Sep 19 15:13:18 2011
@@ -20,10 +20,10 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-
 import junit.framework.TestCase;
 
+import java.io.*;
+
 
 public class BasicContentHeaderPropertiesTest extends TestCase
 {
@@ -76,15 +76,14 @@ public class BasicContentHeaderPropertie
         assertEquals(99, _testProperties.getPropertyFlags());
     }
 
-    public void testWritePropertyListPayload()
+    public void testWritePropertyListPayload() throws IOException
     {
-        ByteBuffer buf = ByteBuffer.allocate(300);
-        _testProperties.writePropertyListPayload(buf);
+        _testProperties.writePropertyListPayload(new DataOutputStream(new ByteArrayOutputStream(300)));
     }
 
     public void testPopulatePropertiesFromBuffer() throws Exception
     {
-        ByteBuffer buf = ByteBuffer.allocate(300);
+        DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300]));
         _testProperties.populatePropertiesFromBuffer(buf, 99, 99);
     }
 

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java Mon Sep 19 15:13:18 2011
@@ -23,14 +23,14 @@ package org.apache.qpid.framing;
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQPInvalidClassException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.*;
+
 public class PropertyFieldTableTest extends TestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class);
@@ -441,7 +441,7 @@ public class PropertyFieldTableTest exte
     }
 
     /** Check that a nested field table parameter correctly encodes and decodes to a byte buffer. */
-    public void testNestedFieldTable()
+    public void testNestedFieldTable() throws IOException
     {
         byte[] testBytes = new byte[] { 0, 1, 2, 3, 4, 5 };
 
@@ -465,14 +465,16 @@ public class PropertyFieldTableTest exte
         outerTable.setFieldTable("innerTable", innerTable);
 
         // Write the outer table into the buffer.
-        final ByteBuffer buffer = ByteBuffer.allocate((int) outerTable.getEncodedSize() + 4);
-        outerTable.writeToBuffer(buffer);
-        buffer.flip();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        outerTable.writeToBuffer(new DataOutputStream(baos));
+
+        byte[] data = baos.toByteArray();
 
         // Extract the table back from the buffer again.
         try
         {
-            FieldTable extractedOuterTable = EncodingUtils.readFieldTable(buffer);
+            FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data)));
 
             FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable");
 
@@ -567,7 +569,7 @@ public class PropertyFieldTableTest exte
         Assert.assertEquals("Hello", table.getObject("object-string"));
     }
 
-    public void testwriteBuffer()
+    public void testwriteBuffer() throws IOException
     {
         byte[] bytes = { 99, 98, 97, 96, 95 };
 
@@ -585,15 +587,17 @@ public class PropertyFieldTableTest exte
         table.setString("string", "hello");
         table.setString("null-string", null);
 
-        final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem?
 
-        table.writeToBuffer(buffer);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4);
+        table.writeToBuffer(new DataOutputStream(baos));
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        DataInputStream dis = new DataInputStream(bais);
 
-        buffer.flip();
 
-        long length = buffer.getUnsignedInt();
+        long length = dis.readInt() & 0xFFFFFFFFL;
 
-        FieldTable table2 = new FieldTable(buffer, length);
+        FieldTable table2 = new FieldTable(dis, length);
 
         Assert.assertEquals((Boolean) true, table2.getBoolean("bool"));
         Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte"));

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java Mon Sep 19 15:13:18 2011
@@ -24,18 +24,28 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.util.*;
 
 import junit.framework.TestCase;
 import junit.framework.TestResult;
 
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.mina.util.AvailablePortFinder;
+
 
 public class QpidTestCase extends TestCase
 {
-    protected static final Logger _logger = Logger.getLogger(QpidTestCase.class);
+    public static final String QPID_HOME = System.getProperty("QPID_HOME");
+    public static final String TEST_RESOURCES_DIR = QPID_HOME + "/../test-profiles/test_resources/";
+
+    private static final Logger _logger = Logger.getLogger(QpidTestCase.class);
+
+    private final Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>();
+    private final Map<String, String> _propertiesSetForTest = new HashMap<String, String>();
+
+    private String _testName;
 
     /**
      * Some tests are excluded when the property test.excludes is set to true.
@@ -129,8 +139,186 @@ public class QpidTestCase extends TestCa
         return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ;
     }
 
+
+    public static final int MIN_PORT_NUMBER = 1;
+    public static final int MAX_PORT_NUMBER = 49151;
+
+
+    /**
+     * Gets the next available port starting at a port.
+     *
+     * @param fromPort the port to scan for availability
+     * @throws NoSuchElementException if there are no ports available
+     */
+    protected int getNextAvailable(int fromPort)
+    {
+        if ((fromPort < MIN_PORT_NUMBER) || (fromPort > MAX_PORT_NUMBER))
+        {
+            throw new IllegalArgumentException("Invalid start port: " + fromPort);
+        }
+
+        for (int i = fromPort; i <= MAX_PORT_NUMBER; i++)
+        {
+            if (available(i)) {
+                return i;
+            }
+        }
+
+        throw new NoSuchElementException("Could not find an available port above " + fromPort);
+    }
+
+    /**
+     * Checks to see if a specific port is available.
+     *
+     * @param port the port to check for availability
+     */
+    private boolean available(int port)
+    {
+        if ((port < MIN_PORT_NUMBER) || (port > MAX_PORT_NUMBER))
+        {
+            throw new IllegalArgumentException("Invalid start port: " + port);
+        }
+
+        ServerSocket ss = null;
+        DatagramSocket ds = null;
+        try
+        {
+            ss = new ServerSocket(port);
+            ss.setReuseAddress(true);
+            ds = new DatagramSocket(port);
+            ds.setReuseAddress(true);
+            return true;
+        }
+        catch (IOException e)
+        {
+        }
+        finally
+        {
+            if (ds != null)
+            {
+                ds.close();
+            }
+
+            if (ss != null)
+            {
+                try
+                {
+                    ss.close();
+                }
+                catch (IOException e)
+                {
+                    /* should not be thrown */
+                }
+            }
+        }
+
+        return false;
+    }
+
     public int findFreePort()
     {
-        return AvailablePortFinder.getNextAvailable(10000);
+        return getNextAvailable(10000);
+    }
+
+    /**
+     * Set a System property for duration of this test only. The tearDown will
+     * guarantee to reset the property to its previous value after the test
+     * completes.
+     *
+     * @param property The property to set
+     * @param value the value to set it to, if null, the property will be cleared
+     */
+    protected void setTestSystemProperty(final String property, final String value)
+    {
+        if (!_propertiesSetForTest.containsKey(property))
+        {
+            // Record the current value so we can revert it later.
+            _propertiesSetForTest.put(property, System.getProperty(property));
+        }
+
+        if (value == null)
+        {
+            System.clearProperty(property);
+        }
+        else
+        {
+            System.setProperty(property, value);
+        }
+    }
+
+    /**
+     * Restore the System property values that were set by this test run.
+     */
+    protected void revertTestSystemProperties()
+    {
+        if(!_propertiesSetForTest.isEmpty())
+        {
+            _logger.debug("reverting " + _propertiesSetForTest.size() + " test properties");
+            for (String key : _propertiesSetForTest.keySet())
+            {
+                String value = _propertiesSetForTest.get(key);
+                if (value != null)
+                {
+                    System.setProperty(key, value);
+                }
+                else
+                {
+                    System.clearProperty(key);
+                }
+            }
+
+            _propertiesSetForTest.clear();
+        }
+    }
+
+    /**
+     * Adjust the VMs Log4j Settings just for this test run
+     *
+     * @param logger the logger to change
+     * @param level the level to set
+     */
+    protected void setLoggerLevel(Logger logger, Level level)
+    {
+        assertNotNull("Cannot set level of null logger", logger);
+        assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level);
+
+        if (!_loggerLevelSetForTest.containsKey(logger))
+        {
+            // Record the current value so we can revert it later.
+            _loggerLevelSetForTest.put(logger, logger.getLevel());
+        }
+
+        logger.setLevel(level);
+    }
+
+    /**
+     * Restore the logging levels defined by this test.
+     */
+    protected void revertLoggingLevels()
+    {
+        for (Logger logger : _loggerLevelSetForTest.keySet())
+        {
+            logger.setLevel(_loggerLevelSetForTest.get(logger));
+        }
+
+        _loggerLevelSetForTest.clear();
+    }
+
+    protected void tearDown() throws java.lang.Exception
+    {
+        _logger.info("========== tearDown " + _testName + " ==========");
+        revertTestSystemProperties();
+        revertLoggingLevels();
+    }
+
+    protected void setUp() throws Exception
+    {
+        _testName = getClass().getSimpleName() + "." + getName();
+        _logger.info("========== start " + _testName + " ==========");
+    }
+
+    protected String getTestName()
+    {
+        return _testName;
     }
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Mon Sep 19 15:13:18 2011
@@ -20,32 +20,27 @@
  */
 package org.apache.qpid.transport;
 
-import org.apache.mina.util.AvailablePortFinder;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoAcceptor;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
+import static org.apache.qpid.transport.Option.EXPECTED;
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
 
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.io.IOException;
 
-import static org.apache.qpid.transport.Option.*;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
+import org.apache.qpid.transport.util.Waiter;
 
 /**
  * ConnectionTest
  */
-
 public class ConnectionTest extends QpidTestCase implements SessionListener
 {
-
-    private static final Logger log = Logger.get(ConnectionTest.class);
-
     private int port;
     private volatile boolean queue = false;
     private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
@@ -58,7 +53,7 @@ public class ConnectionTest extends Qpid
     {
         super.setUp();
 
-        port = AvailablePortFinder.getNextAvailable(12000);
+        port = findFreePort();
     }
 
     protected void tearDown() throws Exception
@@ -158,7 +153,8 @@ public class ConnectionTest extends Qpid
 
     private Connection connect(final CountDownLatch closed)
     {
-        Connection conn = new Connection();
+        final Connection conn = new Connection();
+        conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
         conn.addConnectionListener(new ConnectionListener()
         {
             public void opened(Connection conn) {}
@@ -182,9 +178,9 @@ public class ConnectionTest extends Qpid
     {
         // Force os.name to be windows to exercise code in IoReceiver
         // that looks for the value of os.name
-        System.setProperty("os.name","windows");
+        setTestSystemProperty("os.name","windows");
 
-        // Start server as 0-9 to froce a ProtocolVersionException
+        // Start server as 0-9 to force a ProtocolVersionException
         startServer(new ProtocolHeader(1, 0, 9));
         
         CountDownLatch closed = new CountDownLatch(1);
@@ -219,7 +215,7 @@ public class ConnectionTest extends Qpid
                 conn.send(protocolHeader);
                 List<Object> utf8 = new ArrayList<Object>();
                 utf8.add("utf8");
-                conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
+                conn.connectionStart(null, Collections.emptyList(), utf8);
             }
 
             @Override
@@ -270,40 +266,7 @@ public class ConnectionTest extends Qpid
         }
     }
 
-    class FailoverConnectionListener implements ConnectionListener
-    {
-        public void opened(Connection conn) {}
-
-        public void exception(Connection conn, ConnectionException e)
-        {
-            throw e;
-        }
-
-        public void closed(Connection conn)
-        {
-            queue = true;
-            conn.connect("localhost", port, null, "guest", "guest");
-            conn.resume();
-        }
-    }
-
-    class TestSessionListener implements SessionListener
-    {
-        public void opened(Session s) {}
-        public void resumed(Session s) {}
-        public void exception(Session s, SessionException e) {}
-        public void message(Session s, MessageTransfer xfr)
-        {
-            synchronized (incoming)
-            {
-                incoming.add(xfr);
-                incoming.notifyAll();
-            }
 
-            s.processed(xfr);
-        }
-        public void closed(Session s) {}
-    }
 
     public void testResumeNonemptyReplayBuffer() throws Exception
     {
@@ -311,6 +274,7 @@ public class ConnectionTest extends Qpid
 
         Connection conn = new Connection();
         conn.addConnectionListener(new FailoverConnectionListener());
+        conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
         conn.connect("localhost", port, null, "guest", "guest");
         Session ssn = conn.createSession(1);
         ssn.setSessionListener(new TestSessionListener());
@@ -365,6 +329,7 @@ public class ConnectionTest extends Qpid
         startServer();
 
         Connection conn = new Connection();
+        conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
         conn.addConnectionListener(new FailoverConnectionListener());
         conn.connect("localhost", port, null, "guest", "guest");
         Session ssn = conn.createSession(1);
@@ -387,6 +352,7 @@ public class ConnectionTest extends Qpid
         startServer();
 
         Connection conn = new Connection();
+        conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
         conn.connect("localhost", port, null, "guest", "guest");
         Session ssn = conn.createSession();
         ssn.sessionFlush(EXPECTED);
@@ -400,6 +366,7 @@ public class ConnectionTest extends Qpid
     {
         startServer();
         Connection conn = new Connection();
+        conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
         conn.connect("localhost", port, null, "guest", "guest");
         conn.connectionHeartbeat();
         conn.close();
@@ -410,6 +377,7 @@ public class ConnectionTest extends Qpid
         startServer();
 
         Connection conn = new Connection();
+        conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
         conn.connect("localhost", port, null, "guest", "guest");
         Session ssn = conn.createSession();
         send(ssn, "EXCP 0");
@@ -429,6 +397,7 @@ public class ConnectionTest extends Qpid
         startServer();
 
         Connection conn = new Connection();
+        conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings()));
         conn.connect("localhost", port, null, "guest", "guest");
         Session ssn = conn.createSession();
         send(ssn, "EXCP 0", true);
@@ -443,4 +412,38 @@ public class ConnectionTest extends Qpid
         }
     }
 
+    class FailoverConnectionListener implements ConnectionListener
+    {
+        public void opened(Connection conn) {}
+
+        public void exception(Connection conn, ConnectionException e)
+        {
+            throw e;
+        }
+
+        public void closed(Connection conn)
+        {
+            queue = true;
+            conn.connect("localhost", port, null, "guest", "guest");
+            conn.resume();
+        }
+    }
+
+    class TestSessionListener implements SessionListener
+    {
+        public void opened(Session s) {}
+        public void resumed(Session s) {}
+        public void exception(Session s, SessionException e) {}
+        public void message(Session s, MessageTransfer xfr)
+        {
+            synchronized (incoming)
+            {
+                incoming.add(xfr);
+                incoming.notifyAll();
+            }
+
+            s.processed(xfr);
+        }
+        public void closed(Session s) {}
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java Mon Sep 19 15:13:18 2011
@@ -49,10 +49,12 @@ public class TestNetworkConnection imple
         _sender = new MockSender();
     }
 
+
+
     public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
             NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException
     {
-        
+
     }
 
     public SocketAddress getLocalAddress()
@@ -68,37 +70,37 @@ public class TestNetworkConnection imple
     public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config,
             SSLContextFactory sslFactory) throws OpenException
     {
-        
+
     }
 
     public void setMaxReadIdle(int idleTime)
     {
-        
+
     }
 
     public void setMaxWriteIdle(int idleTime)
     {
-        
+
     }
 
     public void close()
     {
-           
+
     }
 
     public void flush()
     {
-        
+
     }
 
     public void send(ByteBuffer msg)
     {
-        
+
     }
 
     public void setIdleTimeout(int i)
     {
-        
+
     }
 
     public void setPort(int port)
@@ -135,4 +137,8 @@ public class TestNetworkConnection imple
     {
         return _sender;
     }
+
+    public void start()
+    {
+    }
 }

Copied: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (from r1156187, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java?p2=qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java&r1=1156187&r2=1172657&rev=1172657&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java Mon Sep 19 15:13:18 2011
@@ -23,16 +23,16 @@ package org.apache.qpid.transport.networ
 
 import java.nio.ByteBuffer;
 
+import javax.net.ssl.SSLContext;
+
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.io.IoNetworkTransport;
-import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
 
 public class TransportTest extends QpidTestCase
 {
@@ -43,7 +43,7 @@ public class TransportTest extends QpidT
     {
         final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0);
         assertNotNull(networkTransport);
-        assertTrue(networkTransport instanceof MinaNetworkTransport);
+        assertTrue(networkTransport instanceof IoNetworkTransport);
     }
 
     public void testGloballyOverriddenOutgoingTransportForv0_8() throws Exception
@@ -75,7 +75,7 @@ public class TransportTest extends QpidT
     {
         final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance();
         assertNotNull(networkTransport);
-        assertTrue(networkTransport instanceof MinaNetworkTransport);
+        assertTrue(networkTransport instanceof IoNetworkTransport);
     }
 
     public void testOverriddenGetIncomingTransport() throws Exception
@@ -129,7 +129,7 @@ public class TransportTest extends QpidT
         }
 
         public NetworkConnection connect(ConnectionSettings settings,
-                Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
+                Receiver<ByteBuffer> delegate, SSLContext sslContext)
         {
             throw new UnsupportedOperationException();
         }
@@ -149,7 +149,7 @@ public class TransportTest extends QpidT
         }
 
         public void accept(NetworkTransportConfiguration config,
-                ProtocolEngineFactory factory, SSLContextFactory sslFactory)
+                ProtocolEngineFactory factory, SSLContext sslContext)
         {
             throw new UnsupportedOperationException();
         }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java Mon Sep 19 15:13:18 2011
@@ -80,7 +80,7 @@ public class IoAcceptor<E> extends Threa
             try
             {
                 Socket sock = socket.accept();
-                IoTransport<E> transport = new IoTransport<E>(sock, binding,false);
+                IoTransport<E> transport = new IoTransport<E>(sock, binding);
             }
             catch (IOException e)
             {

Copied: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java (from r1156187, qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java?p2=qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java&p1=qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java&r1=1156187&r2=1172657&rev=1172657&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java Mon Sep 19 15:13:18 2011
@@ -22,15 +22,8 @@ package org.apache.qpid.transport.networ
 import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.qpid.ssl.SSLContextFactory;
 import org.apache.qpid.transport.Binding;
 import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
-import org.apache.qpid.transport.network.security.ssl.SSLSender;
 import org.apache.qpid.transport.util.Logger;
 
 /**
@@ -45,13 +38,6 @@ import org.apache.qpid.transport.util.Lo
 public final class IoTransport<E>
 {
 
-    static
-    {
-        org.apache.mina.common.ByteBuffer.setAllocator
-            (new org.apache.mina.common.SimpleByteBufferAllocator());
-        org.apache.mina.common.ByteBuffer.setUseDirectBuffers
-            (Boolean.getBoolean("amqj.enableDirectBuffers"));
-    }
 
     private static final Logger log = Logger.get(IoTransport.class);
 
@@ -67,18 +53,10 @@ public final class IoTransport<E>
     private IoReceiver receiver;
     private long timeout = 60000;
 
-    IoTransport(Socket socket, Binding<E,ByteBuffer> binding, boolean ssl)
+    IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
     {
         this.socket = socket;
-
-        if (ssl)
-        {
-            setupSSLTransport(socket, binding);
-        }
-        else
-        {
-            setupTransport(socket, binding);
-        }
+        setupTransport(socket, binding);
     }
 
     private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding)
@@ -95,40 +73,6 @@ public final class IoTransport<E>
         ios.registerCloseListener(this.receiver);
     }
 
-    private void setupSSLTransport(Socket socket, Binding<E, ByteBuffer> binding)
-    {
-        SSLEngine engine = null;
-        SSLContext sslCtx;
-        try
-        {
-            sslCtx = createSSLContext();
-        }
-        catch (Exception e)
-        {
-            throw new TransportException("Error creating SSL Context", e);
-        }
-
-        try
-        {
-            engine = sslCtx.createSSLEngine();
-            engine.setUseClientMode(true);
-        }
-        catch(Exception e)
-        {
-            throw new TransportException("Error creating SSL Engine", e);
-        }
-        IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout);
-        ios.initiate();
-        this.sender = new SSLSender(engine,ios);
-        this.endpoint = binding.endpoint(sender);
-        this.receiver = new IoReceiver(socket, new SSLReceiver(engine,binding.receiver(endpoint),(SSLSender)sender),
-                2*readBufferSize, timeout);
-        this.receiver.initiate();
-        ios.registerCloseListener(this.receiver);
-
-        log.info("SSL Sender and Receiver initiated");
-    }
-
     public Sender<ByteBuffer> getSender()
     {
         return sender;
@@ -144,22 +88,4 @@ public final class IoTransport<E>
         return socket;
     }
 
-    private SSLContext createSSLContext() throws Exception
-    {
-        String trustStorePath = System.getProperty("javax.net.ssl.trustStore");
-        String trustStorePassword = System.getProperty("javax.net.ssl.trustStorePassword");
-        String trustStoreCertType = System.getProperty("qpid.ssl.trustStoreCertType","SunX509");
-                
-        String keyStorePath = System.getProperty("javax.net.ssl.keyStore",trustStorePath);
-        String keyStorePassword = System.getProperty("javax.net.ssl.keyStorePassword",trustStorePassword);
-        String keyStoreCertType = System.getProperty("qpid.ssl.keyStoreCertType","SunX509");
-        
-        SSLContextFactory sslContextFactory = new SSLContextFactory(trustStorePath,trustStorePassword,
-                                                                    trustStoreCertType,keyStorePath,
-                                                                    keyStorePassword,keyStoreCertType);
-        
-        return sslContextFactory.buildServerContext();
-        
-    }
-
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java Mon Sep 19 15:13:18 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
@@ -333,7 +334,7 @@ public class MinaNetworkHandlerTest exte
         }
     }
     
-    public class CountingProtocolEngine implements ProtocolEngine
+    public class CountingProtocolEngine implements ServerProtocolEngine
     {
         public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
         private int _readBytes;
@@ -447,6 +448,11 @@ public class MinaNetworkHandlerTest exte
             return _closed;
         }
 
+        public long getConnectionId()
+        {
+            return -1;
+        }
+
     }
 
     private class EchoProtocolEngine extends CountingProtocolEngine



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message