cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sw...@apache.org
Subject [4/4] git commit: updated refs/heads/master to b4ad38d
Date Mon, 16 May 2016 21:31:56 GMT
Revert "Merge pull request #1493 from shapeblue/nio-fix"

This reverts commit 7ce0e10fbcd949375e43535aae168421ecdaa562, reversing
changes made to 29ba71f2db3a3b7dcdad3a7c1f83725cffd56261.

This was reverted because it seemed to be related to an issue
when doing a DeployDC, causing an `addHost` error.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/b4ad38d6
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/b4ad38d6
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/b4ad38d6

Branch: refs/heads/master
Commit: b4ad38d687abc7173c7231748cb52135149415ca
Parents: f175ad1
Author: Will Stevens <williamstevens@gmail.com>
Authored: Mon May 16 17:30:44 2016 -0400
Committer: Will Stevens <williamstevens@gmail.com>
Committed: Mon May 16 17:30:44 2016 -0400

----------------------------------------------------------------------
 .../manager/ClusteredAgentManagerImpl.java      |   9 +-
 utils/pom.xml                                   |   1 +
 .../src/main/java/com/cloud/utils/nio/Link.java | 294 +++++++------------
 .../java/com/cloud/utils/nio/NioClient.java     |  31 +-
 .../java/com/cloud/utils/nio/NioConnection.java |  84 +++---
 .../java/com/cloud/utils/nio/NioServer.java     |   8 +-
 .../java/com/cloud/utils/testcase/NioTest.java  | 249 ++++++----------
 7 files changed, 270 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b4ad38d6/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index 9239adc..75f860d 100644
--- a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -499,7 +499,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements
Clust
                 SocketChannel ch1 = null;
                 try {
                     ch1 = SocketChannel.open(new InetSocketAddress(addr, Port.value()));
-                    ch1.configureBlocking(false);
+                    ch1.configureBlocking(true); // make sure we are working at blocking
mode
                     ch1.socket().setKeepAlive(true);
                     ch1.socket().setSoTimeout(60 * 1000);
                     try {
@@ -507,11 +507,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements
Clust
                         sslEngine = sslContext.createSSLEngine(ip, Port.value());
                         sslEngine.setUseClientMode(true);
                         sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
-                        sslEngine.beginHandshake();
-                        if (!Link.doHandshake(ch1, sslEngine, true)) {
-                            ch1.close();
-                            throw new IOException("SSL handshake failed!");
-                        }
+
+                        Link.doHandshake(ch1, sslEngine, true);
                         s_logger.info("SSL: Handshake done");
                     } catch (final Exception e) {
                         ch1.close();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b4ad38d6/utils/pom.xml
----------------------------------------------------------------------
diff --git a/utils/pom.xml b/utils/pom.xml
index 9e23586..206eb18 100755
--- a/utils/pom.xml
+++ b/utils/pom.xml
@@ -208,6 +208,7 @@
           <excludes>
             <exclude>com/cloud/utils/testcase/*TestCase*</exclude>
             <exclude>com/cloud/utils/db/*Test*</exclude>
+            <exclude>com/cloud/utils/testcase/NioTest.java</exclude>
           </excludes>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b4ad38d6/utils/src/main/java/com/cloud/utils/nio/Link.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/Link.java b/utils/src/main/java/com/cloud/utils/nio/Link.java
index f297d52..6d6306a 100644
--- a/utils/src/main/java/com/cloud/utils/nio/Link.java
+++ b/utils/src/main/java/com/cloud/utils/nio/Link.java
@@ -19,33 +19,37 @@
 
 package com.cloud.utils.nio;
 
-import com.cloud.utils.PropertiesUtil;
-import com.cloud.utils.db.DbProperties;
-import org.apache.cloudstack.utils.security.SSLUtils;
-import org.apache.log4j.Logger;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.cloudstack.utils.security.SSLUtils;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.PropertiesUtil;
+import com.cloud.utils.db.DbProperties;
+
 /**
  */
 public class Link {
@@ -449,185 +453,115 @@ public class Link {
         return sslContext;
     }
 
-    public static ByteBuffer enlargeBuffer(ByteBuffer buffer, final int sessionProposedCapacity)
{
-        if (buffer == null || sessionProposedCapacity < 0) {
-            return buffer;
-        }
-        if (sessionProposedCapacity > buffer.capacity()) {
-            buffer = ByteBuffer.allocate(sessionProposedCapacity);
-        } else {
-            buffer = ByteBuffer.allocate(buffer.capacity() * 2);
+    public static void doHandshake(SocketChannel ch, SSLEngine sslEngine, boolean isClient)
throws IOException {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("SSL: begin Handshake, isClient: " + isClient);
         }
-        return buffer;
-    }
 
-    public static ByteBuffer handleBufferUnderflow(final SSLEngine engine, ByteBuffer buffer)
{
-        if (engine == null || buffer == null) {
-            return buffer;
-        }
-        if (buffer.position() < buffer.limit()) {
-            return buffer;
+        SSLEngineResult engResult;
+        SSLSession sslSession = sslEngine.getSession();
+        HandshakeStatus hsStatus;
+        ByteBuffer in_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
+        ByteBuffer in_appBuf = ByteBuffer.allocate(sslSession.getApplicationBufferSize()
+ 40);
+        ByteBuffer out_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40);
+        ByteBuffer out_appBuf = ByteBuffer.allocate(sslSession.getApplicationBufferSize()
+ 40);
+        int count;
+        ch.socket().setSoTimeout(60 * 1000);
+        InputStream inStream = ch.socket().getInputStream();
+        // Use readCh to make sure the timeout on reading is working
+        ReadableByteChannel readCh = Channels.newChannel(inStream);
+
+        if (isClient) {
+            hsStatus = SSLEngineResult.HandshakeStatus.NEED_WRAP;
+        } else {
+            hsStatus = SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
         }
-        ByteBuffer replaceBuffer = enlargeBuffer(buffer, engine.getSession().getPacketBufferSize());
-        buffer.flip();
-        replaceBuffer.put(buffer);
-        return replaceBuffer;
-    }
 
-    private static boolean doHandshakeUnwrap(final SocketChannel socketChannel, final SSLEngine
sslEngine,
-                                             ByteBuffer peerAppData, ByteBuffer peerNetData,
final int appBufferSize) throws IOException {
-        if (socketChannel == null || sslEngine == null || peerAppData == null || peerNetData
== null || appBufferSize < 0) {
-            return false;
-        }
-        if (socketChannel.read(peerNetData) < 0) {
-            if (sslEngine.isInboundDone() && sslEngine.isOutboundDone()) {
-                return false;
-            }
-            try {
-                sslEngine.closeInbound();
-            } catch (SSLException e) {
-                s_logger.warn("This SSL engine was forced to close inbound due to end of
stream.");
+        while (hsStatus != SSLEngineResult.HandshakeStatus.FINISHED) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("SSL: Handshake status " + hsStatus);
             }
-            sslEngine.closeOutbound();
-            // After closeOutbound the engine will be set to WRAP state,
-            // in order to try to send a close message to the client.
-            return true;
-        }
-        peerNetData.flip();
-        SSLEngineResult result = null;
-        try {
-            result = sslEngine.unwrap(peerNetData, peerAppData);
-            peerNetData.compact();
-        } catch (SSLException sslException) {
-            s_logger.error("SSL error occurred while processing unwrap data: " + sslException.getMessage());
-            sslEngine.closeOutbound();
-            return true;
-        }
-        switch (result.getStatus()) {
-            case OK:
-                break;
-            case BUFFER_OVERFLOW:
-                // Will occur when peerAppData's capacity is smaller than the data derived
from peerNetData's unwrap.
-                peerAppData = enlargeBuffer(peerAppData, appBufferSize);
-                break;
-            case BUFFER_UNDERFLOW:
-                // Will occur either when no data was read from the peer or when the peerNetData
buffer
-                // was too small to hold all peer's data.
-                peerNetData = handleBufferUnderflow(sslEngine, peerNetData);
-                break;
-            case CLOSED:
-                if (sslEngine.isOutboundDone()) {
-                    return false;
-                } else {
-                    sslEngine.closeOutbound();
-                    break;
-                }
-            default:
-                throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
-        }
-        return true;
-    }
-
-    private static boolean doHandshakeWrap(final SocketChannel socketChannel, final SSLEngine
sslEngine,
-                                           ByteBuffer myAppData, ByteBuffer myNetData, ByteBuffer
peerNetData,
-                                           final int netBufferSize) throws IOException {
-        if (socketChannel == null || sslEngine == null || myNetData == null || peerNetData
== null
-                || myAppData == null || netBufferSize < 0) {
-            return false;
-        }
-        myNetData.clear();
-        SSLEngineResult result = null;
-        try {
-            result = sslEngine.wrap(myAppData, myNetData);
-        } catch (SSLException sslException) {
-            s_logger.error("SSL error occurred while processing wrap data: " + sslException.getMessage());
-            sslEngine.closeOutbound();
-            return true;
-        }
-        switch (result.getStatus()) {
-            case OK :
-                myNetData.flip();
-                while (myNetData.hasRemaining()) {
-                    socketChannel.write(myNetData);
+            engResult = null;
+            if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
+                out_pkgBuf.clear();
+                out_appBuf.clear();
+                out_appBuf.put("Hello".getBytes());
+                engResult = sslEngine.wrap(out_appBuf, out_pkgBuf);
+                out_pkgBuf.flip();
+                int remain = out_pkgBuf.limit();
+                while (remain != 0) {
+                    remain -= ch.write(out_pkgBuf);
+                    if (remain < 0) {
+                        throw new IOException("Too much bytes sent?");
+                    }
                 }
-                break;
-            case BUFFER_OVERFLOW:
-                // Will occur if there is not enough space in myNetData buffer to write all
the data
-                // that would be generated by the method wrap. Since myNetData is set to
session's packet
-                // size we should not get to this point because SSLEngine is supposed to
produce messages
-                // smaller or equal to that, but a general handling would be the following:
-                myNetData = enlargeBuffer(myNetData, netBufferSize);
-                break;
-            case BUFFER_UNDERFLOW:
-                throw new SSLException("Buffer underflow occurred after a wrap. We should
not reach here.");
-            case CLOSED:
-                try {
-                    myNetData.flip();
-                    while (myNetData.hasRemaining()) {
-                        socketChannel.write(myNetData);
+            } else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
+                in_appBuf.clear();
+                // One packet may contained multiply operation
+                if (in_pkgBuf.position() == 0 || !in_pkgBuf.hasRemaining()) {
+                    in_pkgBuf.clear();
+                    count = 0;
+                    try {
+                        count = readCh.read(in_pkgBuf);
+                    } catch (SocketTimeoutException ex) {
+                        if (s_logger.isTraceEnabled()) {
+                            s_logger.trace("Handshake reading time out! Cut the connection");
+                        }
+                        count = -1;
+                    }
+                    if (count == -1) {
+                        throw new IOException("Connection closed with -1 on reading size.");
                     }
-                    // At this point the handshake status will probably be NEED_UNWRAP
-                    // so we make sure that peerNetData is clear to read.
-                    peerNetData.clear();
-                } catch (Exception e) {
-                    s_logger.error("Failed to send server's CLOSE message due to socket channel's
failure.");
+                    in_pkgBuf.flip();
                 }
-                break;
-            default:
-                throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
-        }
-        return true;
-    }
-
-    public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine
sslEngine, final boolean isClient) throws IOException {
-        if (socketChannel == null || sslEngine == null) {
-            return false;
-        }
-        final int appBufferSize = sslEngine.getSession().getApplicationBufferSize();
-        final int netBufferSize = sslEngine.getSession().getPacketBufferSize();
-        ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
-        ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
-        ByteBuffer myNetData = ByteBuffer.allocate(netBufferSize);
-        ByteBuffer peerNetData = ByteBuffer.allocate(netBufferSize);
-
-        final long startTimeMills = System.currentTimeMillis();
-
-        HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
-        while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
-                && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING)
{
-            final long timeTaken = System.currentTimeMillis() - startTimeMills;
-            if (timeTaken > 60000L) {
-                s_logger.warn("SSL Handshake has taken more than 60s to connect to: " + socketChannel.getRemoteAddress()
+
-                        ". Please investigate this connection.");
-                return false;
-            }
-            switch (handshakeStatus) {
-                case NEED_UNWRAP:
-                    if (!doHandshakeUnwrap(socketChannel, sslEngine, peerAppData, peerNetData,
appBufferSize)) {
-                        return false;
+                engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
+                ByteBuffer tmp_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize()
+ 40);
+                int loop_count = 0;
+                while (engResult.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW)
{
+                    // The client is too slow? Cut it and let it reconnect
+                    if (loop_count > 10) {
+                        throw new IOException("Too many times in SSL BUFFER_UNDERFLOW, disconnect
guest.");
                     }
-                    break;
-                case NEED_WRAP:
-                    if (!doHandshakeWrap(socketChannel, sslEngine,  myAppData, myNetData,
peerNetData, netBufferSize)) {
-                        return false;
+                    // We need more packets to complete this operation
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("SSL: Buffer underflowed, getting more packets");
                     }
-                    break;
-                case NEED_TASK:
-                    Runnable task;
-                    while ((task = sslEngine.getDelegatedTask()) != null) {
-                        new Thread(task).run();
+                    tmp_pkgBuf.clear();
+                    count = ch.read(tmp_pkgBuf);
+                    if (count == -1) {
+                        throw new IOException("Connection closed with -1 on reading size.");
                     }
-                    break;
-                case FINISHED:
-                    break;
-                case NOT_HANDSHAKING:
-                    break;
-                default:
-                    throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
+                    tmp_pkgBuf.flip();
+
+                    in_pkgBuf.mark();
+                    in_pkgBuf.position(in_pkgBuf.limit());
+                    in_pkgBuf.limit(in_pkgBuf.limit() + tmp_pkgBuf.limit());
+                    in_pkgBuf.put(tmp_pkgBuf);
+                    in_pkgBuf.reset();
+
+                    in_appBuf.clear();
+                    engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf);
+                    loop_count++;
+                }
+            } else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_TASK) {
+                Runnable run;
+                while ((run = sslEngine.getDelegatedTask()) != null) {
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("SSL: Running delegated task!");
+                    }
+                    run.run();
+                }
+            } else if (hsStatus == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+                throw new IOException("NOT a handshaking!");
+            }
+            if (engResult != null && engResult.getStatus() != SSLEngineResult.Status.OK)
{
+                throw new IOException("Fail to handshake! " + engResult.getStatus());
             }
-            handshakeStatus = sslEngine.getHandshakeStatus();
+            if (engResult != null)
+                hsStatus = engResult.getHandshakeStatus();
+            else
+                hsStatus = sslEngine.getHandshakeStatus();
         }
-        return true;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b4ad38d6/utils/src/main/java/com/cloud/utils/nio/NioClient.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java
index dc4f670..d989f30 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java
@@ -36,6 +36,7 @@ public class NioClient extends NioConnection {
     private static final Logger s_logger = Logger.getLogger(NioClient.class);
 
     protected String _host;
+    protected String _bindAddress;
     protected SocketChannel _clientConnection;
 
     public NioClient(final String name, final String host, final int port, final int workers,
final HandlerFactory factory) {
@@ -43,6 +44,10 @@ public class NioClient extends NioConnection {
         _host = host;
     }
 
+    public void setBindAddress(final String ipAddress) {
+        _bindAddress = ipAddress;
+    }
+
     @Override
     protected void init() throws IOException {
         _selector = Selector.open();
@@ -50,25 +55,33 @@ public class NioClient extends NioConnection {
 
         try {
             _clientConnection = SocketChannel.open();
-
+            _clientConnection.configureBlocking(true);
             s_logger.info("Connecting to " + _host + ":" + _port);
+
+            if (_bindAddress != null) {
+                s_logger.info("Binding outbound interface at " + _bindAddress);
+
+                final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0);
+                _clientConnection.socket().bind(bindAddr);
+            }
+
             final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
             _clientConnection.connect(peerAddr);
-            _clientConnection.configureBlocking(false);
+
+            SSLEngine sslEngine = null;
+            // Begin SSL handshake in BLOCKING mode
+            _clientConnection.configureBlocking(true);
 
             final SSLContext sslContext = Link.initSSLContext(true);
-            SSLEngine sslEngine = sslContext.createSSLEngine(_host, _port);
+            sslEngine = sslContext.createSSLEngine(_host, _port);
             sslEngine.setUseClientMode(true);
             sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
-            sslEngine.beginHandshake();
-            if (!Link.doHandshake(_clientConnection, sslEngine, true)) {
-                s_logger.error("SSL Handshake failed while connecting to host: " + _host
+ " port: " + _port);
-                _selector.close();
-                throw new IOException("SSL Handshake failed while connecting to host: " +
_host + " port: " + _port);
-            }
+
+            Link.doHandshake(_clientConnection, sslEngine, true);
             s_logger.info("SSL: Handshake done");
             s_logger.info("Connected to " + _host + ":" + _port);
 
+            _clientConnection.configureBlocking(false);
             final Link link = new Link(peerAddr, this);
             link.setSSLEngine(sslEngine);
             final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b4ad38d6/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
index 6fdb473..249f512 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java
@@ -19,13 +19,8 @@
 
 package com.cloud.utils.nio;
 
-import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.exception.NioConnectionException;
-import org.apache.cloudstack.utils.security.SSLUtils;
-import org.apache.log4j.Logger;
+import static com.cloud.utils.AutoCloseableUtil.closeAutoCloseable;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -49,7 +44,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import static com.cloud.utils.AutoCloseableUtil.closeAutoCloseable;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.cloudstack.utils.security.SSLUtils;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.exception.NioConnectionException;
 
 /**
  * NioConnection abstracts the NIO socket operations.  The Java implementation
@@ -69,7 +71,6 @@ public abstract class NioConnection implements Callable<Boolean> {
     protected HandlerFactory _factory;
     protected String _name;
     protected ExecutorService _executor;
-    protected ExecutorService _sslHandshakeExecutor;
 
     public NioConnection(final String name, final int port, final int workers, final HandlerFactory
factory) {
         _name = name;
@@ -78,7 +79,6 @@ public abstract class NioConnection implements Callable<Boolean> {
         _port = port;
         _factory = factory;
         _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(name + "-Handler"));
-        _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(name
+ "-SSLHandshakeHandler"));
     }
 
     public void start() throws NioConnectionException {
@@ -185,9 +185,8 @@ public abstract class NioConnection implements Callable<Boolean>
{
 
     protected void accept(final SelectionKey key) throws IOException {
         final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
-        final SocketChannel socketChannel = serverSocketChannel.accept();
-        socketChannel.configureBlocking(false);
 
+        final SocketChannel socketChannel = serverSocketChannel.accept();
         final Socket socket = socketChannel.socket();
         socket.setKeepAlive(true);
 
@@ -195,52 +194,43 @@ public abstract class NioConnection implements Callable<Boolean>
{
             s_logger.trace("Connection accepted for " + socket);
         }
 
-        final SSLEngine sslEngine;
+        // Begin SSL handshake in BLOCKING mode
+        socketChannel.configureBlocking(true);
+
+        SSLEngine sslEngine = null;
         try {
             final SSLContext sslContext = Link.initSSLContext(false);
             sslEngine = sslContext.createSSLEngine();
             sslEngine.setUseClientMode(false);
             sslEngine.setNeedClientAuth(false);
             sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
-            final NioConnection nioConnection = this;
-            _sslHandshakeExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    _selector.wakeup();
-                    try {
-                        sslEngine.beginHandshake();
-                        if (!Link.doHandshake(socketChannel, sslEngine, false)) {
-                            throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress());
-                        }
-                        if (s_logger.isTraceEnabled()) {
-                            s_logger.trace("SSL: Handshake done");
-                        }
-                        final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
-                        final Link link = new Link(saddr, nioConnection);
-                        link.setSSLEngine(sslEngine);
-                        link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ,
link));
-                        final Task task = _factory.create(Task.Type.CONNECT, link, null);
-                        registerLink(saddr, link);
-                        _executor.submit(task);
-                    } catch (IOException e) {
-                        if (s_logger.isTraceEnabled()) {
-                            s_logger.trace("Connection closed due to failure: " + e.getMessage());
-                        }
-                        closeAutoCloseable(socket, "accepting socket");
-                        closeAutoCloseable(socketChannel, "accepting socketChannel");
-                    } finally {
-                        _selector.wakeup();
-                    }
-                }
-            });
+
+            Link.doHandshake(socketChannel, sslEngine, false);
+
         } catch (final Exception e) {
             if (s_logger.isTraceEnabled()) {
-                s_logger.trace("Connection closed due to failure: " + e.getMessage());
+                s_logger.trace("Socket " + socket + " closed on read.  Probably -1 returned:
" + e.getMessage());
             }
-            closeAutoCloseable(socket, "accepting socket");
             closeAutoCloseable(socketChannel, "accepting socketChannel");
-        } finally {
-            _selector.wakeup();
+            closeAutoCloseable(socket, "opened socket");
+            return;
+        }
+
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("SSL: Handshake done");
+        }
+        socketChannel.configureBlocking(false);
+        final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress();
+        final Link link = new Link(saddr, this);
+        link.setSSLEngine(sslEngine);
+        link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link));
+        final Task task = _factory.create(Task.Type.CONNECT, link, null);
+        registerLink(saddr, link);
+
+        try {
+            _executor.submit(task);
+        } catch (final Exception e) {
+            s_logger.warn("Exception occurred when submitting the task", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b4ad38d6/utils/src/main/java/com/cloud/utils/nio/NioServer.java
----------------------------------------------------------------------
diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
index 13d5476..539c2bb 100644
--- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java
+++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java
@@ -43,10 +43,6 @@ public class NioServer extends NioConnection {
         _links = new WeakHashMap<InetSocketAddress, Link>(1024);
     }
 
-    public int getPort() {
-        return _serverSocket.socket().getLocalPort();
-    }
-
     @Override
     protected void init() throws IOException {
         _selector = SelectorProvider.provider().openSelector();
@@ -57,9 +53,9 @@ public class NioServer extends NioConnection {
         _localAddr = new InetSocketAddress(_port);
         _serverSocket.socket().bind(_localAddr);
 
-        _serverSocket.register(_selector, SelectionKey.OP_ACCEPT);
+        _serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null);
 
-        s_logger.info("NioConnection started and listening on " + _serverSocket.socket().getLocalSocketAddress());
+        s_logger.info("NioConnection started and listening on " + _localAddr.toString());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b4ad38d6/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
----------------------------------------------------------------------
diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
index 20c31e2..d8510cf 100644
--- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
+++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java
@@ -19,7 +19,14 @@
 
 package com.cloud.utils.testcase;
 
-import com.cloud.utils.concurrency.NamedThreadFactory;
+import java.nio.channels.ClosedChannelException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+
 import com.cloud.utils.exception.NioConnectionException;
 import com.cloud.utils.nio.HandlerFactory;
 import com.cloud.utils.nio.Link;
@@ -27,200 +34,131 @@ import com.cloud.utils.nio.NioClient;
 import com.cloud.utils.nio.NioServer;
 import com.cloud.utils.nio.Task;
 import com.cloud.utils.nio.Task.Type;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
- * NioTest demonstrates that NioServer can function without getting its main IO
- * loop blocked when an aggressive or malicious client connects to the server but
- * fail to participate in SSL handshake. In this test, we run bunch of clients
- * that send a known payload to the server, to which multiple malicious clients
- * also try to connect and hang.
- * A malicious client could cause denial-of-service if the server's main IO loop
- * along with SSL handshake was blocking. A passing tests shows that NioServer
- * can still function in case of connection load and that the main IO loop along
- * with SSL handshake is non-blocking with some internal timeout mechanism.
+ *
+ *
+ *
+ *
  */
 
-public class NioTest {
-
-    private static final Logger LOGGER = Logger.getLogger(NioTest.class);
-
-    // Test should fail in due time instead of looping forever
-    private static final int TESTTIMEOUT = 300000;
+public class NioTest extends TestCase {
 
-    final private int totalTestCount = 5;
-    private int completedTestCount = 0;
+    private static final Logger s_logger = Logger.getLogger(NioTest.class);
 
-    private NioServer server;
-    private List<NioClient> clients = new ArrayList<>();
-    private List<NioClient> maliciousClients = new ArrayList<>();
+    private NioServer _server;
+    private NioClient _client;
 
-    private ExecutorService clientExecutor = Executors.newFixedThreadPool(totalTestCount,
new NamedThreadFactory("NioClientHandler"));;
-    private ExecutorService maliciousExecutor = Executors.newFixedThreadPool(5*totalTestCount,
new NamedThreadFactory("MaliciousNioClientHandler"));;
+    private Link _clientLink;
 
-    private Random randomGenerator = new Random();
-    private byte[] testBytes;
+    private int _testCount;
+    private int _completedCount;
 
     private boolean isTestsDone() {
         boolean result;
         synchronized (this) {
-            result = totalTestCount == completedTestCount;
+            result = _testCount == _completedCount;
         }
         return result;
     }
 
+    private void getOneMoreTest() {
+        synchronized (this) {
+            _testCount++;
+        }
+    }
+
     private void oneMoreTestDone() {
         synchronized (this) {
-            completedTestCount++;
+            _completedCount++;
         }
     }
 
-    @Before
+    @Override
     public void setUp() {
-        LOGGER.info("Setting up Benchmark Test");
+        s_logger.info("Test");
 
-        completedTestCount = 0;
-        testBytes = new byte[1000000];
-        randomGenerator.nextBytes(testBytes);
+        _testCount = 0;
+        _completedCount = 0;
 
-        server = new NioServer("NioTestServer", 0, 1, new NioTestServer());
+        _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer());
         try {
-            server.start();
+            _server.start();
         } catch (final NioConnectionException e) {
-            Assert.fail(e.getMessage());
+            fail(e.getMessage());
         }
 
-        for (int i = 0; i < totalTestCount; i++) {
-            for (int j = 0; j < 4; j++) {
-                final NioClient maliciousClient = new NioMaliciousClient("NioMaliciousTestClient-"
+ i, "127.0.0.1", server.getPort(), 1, new NioMaliciousTestClient());
-                maliciousClients.add(maliciousClient);
-                maliciousExecutor.submit(new ThreadedNioClient(maliciousClient));
-            }
-            final NioClient client = new NioClient("NioTestClient-" + i, "127.0.0.1", server.getPort(),
1, new NioTestClient());
-            clients.add(client);
-            clientExecutor.submit(new ThreadedNioClient(client));
+        _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient());
+        try {
+            _client.start();
+        } catch (final NioConnectionException e) {
+            fail(e.getMessage());
         }
-    }
-
-    @After
-    public void tearDown() {
-        stopClient();
-        stopServer();
-    }
 
-    protected void stopClient() {
-        for (NioClient client : clients) {
-            client.stop();
-        }
-        for (NioClient maliciousClient : maliciousClients) {
-            maliciousClient.stop();
+        while (_clientLink == null) {
+            try {
+                s_logger.debug("Link is not up! Waiting ...");
+                Thread.sleep(1000);
+            } catch (final InterruptedException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
         }
-        LOGGER.info("Clients stopped.");
-    }
-
-    protected void stopServer() {
-        server.stop();
-        LOGGER.info("Server stopped.");
     }
 
-    @Test(timeout=TESTTIMEOUT)
-    public void testConnection() {
+    @Override
+    public void tearDown() {
         while (!isTestsDone()) {
             try {
-                LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done. Waiting
for completion");
+                s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting
for completion");
                 Thread.sleep(1000);
             } catch (final InterruptedException e) {
-                Assert.fail(e.getMessage());
+                // TODO Auto-generated catch block
+                e.printStackTrace();
             }
         }
-        LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done.");
+        stopClient();
+        stopServer();
     }
 
-    protected void doServerProcess(final byte[] data) {
-        oneMoreTestDone();
-        Assert.assertArrayEquals(testBytes, data);
-        LOGGER.info("Verify data received by server done.");
+    protected void stopClient() {
+        _client.stop();
+        s_logger.info("Client stopped.");
     }
 
-    public byte[] getTestBytes() {
-        return testBytes;
+    protected void stopServer() {
+        _server.stop();
+        s_logger.info("Server stopped.");
     }
 
-    public class ThreadedNioClient implements Runnable {
-        final private NioClient client;
-        ThreadedNioClient(final NioClient client) {
-            this.client = client;
-        }
-
-        @Override
-        public void run() {
-            try {
-                client.start();
-            } catch (NioConnectionException e) {
-                Assert.fail(e.getMessage());
-            }
-        }
+    protected void setClientLink(final Link link) {
+        _clientLink = link;
     }
 
-    public class NioMaliciousClient extends NioClient {
+    Random randomGenerator = new Random();
 
-        public NioMaliciousClient(String name, String host, int port, int workers, HandlerFactory
factory) {
-            super(name, host, port, workers, factory);
-        }
+    byte[] _testBytes;
 
-        @Override
-        protected void init() throws IOException {
-            _selector = Selector.open();
-            try {
-                _clientConnection = SocketChannel.open();
-                LOGGER.info("Connecting to " + _host + ":" + _port);
-                final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port);
-                _clientConnection.connect(peerAddr);
-                // This is done on purpose, the malicious client would connect
-                // to the server and then do nothing, hence using a large sleep value
-                Thread.sleep(Long.MAX_VALUE);
-            } catch (final IOException e) {
-                _selector.close();
-                throw e;
-            } catch (InterruptedException e) {
-                LOGGER.debug(e.getMessage());
-            }
+    public void testConnection() {
+        _testBytes = new byte[1000000];
+        randomGenerator.nextBytes(_testBytes);
+        try {
+            getOneMoreTest();
+            _clientLink.send(_testBytes);
+            s_logger.info("Client: Data sent");
+            getOneMoreTest();
+            _clientLink.send(_testBytes);
+            s_logger.info("Client: Data sent");
+        } catch (final ClosedChannelException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
         }
     }
 
-    public class NioMaliciousTestClient implements HandlerFactory {
-
-        @Override
-        public Task create(final Type type, final Link link, final byte[] data) {
-            return new NioMaliciousTestClientHandler(type, link, data);
-        }
-
-        public class NioMaliciousTestClientHandler extends Task {
-
-            public NioMaliciousTestClientHandler(final Type type, final Link link, final
byte[] data) {
-                super(type, link, data);
-            }
-
-            @Override
-            public void doTask(final Task task) {
-                LOGGER.info("Malicious Client: Received task " + task.getType().toString());
-            }
-        }
+    protected void doServerProcess(final byte[] data) {
+        oneMoreTestDone();
+        Assert.assertArrayEquals(_testBytes, data);
+        s_logger.info("Verify done.");
     }
 
     public class NioTestClient implements HandlerFactory {
@@ -239,23 +177,18 @@ public class NioTest {
             @Override
             public void doTask(final Task task) {
                 if (task.getType() == Task.Type.CONNECT) {
-                    LOGGER.info("Client: Received CONNECT task");
-                    try {
-                        LOGGER.info("Sending data to server");
-                        task.getLink().send(getTestBytes());
-                    } catch (ClosedChannelException e) {
-                        LOGGER.error(e.getMessage());
-                        e.printStackTrace();
-                    }
+                    s_logger.info("Client: Received CONNECT task");
+                    setClientLink(task.getLink());
                 } else if (task.getType() == Task.Type.DATA) {
-                    LOGGER.info("Client: Received DATA task");
+                    s_logger.info("Client: Received DATA task");
                 } else if (task.getType() == Task.Type.DISCONNECT) {
-                    LOGGER.info("Client: Received DISCONNECT task");
+                    s_logger.info("Client: Received DISCONNECT task");
                     stopClient();
                 } else if (task.getType() == Task.Type.OTHER) {
-                    LOGGER.info("Client: Received OTHER task");
+                    s_logger.info("Client: Received OTHER task");
                 }
             }
+
         }
     }
 
@@ -275,15 +208,15 @@ public class NioTest {
             @Override
             public void doTask(final Task task) {
                 if (task.getType() == Task.Type.CONNECT) {
-                    LOGGER.info("Server: Received CONNECT task");
+                    s_logger.info("Server: Received CONNECT task");
                 } else if (task.getType() == Task.Type.DATA) {
-                    LOGGER.info("Server: Received DATA task");
+                    s_logger.info("Server: Received DATA task");
                     doServerProcess(task.getData());
                 } else if (task.getType() == Task.Type.DISCONNECT) {
-                    LOGGER.info("Server: Received DISCONNECT task");
+                    s_logger.info("Server: Received DISCONNECT task");
                     stopServer();
                 } else if (task.getType() == Task.Type.OTHER) {
-                    LOGGER.info("Server: Received OTHER task");
+                    s_logger.info("Server: Received OTHER task");
                 }
             }
 


Mime
View raw message