qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1643302 - in /qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apach...
Date Fri, 05 Dec 2014 14:48:00 GMT
Author: kwall
Date: Fri Dec  5 14:48:00 2014
New Revision: 1643302

URL: http://svn.apache.org/r1643302
Log:
QPID-6262: Rob's prototype NIO work

Added:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
      - copied, changed from r1643301, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
      - copied, changed from r1643301, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
      - copied, changed from r1643301, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1643302&r1=1643301&r2=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Dec  5 14:48:00 2014
@@ -44,7 +44,6 @@ import org.apache.qpid.server.plugin.Pro
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
 import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java (from r1643301, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java&r1=1643301&r2=1643302&rev=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java Fri Dec  5 14:48:00 2014
@@ -17,7 +17,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.transport.network.security.ssl;
+package org.apache.qpid.server.protocol;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,31 +29,30 @@ import javax.net.ssl.SSLException;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 import org.apache.qpid.transport.util.Logger;
 
 public class SSLBufferingSender implements Sender<ByteBuffer>
 {
-    private static final Logger log = Logger.get(SSLBufferingSender.class);
+    private static final Logger LOGGER = Logger.get(SSLBufferingSender.class);
     private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
 
-    private final Sender<ByteBuffer> delegate;
-    private final SSLEngine engine;
-    private final int sslBufSize;
-    private final ByteBuffer netData;
+    private final Sender<ByteBuffer> _delegate;
+    private final SSLEngine _engine;
+    private final int _sslBufSize;
     private final SSLStatus _sslStatus;
 
     private String _hostname;
 
-    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
     private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
 
 
     public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
     {
-        this.engine = engine;
-        this.delegate = delegate;
-        sslBufSize = engine.getSession().getPacketBufferSize();
-        netData = ByteBuffer.allocate(sslBufSize);
+        _engine = engine;
+        _delegate = delegate;
+        _sslBufSize = engine.getSession().getPacketBufferSize();
         _sslStatus = sslStatus;
     }
 
@@ -64,15 +63,15 @@ public class SSLBufferingSender implemen
 
     public void close()
     {
-        if (!closed.getAndSet(true))
+        if (!_closed.getAndSet(true))
         {
-            if (engine.isOutboundDone())
+            if (_engine.isOutboundDone())
             {
                 return;
             }
-            log.debug("Closing SSL connection");
+            LOGGER.debug("Closing SSL connection");
             doSend();
-            engine.closeOutbound();
+            _engine.closeOutbound();
             try
             {
                 tearDownSSLConnection();
@@ -85,7 +84,7 @@ public class SSLBufferingSender implemen
 
             synchronized(_sslStatus.getSslLock())
             {
-                while (!engine.isOutboundDone())
+                while (!_engine.isOutboundDone())
                 {
                     try
                     {
@@ -98,13 +97,14 @@ public class SSLBufferingSender implemen
 
                 }
             }
-            delegate.close();
+            _delegate.close();
         }
     }
 
     private void tearDownSSLConnection() throws Exception
     {
-        SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
+        ByteBuffer netData = getNetDataBuffer();
+        SSLEngineResult result = _engine.wrap(ByteBuffer.allocate(0), netData);
         Status status = result.getStatus();
         int read   = result.bytesProduced();
         while (status != Status.CLOSED)
@@ -124,23 +124,28 @@ public class SSLBufferingSender implemen
                 netData.limit(limit);
                 netData.position(netData.position() + read);
 
-                delegate.send(data);
+                _delegate.send(data);
                 flush();
             }
-            result = engine.wrap(ByteBuffer.allocate(0), netData);
+            result = _engine.wrap(ByteBuffer.allocate(0), netData);
             status = result.getStatus();
             read   = result.bytesProduced();
         }
     }
 
+    private ByteBuffer getNetDataBuffer()
+    {
+        return ByteBuffer.allocate(_sslBufSize);
+    }
+
     public void flush()
     {
-        delegate.flush();
+        _delegate.flush();
     }
 
     public void send()
     {
-        if(!closed.get())
+        if(!_closed.get())
         {
             doSend();
         }
@@ -157,7 +162,7 @@ public class SSLBufferingSender implemen
             newBuf.flip();
             _appData = newBuf;
         }
-        if (closed.get())
+        if (_closed.get())
         {
             throw new SenderException("SSL Sender is closed");
         }
@@ -180,13 +185,15 @@ public class SSLBufferingSender implemen
         HandshakeStatus handshakeStatus;
         Status status;
 
-        while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
+        while((_appData.hasRemaining() || _engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
               && !_sslStatus.getSslErrorFlag())
         {
+            ByteBuffer netData = getNetDataBuffer();
+
             int read = 0;
             try
             {
-                SSLEngineResult result = engine.wrap(_appData, netData);
+                SSLEngineResult result = _engine.wrap(_appData, netData);
                 read   = result.bytesProduced();
                 status = result.getStatus();
                 handshakeStatus = result.getHandshakeStatus();
@@ -208,7 +215,7 @@ public class SSLBufferingSender implemen
                 netData.limit(limit);
                 netData.position(netData.position() + read);
 
-                delegate.send(data);
+                _delegate.send(data);
             }
 
             switch(status)
@@ -246,7 +253,7 @@ public class SSLBufferingSender implemen
                 case FINISHED:
                     if (_hostname != null)
                     {
-                        SSLUtil.verifyHostname(engine, _hostname);
+                        SSLUtil.verifyHostname(_engine, _hostname);
                     }
 
                 case NOT_HANDSHAKING:
@@ -262,13 +269,13 @@ public class SSLBufferingSender implemen
     private void doTasks()
     {
         Runnable runnable;
-        while ((runnable = engine.getDelegatedTask()) != null) {
+        while ((runnable = _engine.getDelegatedTask()) != null) {
             runnable.run();
         }
     }
 
     public void setIdleTimeout(int i)
     {
-        delegate.setIdleTimeout(i);
+        _delegate.setIdleTimeout(i);
     }
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java?rev=1643302&r1=1643301&r2=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java Fri Dec  5 14:48:00 2014
@@ -28,13 +28,13 @@ import java.util.Set;
 import javax.net.ssl.SSLContext;
 
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
 
 class TCPandSSLTransport implements AcceptingTransport
 {
@@ -78,10 +78,10 @@ class TCPandSSLTransport implements Acce
         }
 
         final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
-        _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+        _networkTransport = new NonBlockingNetworkTransport();
         final MultiVersionProtocolEngineFactory protocolEngineFactory =
                 new MultiVersionProtocolEngineFactory(
-                _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
+                _port.getParent(Broker.class), _sslContext,
                 settings.wantClientAuth(), settings.needClientAuth(),
                 _supported,
                 _defaultSupportedProtocolReply,

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1643302&r1=1643301&r2=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Fri Dec  5 14:48:00 2014
@@ -111,7 +111,10 @@ public class ProtocolEngine_0_10  extend
             public void send(ByteBuffer msg)
             {
                 _lastWriteTime = System.currentTimeMillis();
-                sender.send(msg);
+                ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]);
+                copy.put(msg);
+                copy.flip();
+                sender.send(copy);
 
             }
 

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java (from r1643301, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java&r1=1643301&r2=1643302&rev=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java Fri Dec  5 14:48:00 2014
@@ -1,5 +1,5 @@
 /*
-*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -47,17 +47,15 @@ import org.apache.qpid.transport.network
 import org.apache.qpid.transport.network.TransportActivity;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
 {
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
     private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                               CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
     private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
                                                                   CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-
-
     private Socket _socket;
-    private IoNetworkConnection _connection;
+    private NetworkConnection _connection;
     private AcceptingThread _acceptor;
 
     public NetworkConnection connect(ConnectionSettings settings,
@@ -98,7 +96,7 @@ public class IoNetworkTransport implemen
         try
         {
             IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
-            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+            _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
             ticker.setConnection(_connection);
             _connection.start();
         }
@@ -143,7 +141,6 @@ public class IoNetworkTransport implemen
         try
         {
             _acceptor = new AcceptingThread(config, factory, sslContext);
-            _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
             _acceptor.setDaemon(false);
             _acceptor.start();
         }
@@ -158,6 +155,13 @@ public class IoNetworkTransport implemen
         return _acceptor == null ? -1 : _acceptor.getPort();
     }
 
+    protected abstract NetworkConnection createNetworkConnection(Socket socket,
+                                                                 Receiver<ByteBuffer> engine,
+                                                                 Integer sendBufferSize,
+                                                                 Integer receiveBufferSize,
+                                                                 int timeout,
+                                                                 IdleTimeoutTicker ticker);
+
     private class AcceptingThread extends Thread
     {
         private volatile boolean _closed = false;
@@ -260,8 +264,13 @@ public class IoNetworkTransport implemen
 
 
                             final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
                             NetworkConnection connection =
-                                    new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+                                    createNetworkConnection(socket,
+                                                            engine,
+                                                            sendBufferSize,
+                                                            receiveBufferSize,
+                                                            _timeout,
                                                             ticker);
 
                             connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
@@ -306,7 +315,8 @@ public class IoNetworkTransport implemen
             {
                 if(LOGGER.isDebugEnabled())
                 {
-                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
+                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+                                 + _config.getAddress());
                 }
             }
         }
@@ -327,5 +337,4 @@ public class IoNetworkTransport implemen
         }
 
     }
-
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1643302&r1=1643301&r2=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java Fri Dec  5 14:48:00 2014
@@ -20,312 +20,25 @@
  */
 package org.apache.qpid.transport.network.io;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketException;
 import java.nio.ByteBuffer;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport extends AbstractNetworkTransport
 {
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
-    private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
-                                                              CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
-    private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
-                                                                  CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-
-
-    private Socket _socket;
-    private IoNetworkConnection _connection;
-    private AcceptingThread _acceptor;
-
-    public NetworkConnection connect(ConnectionSettings settings,
-                                     Receiver<ByteBuffer> delegate,
-                                     TransportActivity transportActivity)
-    {
-        int sendBufferSize = settings.getWriteBufferSize();
-        int receiveBufferSize = settings.getReadBufferSize();
-
-        try
-        {
-            _socket = new Socket();
-            _socket.setReuseAddress(true);
-            _socket.setTcpNoDelay(settings.isTcpNodelay());
-            _socket.setSendBufferSize(sendBufferSize);
-            _socket.setReceiveBufferSize(receiveBufferSize);
 
-            if(LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
-                LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
-                LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
-            }
-
-            InetAddress address = InetAddress.getByName(settings.getHost());
-
-            _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
-        }
-        catch (SocketException e)
-        {
-            throw new TransportException("Error connecting to broker", e);
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Error connecting to broker", e);
-        }
-
-        try
-        {
-            IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
-            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
-            ticker.setConnection(_connection);
-            _connection.start();
-        }
-        catch(Exception e)
-        {
-            try
-            {
-                _socket.close();
-            }
-            catch(IOException ioe)
-            {
-                //ignored, throw based on original exception
-            }
-
-            throw new TransportException("Error creating network connection", e);
-        }
-
-        return _connection;
-    }
-
-    public void close()
-    {
-        if(_connection != null)
-        {
-            _connection.close();
-        }
-        if(_acceptor != null)
-        {
-            _acceptor.close();
-        }
-    }
-
-    public NetworkConnection getConnection()
-    {
-        return _connection;
-    }
-
-    public void accept(NetworkTransportConfiguration config,
-                       ProtocolEngineFactory factory,
-                       SSLContext sslContext)
-    {
-        try
-        {
-            _acceptor = new AcceptingThread(config, factory, sslContext);
-            _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
-            _acceptor.setDaemon(false);
-            _acceptor.start();
-        }
-        catch (IOException e)
-        {
-            throw new TransportException("Failed to start AMQP on port : " + config, e);
-        }
-    }
-
-    public int getAcceptingPort()
-    {
-        return _acceptor == null ? -1 : _acceptor.getPort();
-    }
 
-    private class AcceptingThread extends Thread
+    @Override
+    protected IoNetworkConnection createNetworkConnection(final Socket socket,
+                                                       final Receiver<ByteBuffer> engine,
+                                                       final Integer sendBufferSize,
+                                                       final Integer receiveBufferSize,
+                                                       final int timeout,
+                                                       final IdleTimeoutTicker ticker)
     {
-        private volatile boolean _closed = false;
-        private NetworkTransportConfiguration _config;
-        private ProtocolEngineFactory _factory;
-        private SSLContext _sslContext;
-        private ServerSocket _serverSocket;
-        private int _timeout;
-
-        private AcceptingThread(NetworkTransportConfiguration config,
-                                ProtocolEngineFactory factory,
-                                SSLContext sslContext) throws IOException
-        {
-            _config = config;
-            _factory = factory;
-            _sslContext = sslContext;
-            _timeout = TIMEOUT;
-
-            InetSocketAddress address = config.getAddress();
-
-            if(sslContext == null)
-            {
-                _serverSocket = new ServerSocket();
-            }
-            else
-            {
-                SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
-                _serverSocket = socketFactory.createServerSocket();
-
-                SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
-                SSLUtil.removeSSLv3Support(sslServerSocket);
-
-                if(config.needClientAuth())
-                {
-                    sslServerSocket.setNeedClientAuth(true);
-                }
-                else if(config.wantClientAuth())
-                {
-                    sslServerSocket.setWantClientAuth(true);
-                }
-
-            }
-
-            _serverSocket.setReuseAddress(true);
-            _serverSocket.bind(address);
-        }
-
-
-        /**
-            Close the underlying ServerSocket if it has not already been closed.
-         */
-        public void close()
-        {
-            LOGGER.debug("Shutting down the Acceptor");
-            _closed = true;
-
-            if (!_serverSocket.isClosed())
-            {
-                try
-                {
-                    _serverSocket.close();
-                }
-                catch (IOException e)
-                {
-                    throw new TransportException(e);
-                }
-            }
-        }
-
-        private int getPort()
-        {
-            return _serverSocket.getLocalPort();
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                while (!_closed)
-                {
-                    Socket socket = null;
-                    try
-                    {
-                        socket = _serverSocket.accept();
-
-                        ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
-
-                        if(engine != null)
-                        {
-                            socket.setTcpNoDelay(_config.getTcpNoDelay());
-                            socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
-
-                            final Integer sendBufferSize = _config.getSendBufferSize();
-                            final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
-                            socket.setSendBufferSize(sendBufferSize);
-                            socket.setReceiveBufferSize(receiveBufferSize);
-
-
-                            final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-                            NetworkConnection connection =
-                                    new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
-                                                            ticker);
-
-                            connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
-
-                            ticker.setConnection(connection);
-
-                            engine.setNetworkConnection(connection, connection.getSender());
-
-                            connection.start();
-                        }
-                        else
-                        {
-                            socket.close();
-                        }
-                    }
-                    catch(RuntimeException e)
-                    {
-                        LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                        closeSocketIfNecessary(socket);
-                    }
-                    catch(IOException e)
-                    {
-                        if(!_closed)
-                        {
-                            LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                            closeSocketIfNecessary(socket);
-                            try
-                            {
-                                //Delay to avoid tight spinning the loop during issues such as too many open files
-                                Thread.sleep(1000);
-                            }
-                            catch (InterruptedException ie)
-                            {
-                                LOGGER.debug("Stopping acceptor due to interrupt request");
-                                _closed = true;
-                            }
-                        }
-                    }
-                }
-            }
-            finally
-            {
-                if(LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
-                }
-            }
-        }
-
-        private void closeSocketIfNecessary(final Socket socket)
-        {
-            if(socket != null)
-            {
-                try
-                {
-                    socket.close();
-                }
-                catch (IOException e)
-                {
-                    LOGGER.debug("Exception while closing socket", e);
-                }
-            }
-        }
-
+        return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout,
+                                ticker);
     }
 
 }

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java?rev=1643302&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java Fri Dec  5 14:48:00 2014
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSocket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+
+public class NonBlockingConnection implements NetworkConnection
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+    private final SocketChannel _socket;
+    private final long _timeout;
+    private final NonBlockingSender _ioSender;
+    private final NonBlockingReceiver _ioReceiver;
+    private int _maxReadIdle;
+    private int _maxWriteIdle;
+    private Principal _principal;
+    private boolean _principalChecked;
+    private final Object _lock = new Object();
+
+    public NonBlockingConnection(SocketChannel socket, Receiver<ByteBuffer> delegate,
+                                 int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
+    {
+        _socket = socket;
+        _timeout = timeout;
+
+        _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
+        _ioReceiver.setTicker(ticker);
+
+        _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
+
+        _ioSender.setReceiver(_ioReceiver);
+
+    }
+
+    public void start()
+    {
+        _ioSender.initiate();
+        _ioReceiver.initiate();
+    }
+
+    public Sender<ByteBuffer> getSender()
+    {
+        return _ioSender;
+    }
+
+    public void close()
+    {
+        try
+        {
+            _ioSender.close();
+        }
+        finally
+        {
+            _ioReceiver.close(false);
+        }
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return _socket.socket().getRemoteSocketAddress();
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return _socket.socket().getLocalSocketAddress();
+    }
+
+    public void setMaxWriteIdle(int sec)
+    {
+        _maxWriteIdle = sec;
+    }
+
+    public void setMaxReadIdle(int sec)
+    {
+        _maxReadIdle = sec;
+    }
+
+    @Override
+    public Principal getPeerPrincipal()
+    {
+        synchronized (_lock)
+        {
+            if(!_principalChecked)
+            {
+                if(_socket.socket() instanceof SSLSocket)
+                {
+                    try
+                    {
+                        _principal = ((SSLSocket) _socket.socket()).getSession().getPeerPrincipal();
+                    }
+                    catch(SSLPeerUnverifiedException e)
+                    {
+                        _principal = null;
+                    }
+                }
+
+                _principalChecked = true;
+            }
+
+            return _principal;
+        }
+    }
+
+    @Override
+    public int getMaxReadIdle()
+    {
+        return _maxReadIdle;
+    }
+
+    @Override
+    public int getMaxWriteIdle()
+    {
+        return _maxWriteIdle;
+    }
+}

Copied: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java (from r1643301, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java)
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java?p2=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java&p1=qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java&r1=1643301&r2=1643302&rev=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java Fri Dec  5 14:48:00 2014
@@ -1,5 +1,5 @@
 /*
-*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,14 +23,13 @@ package org.apache.qpid.transport.networ
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketException;
+import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
 
 import org.slf4j.LoggerFactory;
 
@@ -43,23 +42,21 @@ import org.apache.qpid.transport.Receive
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.IncomingNetworkTransport;
 import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
 import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
 
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class NonBlockingNetworkTransport implements IncomingNetworkTransport
 {
-    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
+
+    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
     private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
-                                                              CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+                                                          CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
     private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
-                                                                  CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-
-
-    private Socket _socket;
-    private IoNetworkConnection _connection;
+                                                                   CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
     private AcceptingThread _acceptor;
 
+    private SocketChannel _socketChannel;
+    private NonBlockingConnection _connection;
+
     public NetworkConnection connect(ConnectionSettings settings,
                                      Receiver<ByteBuffer> delegate,
                                      TransportActivity transportActivity)
@@ -69,26 +66,23 @@ public class IoNetworkTransport implemen
 
         try
         {
-            _socket = new Socket();
-            _socket.setReuseAddress(true);
-            _socket.setTcpNoDelay(settings.isTcpNodelay());
-            _socket.setSendBufferSize(sendBufferSize);
-            _socket.setReceiveBufferSize(receiveBufferSize);
+            _socketChannel = SocketChannel.open();
+            _socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+            _socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, settings.isTcpNodelay());
+            _socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+            _socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
 
             if(LOGGER.isDebugEnabled())
             {
-                LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
-                LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
-                LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+                LOGGER.debug("SO_RCVBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_RCVBUF));
+                LOGGER.debug("SO_SNDBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_SNDBUF));
+                LOGGER.debug("TCP_NODELAY : " + _socketChannel.getOption(StandardSocketOptions.TCP_NODELAY));
             }
 
             InetAddress address = InetAddress.getByName(settings.getHost());
 
-            _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
-        }
-        catch (SocketException e)
-        {
-            throw new TransportException("Error connecting to broker", e);
+            _socketChannel.socket().connect(new InetSocketAddress(address, settings.getPort()),
+                                            settings.getConnectTimeout());
         }
         catch (IOException e)
         {
@@ -98,7 +92,8 @@ public class IoNetworkTransport implemen
         try
         {
             IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
-            _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+            _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize,
+                                                  TIMEOUT, ticker);
             ticker.setConnection(_connection);
             _connection.start();
         }
@@ -106,7 +101,7 @@ public class IoNetworkTransport implemen
         {
             try
             {
-                _socket.close();
+                _socketChannel.close();
             }
             catch(IOException ioe)
             {
@@ -119,6 +114,17 @@ public class IoNetworkTransport implemen
         return _connection;
     }
 
+
+    protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
+                                                          final Receiver<ByteBuffer> engine,
+                                                          final Integer sendBufferSize,
+                                                          final Integer receiveBufferSize,
+                                                          final int timeout,
+                                                          final IdleTimeoutTicker ticker)
+    {
+        return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker);
+    }
+
     public void close()
     {
         if(_connection != null)
@@ -143,7 +149,6 @@ public class IoNetworkTransport implemen
         try
         {
             _acceptor = new AcceptingThread(config, factory, sslContext);
-            _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
             _acceptor.setDaemon(false);
             _acceptor.start();
         }
@@ -164,7 +169,7 @@ public class IoNetworkTransport implemen
         private NetworkTransportConfiguration _config;
         private ProtocolEngineFactory _factory;
         private SSLContext _sslContext;
-        private ServerSocket _serverSocket;
+        private ServerSocketChannel _serverSocket;
         private int _timeout;
 
         private AcceptingThread(NetworkTransportConfiguration config,
@@ -178,44 +183,22 @@ public class IoNetworkTransport implemen
 
             InetSocketAddress address = config.getAddress();
 
-            if(sslContext == null)
-            {
-                _serverSocket = new ServerSocket();
-            }
-            else
-            {
-                SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
-                _serverSocket = socketFactory.createServerSocket();
-
-                SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
-                SSLUtil.removeSSLv3Support(sslServerSocket);
+            _serverSocket =  ServerSocketChannel.open();
 
-                if(config.needClientAuth())
-                {
-                    sslServerSocket.setNeedClientAuth(true);
-                }
-                else if(config.wantClientAuth())
-                {
-                    sslServerSocket.setWantClientAuth(true);
-                }
-
-            }
-
-            _serverSocket.setReuseAddress(true);
+            _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
             _serverSocket.bind(address);
         }
 
 
         /**
-            Close the underlying ServerSocket if it has not already been closed.
+         Close the underlying ServerSocketChannel if it has not already been closed.
          */
         public void close()
         {
             LOGGER.debug("Shutting down the Acceptor");
             _closed = true;
 
-            if (!_serverSocket.isClosed())
+            if (!_serverSocket.socket().isClosed())
             {
                 try
                 {
@@ -230,7 +213,7 @@ public class IoNetworkTransport implemen
 
         private int getPort()
         {
-            return _serverSocket.getLocalPort();
+            return _serverSocket.socket().getLocalPort();
         }
 
         @Override
@@ -240,28 +223,33 @@ public class IoNetworkTransport implemen
             {
                 while (!_closed)
                 {
-                    Socket socket = null;
+                    SocketChannel socket = null;
                     try
                     {
                         socket = _serverSocket.accept();
 
-                        ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
+                        ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
 
                         if(engine != null)
                         {
-                            socket.setTcpNoDelay(_config.getTcpNoDelay());
-                            socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+                            socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+                            socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT);
 
                             final Integer sendBufferSize = _config.getSendBufferSize();
                             final Integer receiveBufferSize = _config.getReceiveBufferSize();
 
-                            socket.setSendBufferSize(sendBufferSize);
-                            socket.setReceiveBufferSize(receiveBufferSize);
+                            socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+                            socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
 
 
                             final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
                             NetworkConnection connection =
-                                    new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
+                                    createNetworkConnection(socket,
+                                                            engine,
+                                                            sendBufferSize,
+                                                            receiveBufferSize,
+                                                            _timeout,
                                                             ticker);
 
                             connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
@@ -280,14 +268,14 @@ public class IoNetworkTransport implemen
                     catch(RuntimeException e)
                     {
                         LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                        closeSocketIfNecessary(socket);
+                        closeSocketIfNecessary(socket.socket());
                     }
                     catch(IOException e)
                     {
                         if(!_closed)
                         {
                             LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
-                            closeSocketIfNecessary(socket);
+                            closeSocketIfNecessary(socket.socket());
                             try
                             {
                                 //Delay to avoid tight spinning the loop during issues such as too many open files
@@ -306,7 +294,8 @@ public class IoNetworkTransport implemen
             {
                 if(LOGGER.isDebugEnabled())
                 {
-                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
+                    LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+                                 + _config.getAddress());
                 }
             }
         }
@@ -327,5 +316,4 @@ public class IoNetworkTransport implemen
         }
 
     }
-
 }

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java?rev=1643302&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java Fri Dec  5 14:48:00 2014
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLSocket;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.util.SystemUtils;
+
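+/**
+ * NonBlockingReceiver reads from a SocketChannel on its own thread and passes the data read to the delegate Receiver.
+ *
+ */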
+
+final class NonBlockingReceiver implements Runnable
+{
+
+    private static final Logger log = Logger.get(NonBlockingReceiver.class);
+
+    private final Receiver<ByteBuffer> receiver;
+    private final int bufferSize;
+    private final SocketChannel socket;
+    private final long timeout;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Thread receiverThread;
+    private static final boolean shutdownBroken;
+
+    private Ticker _ticker;
+    static
+    {
+        shutdownBroken = SystemUtils.isWindows();
+    }
+
+    public NonBlockingReceiver(SocketChannel socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+    {
+        this.receiver = receiver;
+        this.bufferSize = bufferSize;
+        this.socket = socket;
+        this.timeout = timeout;
+
+        try
+        {
+            // Create the thread but deliberately don't start it here; initiate() starts it.
+            receiverThread = Threading.getThreadFactory().createThread(this);
+        }
+        catch(Exception e)
+        {
+            throw new RuntimeException("Error creating IOReceiver thread",e);
+        }
+        receiverThread.setDaemon(true);
+        receiverThread.setName(String.format("IoReceiver - %s", socket.socket().getRemoteSocketAddress()));
+    }
+
+    public void initiate()
+    {
+        receiverThread.start();
+    }
+
+    public void close()
+    {
+        close(false);
+    }
+
+    void close(boolean block)
+    {
+        if (!closed.getAndSet(true))
+        {
+            try
+            {
+                try
+                {
+                    if (shutdownBroken || socket.socket() instanceof SSLSocket)
+                    {
+                       socket.close();
+                    }
+                    else
+                    {
+                        socket.shutdownInput();
+                    }
+                }
+                catch(SocketException se)
+                {
+                    if(!socket.socket().isClosed() && !socket.socket().isInputShutdown())
+                    {
+                        throw se;
+                    }
+                }
+                if (block && Thread.currentThread() != receiverThread)
+                {
+                    receiverThread.join(timeout);
+                    if (receiverThread.isAlive())
+                    {
+                        throw new TransportException("join timed out");
+                    }
+                }
+            }
+            catch (InterruptedException e)
+            {
+                throw new TransportException(e);
+            }
+            catch (IOException e)
+            {
+                throw new TransportException(e);
+            }
+
+        }
+    }
+
+    public void run()
+    {
+        final int threshold = bufferSize / 2;
+
+        // The read buffer size is set to be similar to SO_RCVBUF.
+        // A lower value has not been tested, so it is unknown whether it would be better or worse.
+        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+        try
+        {
+            int read = 0;
+            long currentTime;
+            while(read != -1)
+            {
+                try
+                {
+                    while ((read = socket.read(buffer)) != -1)
+                    {
+                        if (read > 0)
+                        {
+                            ByteBuffer b = buffer.duplicate();
+                            b.flip();
+                            receiver.received(b);
+
+                            if (buffer.remaining() < threshold)
+                            {
+                                buffer = ByteBuffer.allocate(bufferSize);
+                            }
+                            else
+                            {
+                                buffer = buffer.slice();
+                            }
+                        }
+                        currentTime = System.currentTimeMillis();
+
+                        if(_ticker != null)
+                        {
+                            int tick = _ticker.getTimeToNextTick(currentTime);
+                            if(tick <= 0)
+                            {
+                                tick = _ticker.tick(currentTime);
+                            }
+                            try
+                            {
+                                if(!socket.socket().isClosed())
+                                {
+                                    socket.socket().setSoTimeout(tick <= 0 ? 1 : tick);
+                                }
+                            }
+                            catch(SocketException e)
+                            {
+                                // ignore - closed socket
+                            }
+                        }
+                    }
+                }
+                catch (SocketTimeoutException e)
+                {
+                    currentTime = System.currentTimeMillis();
+                    if(_ticker != null)
+                    {
+                        final int tick = _ticker.tick(currentTime);
+                        if(!socket.socket().isClosed())
+                        {
+                            try
+                            {
+                                socket.socket().setSoTimeout(tick <= 0 ? 1 : tick );
+                            }
+                            catch(SocketException ex)
+                            {
+                                // ignore - closed socket
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        catch (Exception t)
+        {
+            if (shouldReport(t))
+            {
+                receiver.exception(t);
+            }
+        }
+        finally
+        {
+            receiver.closed();
+            try
+            {
+                socket.close();
+            }
+            catch(Exception e)
+            {
+                log.warn(e, "Error closing socket");
+            }
+        }
+    }
+
+    private boolean shouldReport(Throwable t)
+    {
+        boolean brokenClose = closed.get() &&
+                              shutdownBroken &&
+                              t instanceof SocketException &&
+                              "socket closed".equalsIgnoreCase(t.getMessage());
+
+        boolean sslSocketClosed = closed.get() &&
+                                  socket.socket() instanceof SSLSocket &&
+                                  t instanceof SocketException &&
+                                  "Socket is closed".equalsIgnoreCase(t.getMessage());
+
+        boolean recvFailed = closed.get() &&
+                             shutdownBroken &&
+                             t instanceof SocketException &&
+                             "Socket operation on nonsocket: recv failed".equalsIgnoreCase(t.getMessage());
+
+        return !brokenClose && !sslSocketClosed && !recvFailed;
+    }
+
+    public Ticker getTicker()
+    {
+        return _ticker;
+    }
+
+    public void setTicker(Ticker ticker)
+    {
+        _ticker = ticker;
+    }
+
+
+}

Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java?rev=1643302&view=auto
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java (added)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java Fri Dec  5 14:48:00 2014
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+
+/**
+ * Sends queued {@link ByteBuffer}s over a {@link SocketChannel} from a dedicated daemon thread.
+ * The thread is created by the constructor but only started by {@link #initiate()}.
+ */
+public final class NonBlockingSender implements Runnable, Sender<ByteBuffer>
+{
+
+    private static final Logger log = Logger.get(NonBlockingSender.class);
+
+    private final long timeout;
+    private final SocketChannel socket;
+
+    private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+    private final Object _isEmpty = new Object();
+
+    private volatile boolean idle = true;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Thread senderThread;
+    // volatile: written via setReceiver() and read by whichever thread performs the close
+    private volatile NonBlockingReceiver _receiver;
+    private final String _remoteSocketAddress;
+
+    private volatile Throwable exception = null;
+
+    /**
+     * @param socket     channel the sender thread writes to
+     * @param bufferSize currently unused; retained so existing callers keep compiling
+     * @param timeout    milliseconds {@link #close()} waits for the sender thread to finish
+     */
+    public NonBlockingSender(SocketChannel socket, int bufferSize, long timeout)
+    {
+        this.socket = socket;
+        this.timeout = timeout;
+        // String.valueOf tolerates the null address reported for a channel that is not connected
+        _remoteSocketAddress = String.valueOf(socket.socket().getRemoteSocketAddress());
+
+        try
+        {
+            //Create but deliberately don't start the thread.
+            senderThread = Threading.getThreadFactory().createThread(this);
+        }
+        catch(Exception e)
+        {
+            throw new Error("Error creating IOSender thread",e);
+        }
+
+        senderThread.setDaemon(true);
+        senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
+    }
+
+    /** Starts the sender thread; nothing is written to the socket before this is called. */
+    public void initiate()
+    {
+        senderThread.start();
+    }
+
+    /** Queues {@code buf} and wakes the sender thread; throws if closed or if that thread is not alive. */
+    public void send(ByteBuffer buf)
+    {
+        checkNotAlreadyClosed();
+
+        if(!senderThread.isAlive())
+        {
+            throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress));
+        }
+
+        _buffers.add(buf);
+        synchronized (_isEmpty)
+        {
+            _isEmpty.notifyAll();
+        }
+    }
+
+    /** Notifies the sender thread's monitor when the sender is marked idle. */
+    public void flush()
+    {
+        if (idle)
+        {
+            synchronized (_isEmpty)
+            {
+                _isEmpty.notify();
+            }
+        }
+    }
+
+    /** Closes once: waits for the sender thread, closes the receiver, rethrows a recorded write failure. */
+    public void close()
+    {
+        close(true, true);
+    }
+
+    private void close(boolean awaitSenderBeforeClose, boolean reportException)
+    {
+        if (!closed.getAndSet(true))
+        {
+            synchronized (_isEmpty)
+            {
+                _isEmpty.notify();
+            }
+
+            try
+            {
+                if (awaitSenderBeforeClose)
+                {
+                    awaitSenderThreadShutdown();
+                }
+            }
+            finally
+            {
+                closeReceiver();
+            }
+            if (reportException && exception != null)
+            {
+                throw new SenderException(exception);
+            }
+        }
+    }
+
+    private void closeReceiver()
+    {
+        final NonBlockingReceiver receiver = _receiver;
+        if(receiver != null)
+        {
+            try
+            {
+                receiver.close();
+            }
+            catch(RuntimeException e)
+            {
+                log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress);
+                throw new SenderException(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Sender thread body: waits for queued buffers, writes them, and drops those fully written.
+     * NOTE(review): buffers still queued when the sender is closed may be left unwritten - confirm.
+     */
+    public void run()
+    {
+        while (!closed.get())
+        {
+            idle = true;
+
+            synchronized (_isEmpty)
+            {
+                while (_buffers.isEmpty() && !closed.get())
+                {
+                    try
+                    {
+                        _isEmpty.wait();
+                    }
+                    catch (InterruptedException ignored)
+                    {
+                        // deliberately ignored: the loop condition decides whether to keep waiting
+                    }
+                }
+            }
+
+            idle = false;
+
+            // snapshot in queue order; only this thread removes, so the head stays aligned with [0]
+            ByteBuffer[] bufArray = _buffers.toArray(new ByteBuffer[0]);
+
+            try
+            {
+                socket.write(bufArray);
+                for(ByteBuffer buf : bufArray)
+                {
+                    if(buf.hasRemaining())
+                    {
+                        // partially written: leave it (and everything after it) queued for the next pass
+                        break;
+                    }
+                    _buffers.poll();
+                }
+            }
+            catch (IOException e)
+            {
+                // values passed as format arguments, consistent with the other log calls in this class
+                log.info("Exception in thread sending to '%s': %s", _remoteSocketAddress, e);
+                exception = e;
+                close(false, false);
+                break;
+            }
+        }
+    }
+
+    /** Applies {@code i} as the SO_TIMEOUT of the underlying socket. */
+    public void setIdleTimeout(int i)
+    {
+        try
+        {
+            socket.socket().setSoTimeout(i);
+        }
+        catch (Exception e)
+        {
+            throw new SenderException(e);
+        }
+    }
+
+    public void setReceiver(NonBlockingReceiver receiver)
+    {
+        _receiver = receiver;
+    }
+
+    private void awaitSenderThreadShutdown()
+    {
+        if (Thread.currentThread() != senderThread)
+        {
+            try
+            {
+                senderThread.join(timeout);
+                if (senderThread.isAlive())
+                {
+                    log.error("join timed out for socket %s to stop", _remoteSocketAddress);
+                    throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress));
+                }
+            }
+            catch (InterruptedException e)
+            {
+                // restore the interrupt status so the caller can still observe the interruption
+                Thread.currentThread().interrupt();
+                log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress);
+                throw new SenderException(e);
+            }
+        }
+    }
+
+    private void checkNotAlreadyClosed()
+    {
+        if (closed.get())
+        {
+            throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception);
+        }
+    }
+}

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java?rev=1643302&r1=1643301&r2=1643302&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java Fri Dec  5 14:48:00 2014
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.security.ssl;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
-
-public class SSLBufferingSender implements Sender<ByteBuffer> // encrypts outgoing data with an SSLEngine and passes the result to a delegate Sender
-{
-    private static final Logger log = Logger.get(SSLBufferingSender.class);
-    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); // shared zero-length buffer used when no application data is pending
-
-    private final Sender<ByteBuffer> delegate;
-    private final SSLEngine engine;
-    private final int sslBufSize;
-    private final ByteBuffer netData; // output buffer for engine.wrap(), sized to the session's packet buffer size
-    private final SSLStatus _sslStatus;
-
-    private String _hostname;
-
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-    private ByteBuffer _appData = EMPTY_BYTE_BUFFER; // application data not yet consumed by the engine
-
-
-    public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
-    {
-        this.engine = engine;
-        this.delegate = delegate;
-        sslBufSize = engine.getSession().getPacketBufferSize();
-        netData = ByteBuffer.allocate(sslBufSize);
-        _sslStatus = sslStatus;
-    }
-
-    public void setHostname(String hostname)
-    {
-        _hostname = hostname;
-    }
-
-    public void close() // only the first call acts: sends pending data, closes the engine's outbound side, waits until it is done, then closes the delegate
-    {
-        if (!closed.getAndSet(true))
-        {
-            if (engine.isOutboundDone())
-            {
-                return;
-            }
-            log.debug("Closing SSL connection");
-            doSend();
-            engine.closeOutbound();
-            try
-            {
-                tearDownSSLConnection();
-            }
-            catch(Exception e)
-            {
-                throw new SenderException("Error closing SSL connection",e);
-            }
-
-
-            synchronized(_sslStatus.getSslLock())
-            {
-                while (!engine.isOutboundDone()) // waits on the shared SSL lock until the engine reports outbound done
-                {
-                    try
-                    {
-                        _sslStatus.getSslLock().wait();
-                    }
-                    catch(InterruptedException e)
-                    {
-                        // pass
-                    }
-
-                }
-            }
-            delegate.close();
-        }
-    }
-
-    private void tearDownSSLConnection() throws Exception // wraps empty input until the engine reports CLOSED, forwarding any bytes it produces
-    {
-        SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
-        Status status = result.getStatus();
-        int read   = result.bytesProduced();
-        while (status != Status.CLOSED)
-        {
-            if (status == Status.BUFFER_OVERFLOW)
-            {
-                netData.clear();
-            }
-            if(read > 0)
-            {
-                int limit = netData.limit(); // remembered so the limit can be restored after slicing
-                netData.limit(netData.position());
-                netData.position(netData.position() - read);
-
-                ByteBuffer data = netData.slice(); // view of exactly the bytes produced by the last wrap
-
-                netData.limit(limit);
-                netData.position(netData.position() + read);
-
-                delegate.send(data);
-                flush();
-            }
-            result = engine.wrap(ByteBuffer.allocate(0), netData);
-            status = result.getStatus();
-            read   = result.bytesProduced();
-        }
-    }
-
-    public void flush()
-    {
-        delegate.flush();
-    }
-
-    public void send() // re-runs doSend() on whatever is held in _appData, unless closed
-    {
-        if(!closed.get())
-        {
-            doSend();
-        }
-    }
-
-    public synchronized void send(ByteBuffer appData)
-    {
-        boolean buffered;
-        if(buffered = _appData.hasRemaining()) // assignment is intentional: remembers whether earlier data was still pending
-        {
-            ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
-            newBuf.put(_appData);
-            newBuf.put(appData);
-            newBuf.flip();
-            _appData = newBuf;
-        }
-        if (closed.get())
-        {
-            throw new SenderException("SSL Sender is closed");
-        }
-        doSend(); // NOTE(review): when nothing was pending, appData has not been copied into _appData yet at this point - confirm intended
-        if(!appData.hasRemaining()) // NOTE(review): appData is already drained if it was merged above, so an unsent remainder of the merged buffer is dropped here - confirm
-        {
-            _appData = EMPTY_BYTE_BUFFER;
-        }
-        else if(!buffered)
-        {
-            _appData = ByteBuffer.allocate(appData.remaining()); // keep a private copy of the caller's data for a later doSend()
-            _appData.put(appData);
-            _appData.flip();
-        }
-    }
-
-    private synchronized void doSend() // wraps _appData through the engine while data or a NEED_WRAP handshake step remains and no SSL error is flagged
-    {
-
-        HandshakeStatus handshakeStatus;
-        Status status;
-
-        while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
-              && !_sslStatus.getSslErrorFlag())
-        {
-            int read = 0;
-            try
-            {
-                SSLEngineResult result = engine.wrap(_appData, netData);
-                read   = result.bytesProduced();
-                status = result.getStatus();
-                handshakeStatus = result.getHandshakeStatus();
-            }
-            catch(SSLException e)
-            {
-                // Should this set _sslError??
-                throw new SenderException("SSL, Error occurred while encrypting data",e);
-            }
-
-            if(read > 0)
-            {
-                int limit = netData.limit(); // remembered so the limit can be restored after slicing
-                netData.limit(netData.position());
-                netData.position(netData.position() - read);
-
-                ByteBuffer data = netData.slice(); // view of exactly the bytes produced by the last wrap
-
-                netData.limit(limit);
-                netData.position(netData.position() + read);
-
-                delegate.send(data);
-            }
-
-            switch(status)
-            {
-                case CLOSED:
-                    throw new SenderException("SSLEngine is closed");
-
-                case BUFFER_OVERFLOW:
-                    netData.clear(); // reset netData so the wrap can be retried on the next iteration
-                    continue;
-
-                case OK:
-                    break; // do nothing
-
-                default:
-                    throw new IllegalStateException("SSLReceiver: Invalid State " + status); // NOTE(review): message says "SSLReceiver" although this class is the sender
-            }
-
-            switch (handshakeStatus)
-            {
-                case NEED_WRAP:
-                    if (netData.hasRemaining())
-                    {
-                        continue;
-                    }
-
-                case NEED_TASK: // also reached by fall-through from NEED_WRAP when netData has no space remaining
-                    doTasks();
-                    break;
-
-                case NEED_UNWRAP:
-                    flush(); // push out what has been produced, then return; any remaining _appData stays pending
-                    return;
-
-                case FINISHED:
-                    if (_hostname != null)
-                    {
-                        SSLUtil.verifyHostname(engine, _hostname);
-                    }
-
-                case NOT_HANDSHAKING: // FINISHED falls through to here after the optional hostname check
-                    break; //do  nothing
-
-                default:
-                    throw new IllegalStateException("SSLSender: Invalid State " + status); // NOTE(review): reports 'status' rather than 'handshakeStatus'
-            }
-
-        }
-    }
-
-    private void doTasks() // runs every delegated task the engine currently offers, on the calling thread
-    {
-        Runnable runnable;
-        while ((runnable = engine.getDelegatedTask()) != null) {
-            runnable.run();
-        }
-    }
-
-    public void setIdleTimeout(int i)
-    {
-        delegate.setIdleTimeout(i);
-    }
-}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message