cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject [34/48] git commit: updated refs/heads/4.9-bountycastle-daan to 98bf0ca
Date Tue, 24 May 2016 09:49:39 GMT
CLOUDSTACK-9348: NioConnection improvements

- Unit test to demonstrate denial of service attack
  The NioConnection uses blocking handlers for various events such as connect,
  accept, read, write. In case a client connects NioServer (used by
  agent mgr to service agents on port 8250) but fails to participate in SSL
  handshake or just sits idle, this would block the main IO/selector loop in
  NioConnection. Such a client could be either malicious or aggresive.

  This unit test demonstrates such a malicious client that can perform a
  denial-of-service attack on NioServer that blocks it to serve any other client.

- Use non-blocking SSL handshake
  - Uses non-blocking socket config in NioClient and NioServer/NioConnection
  - Scalable connectivity from agents and peer clustered-management server
  - Removes blocking ssl handshake code with a non-blocking code
  - Protects from denial-of-service issues that can degrade mgmt server responsiveness
    due to an aggressive/malicious client
  - Uses separate executor services for handling ssl handshakes

Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/9c751869
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/9c751869
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/9c751869

Branch: refs/heads/4.9-bountycastle-daan
Commit: 9c7518698d2f4a9fcc6a83fd22dd5b2fc4260232
Parents: b4ad38d
Author: Rohit Yadav <rohit.yadav@shapeblue.com>
Authored: Fri Apr 15 00:24:53 2016 +0530
Committer: Rohit Yadav <rohit.yadav@shapeblue.com>
Committed: Thu May 19 16:45:16 2016 +0530

----------------------------------------------------------------------
 .../manager/ClusteredAgentManagerImpl.java      |   9 +-
 .../src/main/java/com/cloud/utils/nio/Link.java | 297 ++++++++++++-------
 .../java/com/cloud/utils/nio/NioClient.java     |  31 +-
 .../java/com/cloud/utils/nio/NioConnection.java |  86 +++---
 .../java/com/cloud/utils/nio/NioServer.java     |   6 +-
 .../java/com/cloud/utils/testcase/NioTest.java  | 249 ++++++++++------
 6 files changed, 408 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index 75f860d..9239adc 100644
--- a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -499,7 +499,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements
Clust
                 SocketChannel ch1 = null;
                 try {
                     ch1 = SocketChannel.open(new InetSocketAddress(addr, Port.value()));
-                    ch1.configureBlocking(true); // make sure we are working at blocking
mode
+                    ch1.configureBlocking(false);
                     ch1.socket().setKeepAlive(true);
                     ch1.socket().setSoTimeout(60 * 1000);
                     try {
@@ -507,8 +507,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements
Clust
                         sslEngine = sslContext.createSSLEngine(ip, Port.value());
                         sslEngine.setUseClientMode(true);
                         sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
-
-                        Link.doHandshake(ch1, sslEngine, true);
+                        sslEngine.beginHandshake();
+                        if (!Link.doHandshake(ch1, sslEngine, true)) {
+                            ch1.close();
+                            throw new IOException("SSL handshake failed!");
+                        }
                         s_logger.info("SSL: Handshake done");
                     } catch (final Exception e) {
                         ch1.close();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/Link.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/Link.java b/utils/src/main/java/com/cloud/utils/nio/Link.java
index 6d6306a..02ffaab 100644
--- a/utils/src/main/java/com/cloud/utils/nio/Link.java
+++ b/utils/src/main/java/com/cloud/utils/nio/Link.java
@@ -19,37 +19,33 @@
 
 package com.cloud.utils.nio;
 
+import com.cloud.utils.PropertiesUtil;
+import com.cloud.utils.db.DbProperties;
+import org.apache.cloudstack.utils.security.SSLUtils;
+import org.apache.log4j.Logger;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.cloudstack.utils.security.SSLUtils;
-import org.apache.log4j.Logger;
-
-import com.cloud.utils.PropertiesUtil;
-import com.cloud.utils.db.DbProperties;
-
 /**
  */
 public class Link {
@@ -453,115 +449,188 @@ public class Link {
         return sslContext;
     }
 
-    public static void doHandshake(SocketChannel ch, SSLEngine sslEngine, boolean isClient)
throws IOException {
-        if (s_logger.isTraceEnabled()) {
-            s_logger.trace("SSL: begin Handshake, isClient: " + isClient);
+    public static ByteBuffer enlargeBuffer(ByteBuffer buffer, final int sessionProposedCapacity)
{
+        if (buffer == null || sessionProposedCapacity < 0) {
+            return buffer;
         }
-
-        SSLEngineResult engResult;
-        SSLSession sslSession = sslEngine.getSession();
-        HandshakeStatus hsStatus;
-        ByteBuffer in_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
-        ByteBuffer in_appBuf = ByteBuffer.allocate(sslSession.getApplicationBufferSize()
+ 40);
-        ByteBuffer out_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
-        ByteBuffer out_appBuf = ByteBuffer.allocate(sslSession.getApplicationBufferSize()
+ 40);
-        int count;
-        ch.socket().setSoTimeout(60 * 1000);
-        InputStream inStream = ch.socket().getInputStream();
-        // Use readCh to make sure the timeout on reading is working
-        ReadableByteChannel readCh = Channels.newChannel(inStream);
-
-        if (isClient) {
-            hsStatus = SSLEngineResult.HandshakeStatus.NEED_WRAP;
+        if (sessionProposedCapacity > buffer.capacity()) {
+            buffer = ByteBuffer.allocate(sessionProposedCapacity);
         } else {
-            hsStatus = SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
+            buffer = ByteBuffer.allocate(buffer.capacity() * 2);
         }
+        return buffer;
+    }
 
-        while (hsStatus != SSLEngineResult.HandshakeStatus.FINISHED) {
-            if (s_logger.isTraceEnabled()) {
-                s_logger.trace("SSL: Handshake status " + hsStatus);
+    public static ByteBuffer handleBufferUnderflow(final SSLEngine engine, ByteBuffer buffer)
{
+        if (engine == null || buffer == null) {
+            return buffer;
+        }
+        if (buffer.position() < buffer.limit()) {
+            return buffer;
+        }
+        ByteBuffer replaceBuffer = enlargeBuffer(buffer, engine.getSession().getPacketBufferSize());
+        buffer.flip();
+        replaceBuffer.put(buffer);
+        return replaceBuffer;
+    }
+
+    private static boolean doHandshakeUnwrap(final SocketChannel socketChannel, final SSLEngine
sslEngine,
+                                             ByteBuffer peerAppData, ByteBuffer peerNetData,
final int appBufferSize) throws IOException {
+        if (socketChannel == null || sslEngine == null || peerAppData == null || peerNetData
== null || appBufferSize < 0) {
+            return false;
+        }
+        if (socketChannel.read(peerNetData) < 0) {
+            if (sslEngine.isInboundDone() && sslEngine.isOutboundDone()) {
+                return false;
             }
-            engResult = null;
-            if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
-                out_pkgBuf.clear();
-                out_appBuf.clear();
-                out_appBuf.put("Hello".getBytes());
-                engResult = sslEngine.wrap(out_appBuf, out_pkgBuf);
-                out_pkgBuf.flip();
-                int remain = out_pkgBuf.limit();
-                while (remain != 0) {
-                    remain -= ch.write(out_pkgBuf);
-                    if (remain < 0) {
-                        throw new IOException("Too much bytes sent?");
-                    }
-                }
-            } else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
-                in_appBuf.clear();
-                // One packet may contained multiply operation
-                if (in_pkgBuf.position() == 0 || !in_pkgBuf.hasRemaining()) {
-                    in_pkgBuf.clear();
-                    count = 0;
-                    try {
-                        count = readCh.read(in_pkgBuf);
-                    } catch (SocketTimeoutException ex) {
-                        if (s_logger.isTraceEnabled()) {
-                            s_logger.trace("Handshake reading time out! Cut the connection");
-                        }
-                        count = -1;
-                    }
-                    if (count == -1) {
-                        throw new IOException("Connection closed with -1 on reading size.");
-                    }
-                    in_pkgBuf.flip();
+            try {
+                sslEngine.closeInbound();
+            } catch (SSLException e) {
+                s_logger.warn("This SSL engine was forced to close inbound due to end of
stream.");
+            }
+            sslEngine.closeOutbound();
+            // After closeOutbound the engine will be set to WRAP state,
+            // in order to try to send a close message to the client.
+            return true;
+        }
+        peerNetData.flip();
+        SSLEngineResult result = null;
+        try {
+            result = sslEngine.unwrap(peerNetData, peerAppData);
+            peerNetData.compact();
+        } catch (SSLException sslException) {
+            s_logger.error("SSL error occurred while processing unwrap data: " + sslException.getMessage());
+            sslEngine.closeOutbound();
+            return true;
+        }
+        switch (result.getStatus()) {
+            case OK:
+                break;
+            case BUFFER_OVERFLOW:
+                // Will occur when peerAppData's capacity is smaller than the data derived
from peerNetData's unwrap.
+                peerAppData = enlargeBuffer(peerAppData, appBufferSize);
+                break;
+            case BUFFER_UNDERFLOW:
+                // Will occur either when no data was read from the peer or when the peerNetData
buffer
+                // was too small to hold all peer's data.
+                peerNetData = handleBufferUnderflow(sslEngine, peerNetData);
+                break;
+            case CLOSED:
+                if (sslEngine.isOutboundDone()) {
+                    return false;
+                } else {
+                    sslEngine.closeOutbound();
+                    break;
                 }
-                engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
-                ByteBuffer tmp_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize()
+ 40);
-                int loop_count = 0;
-                while (engResult.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW)
{
-                    // The client is too slow? Cut it and let it reconnect
-                    if (loop_count > 10) {
-                        throw new IOException("Too many times in SSL BUFFER_UNDERFLOW, disconnect
guest.");
-                    }
-                    // We need more packets to complete this operation
-                    if (s_logger.isTraceEnabled()) {
-                        s_logger.trace("SSL: Buffer underflowed, getting more packets");
-                    }
-                    tmp_pkgBuf.clear();
-                    count = ch.read(tmp_pkgBuf);
-                    if (count == -1) {
-                        throw new IOException("Connection closed with -1 on reading size.");
-                    }
-                    tmp_pkgBuf.flip();
-
-                    in_pkgBuf.mark();
-                    in_pkgBuf.position(in_pkgBuf.limit());
-                    in_pkgBuf.limit(in_pkgBuf.limit() + tmp_pkgBuf.limit());
-                    in_pkgBuf.put(tmp_pkgBuf);
-                    in_pkgBuf.reset();
+            default:
+                throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+        }
+        return true;
+    }
 
-                    in_appBuf.clear();
-                    engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
-                    loop_count++;
+    private static boolean doHandshakeWrap(final SocketChannel socketChannel, final SSLEngine
sslEngine,
+                                           ByteBuffer myAppData, ByteBuffer myNetData, ByteBuffer
peerNetData,
+                                           final int netBufferSize) throws IOException {
+        if (socketChannel == null || sslEngine == null || myNetData == null || peerNetData
== null
+                || myAppData == null || netBufferSize < 0) {
+            return false;
+        }
+        myNetData.clear();
+        SSLEngineResult result = null;
+        try {
+            result = sslEngine.wrap(myAppData, myNetData);
+        } catch (SSLException sslException) {
+            s_logger.error("SSL error occurred while processing wrap data: " + sslException.getMessage());
+            sslEngine.closeOutbound();
+            return true;
+        }
+        switch (result.getStatus()) {
+            case OK :
+                myNetData.flip();
+                while (myNetData.hasRemaining()) {
+                    socketChannel.write(myNetData);
                 }
-            } else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_TASK) {
-                Runnable run;
-                while ((run = sslEngine.getDelegatedTask()) != null) {
-                    if (s_logger.isTraceEnabled()) {
-                        s_logger.trace("SSL: Running delegated task!");
+                break;
+            case BUFFER_OVERFLOW:
+                // Will occur if there is not enough space in myNetData buffer to write all
the data
+                // that would be generated by the method wrap. Since myNetData is set to
session's packet
+                // size we should not get to this point because SSLEngine is supposed to
produce messages
+                // smaller or equal to that, but a general handling would be the following:
+                myNetData = enlargeBuffer(myNetData, netBufferSize);
+                break;
+            case BUFFER_UNDERFLOW:
+                throw new SSLException("Buffer underflow occurred after a wrap. We should
not reach here.");
+            case CLOSED:
+                try {
+                    myNetData.flip();
+                    while (myNetData.hasRemaining()) {
+                        socketChannel.write(myNetData);
                     }
-                    run.run();
+                    // At this point the handshake status will probably be NEED_UNWRAP
+                    // so we make sure that peerNetData is clear to read.
+                    peerNetData.clear();
+                } catch (Exception e) {
+                    s_logger.error("Failed to send server's CLOSE message due to socket channel's
failure.");
                 }
-            } else if (hsStatus == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
-                throw new IOException("NOT a handshaking!");
+                break;
+            default:
+                throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
+        }
+        return true;
+    }
+
+    public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine
sslEngine, final boolean isClient) throws IOException {
+        if (socketChannel == null || sslEngine == null) {
+            return false;
+        }
+        final int appBufferSize = sslEngine.getSession().getApplicationBufferSize();
+        final int netBufferSize = sslEngine.getSession().getPacketBufferSize();
+        ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
+        ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
+        ByteBuffer myNetData = ByteBuffer.allocate(netBufferSize);
+        ByteBuffer peerNetData = ByteBuffer.allocate(netBufferSize);
+
+        final long startTimeMills = System.currentTimeMillis();
+
+        HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+        while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
+                && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING)
{
+            final long timeTaken = System.currentTimeMillis() - startTimeMills;
+            if (timeTaken > 15000L) {
+                s_logger.warn("SSL Handshake has taken more than 15s to connect to: " + socketChannel.getRemoteAddress()
+
+                        ". Please investigate this connection.");
+                return false;
             }
-            if (engResult != null && engResult.getStatus() != SSLEngineResult.Status.OK)
{
-                throw new IOException("Fail to handshake! " + engResult.getStatus());
+            switch (handshakeStatus) {
+                case NEED_UNWRAP:
+                    if (!doHandshakeUnwrap(socketChannel, sslEngine, peerAppData, peerNetData,
appBufferSize)) {
+                        return false;
+                    }
+                    break;
+                case NEED_WRAP:
+                    if (!doHandshakeWrap(socketChannel, sslEngine,  myAppData, myNetData,
peerNetData, netBufferSize)) {
+                        return false;
+                    }
+                    break;
+                case NEED_TASK:
+                    Runnable task;
+                    while ((task = sslEngine.getDelegatedTask()) != null) {
+                        if (s_logger.isTraceEnabled()) {
+                            s_logger.trace("SSL: Running delegated task!");
+                        }
+                        task.run();
+                    }
+                    break;
+                case FINISHED:
+                    break;
+                case NOT_HANDSHAKING:
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
             }
-            if (engResult != null)
-                hsStatus = engResult.getHandshakeStatus();
-            else
-                hsStatus = sslEngine.getHandshakeStatus();
+            handshakeStatus = sslEngine.getHandshakeStatus();
         }
+        return true;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/NioClient.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java
index d989f30..dc4f670 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java
@@ -36,7 +36,6 @@ public class NioClient extends NioConnection {
     private static final Logger s_logger = Logger.getLogger(NioClient.class);
 
     protected String _host;
-    protected String _bindAddress;
     protected SocketChannel _clientConnection;
 
     public NioClient(final String name, final String host, final int port, final int workers,
final HandlerFactory factory) {
@@ -44,10 +43,6 @@ public class NioClient extends NioConnection {
         _host = host;
     }
 
-    public void setBindAddress(final String ipAddress) {
-        _bindAddress = ipAddress;
-    }
-
     @Override
     protected void init() throws IOException {
         _selector = Selector.open();
@@ -55,33 +50,25 @@ public class NioClient extends NioConnection {
 
         try {
             _clientConnection = SocketChannel.open();
-            _clientConnection.configureBlocking(true);
-            s_logger.info("Connecting to " + _host + ":" + _port);
-
-            if (_bindAddress != null) {
-                s_logger.info("Binding outbound interface at " + _bindAddress);
-
-                final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0);
-                _clientConnection.socket().bind(bindAddr);
-            }
 
+            s_logger.info("Connecting to " + _host + ":" + _port);
             final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
             _clientConnection.connect(peerAddr);
-
-            SSLEngine sslEngine = null;
-            // Begin SSL handshake in BLOCKING mode
-            _clientConnection.configureBlocking(true);
+            _clientConnection.configureBlocking(false);
 
             final SSLContext sslContext = Link.initSSLContext(true);
-            sslEngine = sslContext.createSSLEngine(_host, _port);
+            SSLEngine sslEngine = sslContext.createSSLEngine(_host, _port);
             sslEngine.setUseClientMode(true);
             sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
-
-            Link.doHandshake(_clientConnection, sslEngine, true);
+            sslEngine.beginHandshake();
+            if (!Link.doHandshake(_clientConnection, sslEngine, true)) {
+                s_logger.error("SSL Handshake failed while connecting to host: " + _host
+ " port: " + _port);
+                _selector.close();
+                throw new IOException("SSL Handshake failed while connecting to host: " +
_host + " port: " + _port);
+            }
             s_logger.info("SSL: Handshake done");
             s_logger.info("Connected to " + _host + ":" + _port);
 
-            _clientConnection.configureBlocking(false);
             final Link link = new Link(peerAddr, this);
             link.setSSLEngine(sslEngine);
             final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
index 249f512..9d755d6 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
@@ -19,8 +19,13 @@
 
 package com.cloud.utils.nio;
 
-import static com.cloud.utils.AutoCloseableUtil.closeAutoCloseable;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.exception.NioConnectionException;
+import org.apache.cloudstack.utils.security.SSLUtils;
+import org.apache.log4j.Logger;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -44,14 +49,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.cloudstack.utils.security.SSLUtils;
-import org.apache.log4j.Logger;
-
-import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.exception.NioConnectionException;
+import static com.cloud.utils.AutoCloseableUtil.closeAutoCloseable;
 
 /**
  * NioConnection abstracts the NIO socket operations.  The Java implementation
@@ -71,6 +69,7 @@ public abstract class NioConnection implements Callable<Boolean> {
     protected HandlerFactory _factory;
     protected String _name;
     protected ExecutorService _executor;
+    protected ExecutorService _sslHandshakeExecutor;
 
     public NioConnection(final String name, final int port, final int workers, final HandlerFactory
factory) {
         _name = name;
@@ -79,6 +78,7 @@ public abstract class NioConnection implements Callable<Boolean> {
         _port = port;
         _factory = factory;
         _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(name + "-Handler"));
+        _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(name
+ "-SSLHandshakeHandler"));
     }
 
     public void start() throws NioConnectionException {
@@ -125,7 +125,7 @@ public abstract class NioConnection implements Callable<Boolean>
{
     public Boolean call() throws NioConnectionException {
         while (_isRunning) {
             try {
-                _selector.select();
+                _selector.select(1000);
 
                 // Someone is ready for I/O, get the ready keys
                 final Set<SelectionKey> readyKeys = _selector.selectedKeys();
@@ -185,8 +185,9 @@ public abstract class NioConnection implements Callable<Boolean>
{
 
     protected void accept(final SelectionKey key) throws IOException {
         final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
-
         final SocketChannel socketChannel = serverSocketChannel.accept();
+        socketChannel.configureBlocking(false);
+
         final Socket socket = socketChannel.socket();
         socket.setKeepAlive(true);
 
@@ -194,43 +195,52 @@ public abstract class NioConnection implements Callable<Boolean>
{
             s_logger.trace("Connection accepted for " + socket);
         }
 
-        // Begin SSL handshake in BLOCKING mode
-        socketChannel.configureBlocking(true);
-
-        SSLEngine sslEngine = null;
+        final SSLEngine sslEngine;
         try {
             final SSLContext sslContext = Link.initSSLContext(false);
             sslEngine = sslContext.createSSLEngine();
             sslEngine.setUseClientMode(false);
             sslEngine.setNeedClientAuth(false);
             sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
-
-            Link.doHandshake(socketChannel, sslEngine, false);
-
+            final NioConnection nioConnection = this;
+            _sslHandshakeExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    _selector.wakeup();
+                    try {
+                        sslEngine.beginHandshake();
+                        if (!Link.doHandshake(socketChannel, sslEngine, false)) {
+                            throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress());
+                        }
+                        if (s_logger.isTraceEnabled()) {
+                            s_logger.trace("SSL: Handshake done");
+                        }
+                        final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
+                        final Link link = new Link(saddr, nioConnection);
+                        link.setSSLEngine(sslEngine);
+                        link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ,
link));
+                        final Task task = _factory.create(Task.Type.CONNECT, link, null);
+                        registerLink(saddr, link);
+                        _executor.submit(task);
+                    } catch (IOException e) {
+                        if (s_logger.isTraceEnabled()) {
+                            s_logger.trace("Connection closed due to failure: " + e.getMessage());
+                        }
+                        closeAutoCloseable(socket, "accepting socket");
+                        closeAutoCloseable(socketChannel, "accepting socketChannel");
+                    } finally {
+                        _selector.wakeup();
+                    }
+                }
+            });
         } catch (final Exception e) {
             if (s_logger.isTraceEnabled()) {
-                s_logger.trace("Socket " + socket + " closed on read.  Probably -1 returned:
" + e.getMessage());
+                s_logger.trace("Connection closed due to failure: " + e.getMessage());
             }
+            closeAutoCloseable(socket, "accepting socket");
             closeAutoCloseable(socketChannel, "accepting socketChannel");
-            closeAutoCloseable(socket, "opened socket");
-            return;
-        }
-
-        if (s_logger.isTraceEnabled()) {
-            s_logger.trace("SSL: Handshake done");
-        }
-        socketChannel.configureBlocking(false);
-        final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
-        final Link link = new Link(saddr, this);
-        link.setSSLEngine(sslEngine);
-        link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
-        final Task task = _factory.create(Task.Type.CONNECT, link, null);
-        registerLink(saddr, link);
-
-        try {
-            _executor.submit(task);
-        } catch (final Exception e) {
-            s_logger.warn("Exception occurred when submitting the task", e);
+        } finally {
+            _selector.wakeup();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/NioServer.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
index 539c2bb..b655f18 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
@@ -43,6 +43,10 @@ public class NioServer extends NioConnection {
         _links = new WeakHashMap<InetSocketAddress, Link>(1024);
     }
 
+    public int getPort() {
+        return _serverSocket.socket().getLocalPort();
+    }
+
     @Override
     protected void init() throws IOException {
         _selector = SelectorProvider.provider().openSelector();
@@ -55,7 +59,7 @@ public class NioServer extends NioConnection {
 
         _serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null);
 
-        s_logger.info("NioConnection started and listening on " + _localAddr.toString());
+        s_logger.info("NioConnection started and listening on " + _serverSocket.socket().getLocalSocketAddress());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
----------------------------------------------------------------------
diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
index d8510cf..894aa1a 100644
--- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
+++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
@@ -19,14 +19,7 @@
 
 package com.cloud.utils.testcase;
 
-import java.nio.channels.ClosedChannelException;
-import java.util.Random;
-
-import junit.framework.TestCase;
-
-import org.apache.log4j.Logger;
-import org.junit.Assert;
-
+import com.cloud.utils.concurrency.NamedThreadFactory;
 import com.cloud.utils.exception.NioConnectionException;
 import com.cloud.utils.nio.HandlerFactory;
 import com.cloud.utils.nio.Link;
@@ -34,131 +27,199 @@ import com.cloud.utils.nio.NioClient;
 import com.cloud.utils.nio.NioServer;
 import com.cloud.utils.nio.Task;
 import com.cloud.utils.nio.Task.Type;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
- *
- *
- *
- *
+ * NioTest demonstrates that NioServer can function without getting its main IO
+ * loop blocked when an aggressive or malicious client connects to the server but
+ * fail to participate in SSL handshake. In this test, we run bunch of clients
+ * that send a known payload to the server, to which multiple malicious clients
+ * also try to connect and hang.
+ * A malicious client could cause denial-of-service if the server's main IO loop
+ * along with SSL handshake was blocking. A passing tests shows that NioServer
+ * can still function in case of connection load and that the main IO loop along
+ * with SSL handshake is non-blocking with some internal timeout mechanism.
  */
 
-public class NioTest extends TestCase {
+public class NioTest {
+
+    private static final Logger LOGGER = Logger.getLogger(NioTest.class);
 
-    private static final Logger s_logger = Logger.getLogger(NioTest.class);
+    // Test should fail in due time instead of looping forever
+    private static final int TESTTIMEOUT = 60000;
 
-    private NioServer _server;
-    private NioClient _client;
+    final private int totalTestCount = 2;
+    private int completedTestCount = 0;
 
-    private Link _clientLink;
+    private NioServer server;
+    private List<NioClient> clients = new ArrayList<>();
+    private List<NioClient> maliciousClients = new ArrayList<>();
 
-    private int _testCount;
-    private int _completedCount;
+    private ExecutorService clientExecutor = Executors.newFixedThreadPool(totalTestCount,
new NamedThreadFactory("NioClientHandler"));;
+    private ExecutorService maliciousExecutor = Executors.newFixedThreadPool(totalTestCount,
new NamedThreadFactory("MaliciousNioClientHandler"));;
+
+    private Random randomGenerator = new Random();
+    private byte[] testBytes;
 
     private boolean isTestsDone() {
         boolean result;
         synchronized (this) {
-            result = _testCount == _completedCount;
+            result = totalTestCount == completedTestCount;
         }
         return result;
     }
 
-    private void getOneMoreTest() {
-        synchronized (this) {
-            _testCount++;
-        }
-    }
-
     private void oneMoreTestDone() {
         synchronized (this) {
-            _completedCount++;
+            completedTestCount++;
         }
     }
 
-    @Override
+    @Before
     public void setUp() {
-        s_logger.info("Test");
+        LOGGER.info("Setting up Benchmark Test");
 
-        _testCount = 0;
-        _completedCount = 0;
+        completedTestCount = 0;
+        testBytes = new byte[1000000];
+        randomGenerator.nextBytes(testBytes);
 
-        _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer());
+        server = new NioServer("NioTestServer", 0, 1, new NioTestServer());
         try {
-            _server.start();
+            server.start();
         } catch (final NioConnectionException e) {
-            fail(e.getMessage());
+            Assert.fail(e.getMessage());
         }
 
-        _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient());
-        try {
-            _client.start();
-        } catch (final NioConnectionException e) {
-            fail(e.getMessage());
-        }
+        for (int i = 0; i < totalTestCount; i++) {
+            final NioClient maliciousClient = new NioMaliciousClient("NioMaliciousTestClient-"
+ i, "127.0.0.1", server.getPort(), 1, new NioMaliciousTestClient());
+            maliciousClients.add(maliciousClient);
+            maliciousExecutor.submit(new ThreadedNioClient(maliciousClient));
 
-        while (_clientLink == null) {
-            try {
-                s_logger.debug("Link is not up! Waiting ...");
-                Thread.sleep(1000);
-            } catch (final InterruptedException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
+            final NioClient client = new NioClient("NioTestClient-" + i, "127.0.0.1", server.getPort(),
1, new NioTestClient());
+            clients.add(client);
+            clientExecutor.submit(new ThreadedNioClient(client));
         }
     }
 
-    @Override
+    @After
     public void tearDown() {
+        stopClient();
+        stopServer();
+    }
+
+    protected void stopClient() {
+        for (NioClient client : clients) {
+            client.stop();
+        }
+        for (NioClient maliciousClient : maliciousClients) {
+            maliciousClient.stop();
+        }
+        LOGGER.info("Clients stopped.");
+    }
+
+    protected void stopServer() {
+        server.stop();
+        LOGGER.info("Server stopped.");
+    }
+
+    @Test(timeout=TESTTIMEOUT)
+    public void testConnection() {
         while (!isTestsDone()) {
             try {
-                s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting
for completion");
+                LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done. Waiting
for completion");
                 Thread.sleep(1000);
             } catch (final InterruptedException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
+                Assert.fail(e.getMessage());
             }
         }
-        stopClient();
-        stopServer();
+        LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done.");
     }
 
-    protected void stopClient() {
-        _client.stop();
-        s_logger.info("Client stopped.");
+    protected void doServerProcess(final byte[] data) {
+        oneMoreTestDone();
+        Assert.assertArrayEquals(testBytes, data);
+        LOGGER.info("Verify data received by server done.");
     }
 
-    protected void stopServer() {
-        _server.stop();
-        s_logger.info("Server stopped.");
+    public byte[] getTestBytes() {
+        return testBytes;
     }
 
-    protected void setClientLink(final Link link) {
-        _clientLink = link;
+    public class ThreadedNioClient implements Runnable {
+        final private NioClient client;
+        ThreadedNioClient(final NioClient client) {
+            this.client = client;
+        }
+
+        @Override
+        public void run() {
+            try {
+                client.start();
+            } catch (NioConnectionException e) {
+                Assert.fail(e.getMessage());
+            }
+        }
     }
 
-    Random randomGenerator = new Random();
+    public class NioMaliciousClient extends NioClient {
 
-    byte[] _testBytes;
+        public NioMaliciousClient(String name, String host, int port, int workers, HandlerFactory
factory) {
+            super(name, host, port, workers, factory);
+        }
 
-    public void testConnection() {
-        _testBytes = new byte[1000000];
-        randomGenerator.nextBytes(_testBytes);
-        try {
-            getOneMoreTest();
-            _clientLink.send(_testBytes);
-            s_logger.info("Client: Data sent");
-            getOneMoreTest();
-            _clientLink.send(_testBytes);
-            s_logger.info("Client: Data sent");
-        } catch (final ClosedChannelException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+        @Override
+        protected void init() throws IOException {
+            _selector = Selector.open();
+            try {
+                _clientConnection = SocketChannel.open();
+                LOGGER.info("Connecting to " + _host + ":" + _port);
+                final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
+                _clientConnection.connect(peerAddr);
+                // This is done on purpose, the malicious client would connect
+                // to the server and then do nothing, hence using a large sleep value
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (final IOException e) {
+                _selector.close();
+                throw e;
+            } catch (InterruptedException e) {
+                LOGGER.debug(e.getMessage());
+            }
         }
     }
 
-    protected void doServerProcess(final byte[] data) {
-        oneMoreTestDone();
-        Assert.assertArrayEquals(_testBytes, data);
-        s_logger.info("Verify done.");
+    public class NioMaliciousTestClient implements HandlerFactory {
+
+        @Override
+        public Task create(final Type type, final Link link, final byte[] data) {
+            return new NioMaliciousTestClientHandler(type, link, data);
+        }
+
+        public class NioMaliciousTestClientHandler extends Task {
+
+            public NioMaliciousTestClientHandler(final Type type, final Link link, final
byte[] data) {
+                super(type, link, data);
+            }
+
+            @Override
+            public void doTask(final Task task) {
+                LOGGER.info("Malicious Client: Received task " + task.getType().toString());
+            }
+        }
     }
 
     public class NioTestClient implements HandlerFactory {
@@ -177,18 +238,23 @@ public class NioTest extends TestCase {
             @Override
             public void doTask(final Task task) {
                 if (task.getType() == Task.Type.CONNECT) {
-                    s_logger.info("Client: Received CONNECT task");
-                    setClientLink(task.getLink());
+                    LOGGER.info("Client: Received CONNECT task");
+                    try {
+                        LOGGER.info("Sending data to server");
+                        task.getLink().send(getTestBytes());
+                    } catch (ClosedChannelException e) {
+                        LOGGER.error(e.getMessage());
+                        e.printStackTrace();
+                    }
                 } else if (task.getType() == Task.Type.DATA) {
-                    s_logger.info("Client: Received DATA task");
+                    LOGGER.info("Client: Received DATA task");
                 } else if (task.getType() == Task.Type.DISCONNECT) {
-                    s_logger.info("Client: Received DISCONNECT task");
+                    LOGGER.info("Client: Received DISCONNECT task");
                     stopClient();
                 } else if (task.getType() == Task.Type.OTHER) {
-                    s_logger.info("Client: Received OTHER task");
+                    LOGGER.info("Client: Received OTHER task");
                 }
             }
-
         }
     }
 
@@ -208,18 +274,17 @@ public class NioTest extends TestCase {
             @Override
             public void doTask(final Task task) {
                 if (task.getType() == Task.Type.CONNECT) {
-                    s_logger.info("Server: Received CONNECT task");
+                    LOGGER.info("Server: Received CONNECT task");
                 } else if (task.getType() == Task.Type.DATA) {
-                    s_logger.info("Server: Received DATA task");
+                    LOGGER.info("Server: Received DATA task");
                     doServerProcess(task.getData());
                 } else if (task.getType() == Task.Type.DISCONNECT) {
-                    s_logger.info("Server: Received DISCONNECT task");
+                    LOGGER.info("Server: Received DISCONNECT task");
                     stopServer();
                 } else if (task.getType() == Task.Type.OTHER) {
-                    s_logger.info("Server: Received OTHER task");
+                    LOGGER.info("Server: Received OTHER task");
                 }
             }
-
         }
     }
 }


Mime
View raw message