Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D6B5C200AC0 for ; Tue, 24 May 2016 11:49:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D53D4160A30; Tue, 24 May 2016 09:49:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 55F6D160A11 for ; Tue, 24 May 2016 11:49:17 +0200 (CEST) Received: (qmail 2562 invoked by uid 500); 24 May 2016 09:49:09 -0000 Mailing-List: contact commits-help@cloudstack.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cloudstack.apache.org Delivered-To: mailing list commits@cloudstack.apache.org Received: (qmail 904 invoked by uid 99); 24 May 2016 09:49:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 May 2016 09:49:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CCF9CE01F4; Tue, 24 May 2016 09:49:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dahn@apache.org To: commits@cloudstack.apache.org Date: Tue, 24 May 2016 09:49:39 -0000 Message-Id: <78a783bc60224652bee25fc519495406@git.apache.org> In-Reply-To: <9f6d5bfbd239492394945380f6b96af9@git.apache.org> References: <9f6d5bfbd239492394945380f6b96af9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [34/48] git commit: updated refs/heads/4.9-bountycastle-daan to 98bf0ca archived-at: Tue, 24 May 2016 09:49:19 -0000 CLOUDSTACK-9348: NioConnection improvements - Unit test to demonstrate denial of service attack The NioConnection uses blocking handlers for various events such as connect, accept, read, write. In case a client connects NioServer (used by agent mgr to service agents on port 8250) but fails to participate in SSL handshake or just sits idle, this would block the main IO/selector loop in NioConnection. Such a client could be either malicious or aggresive. This unit test demonstrates such a malicious client that can perform a denial-of-service attack on NioServer that blocks it to serve any other client. - Use non-blocking SSL handshake - Uses non-blocking socket config in NioClient and NioServer/NioConnection - Scalable connectivity from agents and peer clustered-management server - Removes blocking ssl handshake code with a non-blocking code - Protects from denial-of-service issues that can degrade mgmt server responsiveness due to an aggressive/malicious client - Uses separate executor services for handling ssl handshakes Signed-off-by: Rohit Yadav Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/9c751869 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/9c751869 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/9c751869 Branch: refs/heads/4.9-bountycastle-daan Commit: 9c7518698d2f4a9fcc6a83fd22dd5b2fc4260232 Parents: b4ad38d Author: Rohit Yadav Authored: Fri Apr 15 00:24:53 2016 +0530 Committer: Rohit Yadav Committed: Thu May 19 16:45:16 2016 +0530 ---------------------------------------------------------------------- .../manager/ClusteredAgentManagerImpl.java | 9 +- .../src/main/java/com/cloud/utils/nio/Link.java | 297 ++++++++++++------- .../java/com/cloud/utils/nio/NioClient.java | 31 +- .../java/com/cloud/utils/nio/NioConnection.java | 86 +++--- .../java/com/cloud/utils/nio/NioServer.java | 6 +- .../java/com/cloud/utils/testcase/NioTest.java | 249 ++++++++++------ 6 files changed, 408 insertions(+), 270 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 75f860d..9239adc 100644 --- a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -499,7 +499,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust SocketChannel ch1 = null; try { ch1 = SocketChannel.open(new InetSocketAddress(addr, Port.value())); - ch1.configureBlocking(true); // make sure we are working at blocking mode + ch1.configureBlocking(false); ch1.socket().setKeepAlive(true); ch1.socket().setSoTimeout(60 * 1000); try { @@ -507,8 +507,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust sslEngine = sslContext.createSSLEngine(ip, Port.value()); sslEngine.setUseClientMode(true); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); - - Link.doHandshake(ch1, sslEngine, true); + sslEngine.beginHandshake(); + if (!Link.doHandshake(ch1, sslEngine, true)) { + ch1.close(); + throw new IOException("SSL handshake failed!"); + } s_logger.info("SSL: Handshake done"); } catch (final Exception e) { ch1.close(); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/Link.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/Link.java b/utils/src/main/java/com/cloud/utils/nio/Link.java index 6d6306a..02ffaab 100644 --- a/utils/src/main/java/com/cloud/utils/nio/Link.java +++ b/utils/src/main/java/com/cloud/utils/nio/Link.java @@ -19,37 +19,33 @@ package com.cloud.utils.nio; +import com.cloud.utils.PropertiesUtil; +import com.cloud.utils.db.DbProperties; +import org.apache.cloudstack.utils.security.SSLUtils; +import org.apache.log4j.Logger; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; -import java.nio.channels.ReadableByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.util.concurrent.ConcurrentLinkedQueue; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; - -import org.apache.cloudstack.utils.security.SSLUtils; -import org.apache.log4j.Logger; - -import com.cloud.utils.PropertiesUtil; -import com.cloud.utils.db.DbProperties; - /** */ public class Link { @@ -453,115 +449,188 @@ public class Link { return sslContext; } - public static void doHandshake(SocketChannel ch, SSLEngine sslEngine, boolean isClient) throws IOException { - if (s_logger.isTraceEnabled()) { - s_logger.trace("SSL: begin Handshake, isClient: " + isClient); + public static ByteBuffer enlargeBuffer(ByteBuffer buffer, final int sessionProposedCapacity) { + if (buffer == null || sessionProposedCapacity < 0) { + return buffer; } - - SSLEngineResult engResult; - SSLSession sslSession = sslEngine.getSession(); - HandshakeStatus hsStatus; - ByteBuffer in_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40); - ByteBuffer in_appBuf = ByteBuffer.allocate(sslSession.getApplicationBufferSize() + 40); - ByteBuffer out_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40); - ByteBuffer out_appBuf = ByteBuffer.allocate(sslSession.getApplicationBufferSize() + 40); - int count; - ch.socket().setSoTimeout(60 * 1000); - InputStream inStream = ch.socket().getInputStream(); - // Use readCh to make sure the timeout on reading is working - ReadableByteChannel readCh = Channels.newChannel(inStream); - - if (isClient) { - hsStatus = SSLEngineResult.HandshakeStatus.NEED_WRAP; + if (sessionProposedCapacity > buffer.capacity()) { + buffer = ByteBuffer.allocate(sessionProposedCapacity); } else { - hsStatus = SSLEngineResult.HandshakeStatus.NEED_UNWRAP; + buffer = ByteBuffer.allocate(buffer.capacity() * 2); } + return buffer; + } - while (hsStatus != SSLEngineResult.HandshakeStatus.FINISHED) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("SSL: Handshake status " + hsStatus); + public static ByteBuffer handleBufferUnderflow(final SSLEngine engine, ByteBuffer buffer) { + if (engine == null || buffer == null) { + return buffer; + } + if (buffer.position() < buffer.limit()) { + return buffer; + } + ByteBuffer replaceBuffer = enlargeBuffer(buffer, engine.getSession().getPacketBufferSize()); + buffer.flip(); + replaceBuffer.put(buffer); + return replaceBuffer; + } + + private static boolean doHandshakeUnwrap(final SocketChannel socketChannel, final SSLEngine sslEngine, + ByteBuffer peerAppData, ByteBuffer peerNetData, final int appBufferSize) throws IOException { + if (socketChannel == null || sslEngine == null || peerAppData == null || peerNetData == null || appBufferSize < 0) { + return false; + } + if (socketChannel.read(peerNetData) < 0) { + if (sslEngine.isInboundDone() && sslEngine.isOutboundDone()) { + return false; } - engResult = null; - if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_WRAP) { - out_pkgBuf.clear(); - out_appBuf.clear(); - out_appBuf.put("Hello".getBytes()); - engResult = sslEngine.wrap(out_appBuf, out_pkgBuf); - out_pkgBuf.flip(); - int remain = out_pkgBuf.limit(); - while (remain != 0) { - remain -= ch.write(out_pkgBuf); - if (remain < 0) { - throw new IOException("Too much bytes sent?"); - } - } - } else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { - in_appBuf.clear(); - // One packet may contained multiply operation - if (in_pkgBuf.position() == 0 || !in_pkgBuf.hasRemaining()) { - in_pkgBuf.clear(); - count = 0; - try { - count = readCh.read(in_pkgBuf); - } catch (SocketTimeoutException ex) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("Handshake reading time out! Cut the connection"); - } - count = -1; - } - if (count == -1) { - throw new IOException("Connection closed with -1 on reading size."); - } - in_pkgBuf.flip(); + try { + sslEngine.closeInbound(); + } catch (SSLException e) { + s_logger.warn("This SSL engine was forced to close inbound due to end of stream."); + } + sslEngine.closeOutbound(); + // After closeOutbound the engine will be set to WRAP state, + // in order to try to send a close message to the client. + return true; + } + peerNetData.flip(); + SSLEngineResult result = null; + try { + result = sslEngine.unwrap(peerNetData, peerAppData); + peerNetData.compact(); + } catch (SSLException sslException) { + s_logger.error("SSL error occurred while processing unwrap data: " + sslException.getMessage()); + sslEngine.closeOutbound(); + return true; + } + switch (result.getStatus()) { + case OK: + break; + case BUFFER_OVERFLOW: + // Will occur when peerAppData's capacity is smaller than the data derived from peerNetData's unwrap. + peerAppData = enlargeBuffer(peerAppData, appBufferSize); + break; + case BUFFER_UNDERFLOW: + // Will occur either when no data was read from the peer or when the peerNetData buffer + // was too small to hold all peer's data. + peerNetData = handleBufferUnderflow(sslEngine, peerNetData); + break; + case CLOSED: + if (sslEngine.isOutboundDone()) { + return false; + } else { + sslEngine.closeOutbound(); + break; } - engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf); - ByteBuffer tmp_pkgBuf = ByteBuffer.allocate(sslSession.getPacketBufferSize() + 40); - int loop_count = 0; - while (engResult.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW) { - // The client is too slow? Cut it and let it reconnect - if (loop_count > 10) { - throw new IOException("Too many times in SSL BUFFER_UNDERFLOW, disconnect guest."); - } - // We need more packets to complete this operation - if (s_logger.isTraceEnabled()) { - s_logger.trace("SSL: Buffer underflowed, getting more packets"); - } - tmp_pkgBuf.clear(); - count = ch.read(tmp_pkgBuf); - if (count == -1) { - throw new IOException("Connection closed with -1 on reading size."); - } - tmp_pkgBuf.flip(); - - in_pkgBuf.mark(); - in_pkgBuf.position(in_pkgBuf.limit()); - in_pkgBuf.limit(in_pkgBuf.limit() + tmp_pkgBuf.limit()); - in_pkgBuf.put(tmp_pkgBuf); - in_pkgBuf.reset(); + default: + throw new IllegalStateException("Invalid SSL status: " + result.getStatus()); + } + return true; + } - in_appBuf.clear(); - engResult = sslEngine.unwrap(in_pkgBuf, in_appBuf); - loop_count++; + private static boolean doHandshakeWrap(final SocketChannel socketChannel, final SSLEngine sslEngine, + ByteBuffer myAppData, ByteBuffer myNetData, ByteBuffer peerNetData, + final int netBufferSize) throws IOException { + if (socketChannel == null || sslEngine == null || myNetData == null || peerNetData == null + || myAppData == null || netBufferSize < 0) { + return false; + } + myNetData.clear(); + SSLEngineResult result = null; + try { + result = sslEngine.wrap(myAppData, myNetData); + } catch (SSLException sslException) { + s_logger.error("SSL error occurred while processing wrap data: " + sslException.getMessage()); + sslEngine.closeOutbound(); + return true; + } + switch (result.getStatus()) { + case OK : + myNetData.flip(); + while (myNetData.hasRemaining()) { + socketChannel.write(myNetData); } - } else if (hsStatus == SSLEngineResult.HandshakeStatus.NEED_TASK) { - Runnable run; - while ((run = sslEngine.getDelegatedTask()) != null) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("SSL: Running delegated task!"); + break; + case BUFFER_OVERFLOW: + // Will occur if there is not enough space in myNetData buffer to write all the data + // that would be generated by the method wrap. Since myNetData is set to session's packet + // size we should not get to this point because SSLEngine is supposed to produce messages + // smaller or equal to that, but a general handling would be the following: + myNetData = enlargeBuffer(myNetData, netBufferSize); + break; + case BUFFER_UNDERFLOW: + throw new SSLException("Buffer underflow occurred after a wrap. We should not reach here."); + case CLOSED: + try { + myNetData.flip(); + while (myNetData.hasRemaining()) { + socketChannel.write(myNetData); } - run.run(); + // At this point the handshake status will probably be NEED_UNWRAP + // so we make sure that peerNetData is clear to read. + peerNetData.clear(); + } catch (Exception e) { + s_logger.error("Failed to send server's CLOSE message due to socket channel's failure."); } - } else if (hsStatus == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) { - throw new IOException("NOT a handshaking!"); + break; + default: + throw new IllegalStateException("Invalid SSL status: " + result.getStatus()); + } + return true; + } + + public static boolean doHandshake(final SocketChannel socketChannel, final SSLEngine sslEngine, final boolean isClient) throws IOException { + if (socketChannel == null || sslEngine == null) { + return false; + } + final int appBufferSize = sslEngine.getSession().getApplicationBufferSize(); + final int netBufferSize = sslEngine.getSession().getPacketBufferSize(); + ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize); + ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize); + ByteBuffer myNetData = ByteBuffer.allocate(netBufferSize); + ByteBuffer peerNetData = ByteBuffer.allocate(netBufferSize); + + final long startTimeMills = System.currentTimeMillis(); + + HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus(); + while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED + && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) { + final long timeTaken = System.currentTimeMillis() - startTimeMills; + if (timeTaken > 15000L) { + s_logger.warn("SSL Handshake has taken more than 15s to connect to: " + socketChannel.getRemoteAddress() + + ". Please investigate this connection."); + return false; } - if (engResult != null && engResult.getStatus() != SSLEngineResult.Status.OK) { - throw new IOException("Fail to handshake! " + engResult.getStatus()); + switch (handshakeStatus) { + case NEED_UNWRAP: + if (!doHandshakeUnwrap(socketChannel, sslEngine, peerAppData, peerNetData, appBufferSize)) { + return false; + } + break; + case NEED_WRAP: + if (!doHandshakeWrap(socketChannel, sslEngine, myAppData, myNetData, peerNetData, netBufferSize)) { + return false; + } + break; + case NEED_TASK: + Runnable task; + while ((task = sslEngine.getDelegatedTask()) != null) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("SSL: Running delegated task!"); + } + task.run(); + } + break; + case FINISHED: + break; + case NOT_HANDSHAKING: + break; + default: + throw new IllegalStateException("Invalid SSL status: " + handshakeStatus); } - if (engResult != null) - hsStatus = engResult.getHandshakeStatus(); - else - hsStatus = sslEngine.getHandshakeStatus(); + handshakeStatus = sslEngine.getHandshakeStatus(); } + return true; } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/NioClient.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/NioClient.java b/utils/src/main/java/com/cloud/utils/nio/NioClient.java index d989f30..dc4f670 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioClient.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioClient.java @@ -36,7 +36,6 @@ public class NioClient extends NioConnection { private static final Logger s_logger = Logger.getLogger(NioClient.class); protected String _host; - protected String _bindAddress; protected SocketChannel _clientConnection; public NioClient(final String name, final String host, final int port, final int workers, final HandlerFactory factory) { @@ -44,10 +43,6 @@ public class NioClient extends NioConnection { _host = host; } - public void setBindAddress(final String ipAddress) { - _bindAddress = ipAddress; - } - @Override protected void init() throws IOException { _selector = Selector.open(); @@ -55,33 +50,25 @@ public class NioClient extends NioConnection { try { _clientConnection = SocketChannel.open(); - _clientConnection.configureBlocking(true); - s_logger.info("Connecting to " + _host + ":" + _port); - - if (_bindAddress != null) { - s_logger.info("Binding outbound interface at " + _bindAddress); - - final InetSocketAddress bindAddr = new InetSocketAddress(_bindAddress, 0); - _clientConnection.socket().bind(bindAddr); - } + s_logger.info("Connecting to " + _host + ":" + _port); final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); _clientConnection.connect(peerAddr); - - SSLEngine sslEngine = null; - // Begin SSL handshake in BLOCKING mode - _clientConnection.configureBlocking(true); + _clientConnection.configureBlocking(false); final SSLContext sslContext = Link.initSSLContext(true); - sslEngine = sslContext.createSSLEngine(_host, _port); + SSLEngine sslEngine = sslContext.createSSLEngine(_host, _port); sslEngine.setUseClientMode(true); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); - - Link.doHandshake(_clientConnection, sslEngine, true); + sslEngine.beginHandshake(); + if (!Link.doHandshake(_clientConnection, sslEngine, true)) { + s_logger.error("SSL Handshake failed while connecting to host: " + _host + " port: " + _port); + _selector.close(); + throw new IOException("SSL Handshake failed while connecting to host: " + _host + " port: " + _port); + } s_logger.info("SSL: Handshake done"); s_logger.info("Connected to " + _host + ":" + _port); - _clientConnection.configureBlocking(false); final Link link = new Link(peerAddr, this); link.setSSLEngine(sslEngine); final SelectionKey key = _clientConnection.register(_selector, SelectionKey.OP_READ); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/NioConnection.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 249f512..9d755d6 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -19,8 +19,13 @@ package com.cloud.utils.nio; -import static com.cloud.utils.AutoCloseableUtil.closeAutoCloseable; +import com.cloud.utils.concurrency.NamedThreadFactory; +import com.cloud.utils.exception.NioConnectionException; +import org.apache.cloudstack.utils.security.SSLUtils; +import org.apache.log4j.Logger; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -44,14 +49,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.apache.cloudstack.utils.security.SSLUtils; -import org.apache.log4j.Logger; - -import com.cloud.utils.concurrency.NamedThreadFactory; -import com.cloud.utils.exception.NioConnectionException; +import static com.cloud.utils.AutoCloseableUtil.closeAutoCloseable; /** * NioConnection abstracts the NIO socket operations. The Java implementation @@ -71,6 +69,7 @@ public abstract class NioConnection implements Callable { protected HandlerFactory _factory; protected String _name; protected ExecutorService _executor; + protected ExecutorService _sslHandshakeExecutor; public NioConnection(final String name, final int port, final int workers, final HandlerFactory factory) { _name = name; @@ -79,6 +78,7 @@ public abstract class NioConnection implements Callable { _port = port; _factory = factory; _executor = new ThreadPoolExecutor(workers, 5 * workers, 1, TimeUnit.DAYS, new LinkedBlockingQueue(), new NamedThreadFactory(name + "-Handler")); + _sslHandshakeExecutor = Executors.newCachedThreadPool(new NamedThreadFactory(name + "-SSLHandshakeHandler")); } public void start() throws NioConnectionException { @@ -125,7 +125,7 @@ public abstract class NioConnection implements Callable { public Boolean call() throws NioConnectionException { while (_isRunning) { try { - _selector.select(); + _selector.select(1000); // Someone is ready for I/O, get the ready keys final Set readyKeys = _selector.selectedKeys(); @@ -185,8 +185,9 @@ public abstract class NioConnection implements Callable { protected void accept(final SelectionKey key) throws IOException { final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel(); - final SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + final Socket socket = socketChannel.socket(); socket.setKeepAlive(true); @@ -194,43 +195,52 @@ public abstract class NioConnection implements Callable { s_logger.trace("Connection accepted for " + socket); } - // Begin SSL handshake in BLOCKING mode - socketChannel.configureBlocking(true); - - SSLEngine sslEngine = null; + final SSLEngine sslEngine; try { final SSLContext sslContext = Link.initSSLContext(false); sslEngine = sslContext.createSSLEngine(); sslEngine.setUseClientMode(false); sslEngine.setNeedClientAuth(false); sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); - - Link.doHandshake(socketChannel, sslEngine, false); - + final NioConnection nioConnection = this; + _sslHandshakeExecutor.submit(new Runnable() { + @Override + public void run() { + _selector.wakeup(); + try { + sslEngine.beginHandshake(); + if (!Link.doHandshake(socketChannel, sslEngine, false)) { + throw new IOException("SSL handshake timed out with " + socketChannel.getRemoteAddress()); + } + if (s_logger.isTraceEnabled()) { + s_logger.trace("SSL: Handshake done"); + } + final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); + final Link link = new Link(saddr, nioConnection); + link.setSSLEngine(sslEngine); + link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); + final Task task = _factory.create(Task.Type.CONNECT, link, null); + registerLink(saddr, link); + _executor.submit(task); + } catch (IOException e) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Connection closed due to failure: " + e.getMessage()); + } + closeAutoCloseable(socket, "accepting socket"); + closeAutoCloseable(socketChannel, "accepting socketChannel"); + } finally { + _selector.wakeup(); + } + } + }); } catch (final Exception e) { if (s_logger.isTraceEnabled()) { - s_logger.trace("Socket " + socket + " closed on read. Probably -1 returned: " + e.getMessage()); + s_logger.trace("Connection closed due to failure: " + e.getMessage()); } + closeAutoCloseable(socket, "accepting socket"); closeAutoCloseable(socketChannel, "accepting socketChannel"); - closeAutoCloseable(socket, "opened socket"); - return; - } - - if (s_logger.isTraceEnabled()) { - s_logger.trace("SSL: Handshake done"); - } - socketChannel.configureBlocking(false); - final InetSocketAddress saddr = (InetSocketAddress)socket.getRemoteSocketAddress(); - final Link link = new Link(saddr, this); - link.setSSLEngine(sslEngine); - link.setKey(socketChannel.register(key.selector(), SelectionKey.OP_READ, link)); - final Task task = _factory.create(Task.Type.CONNECT, link, null); - registerLink(saddr, link); - - try { - _executor.submit(task); - } catch (final Exception e) { - s_logger.warn("Exception occurred when submitting the task", e); + } finally { + _selector.wakeup(); } } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/main/java/com/cloud/utils/nio/NioServer.java ---------------------------------------------------------------------- diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java index 539c2bb..b655f18 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java @@ -43,6 +43,10 @@ public class NioServer extends NioConnection { _links = new WeakHashMap(1024); } + public int getPort() { + return _serverSocket.socket().getLocalPort(); + } + @Override protected void init() throws IOException { _selector = SelectorProvider.provider().openSelector(); @@ -55,7 +59,7 @@ public class NioServer extends NioConnection { _serverSocket.register(_selector, SelectionKey.OP_ACCEPT, null); - s_logger.info("NioConnection started and listening on " + _localAddr.toString()); + s_logger.info("NioConnection started and listening on " + _serverSocket.socket().getLocalSocketAddress()); } @Override http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9c751869/utils/src/test/java/com/cloud/utils/testcase/NioTest.java ---------------------------------------------------------------------- diff --git a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java index d8510cf..894aa1a 100644 --- a/utils/src/test/java/com/cloud/utils/testcase/NioTest.java +++ b/utils/src/test/java/com/cloud/utils/testcase/NioTest.java @@ -19,14 +19,7 @@ package com.cloud.utils.testcase; -import java.nio.channels.ClosedChannelException; -import java.util.Random; - -import junit.framework.TestCase; - -import org.apache.log4j.Logger; -import org.junit.Assert; - +import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.exception.NioConnectionException; import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.Link; @@ -34,131 +27,199 @@ import com.cloud.utils.nio.NioClient; import com.cloud.utils.nio.NioServer; import com.cloud.utils.nio.Task; import com.cloud.utils.nio.Task.Type; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** - * - * - * - * + * NioTest demonstrates that NioServer can function without getting its main IO + * loop blocked when an aggressive or malicious client connects to the server but + * fail to participate in SSL handshake. In this test, we run bunch of clients + * that send a known payload to the server, to which multiple malicious clients + * also try to connect and hang. + * A malicious client could cause denial-of-service if the server's main IO loop + * along with SSL handshake was blocking. A passing tests shows that NioServer + * can still function in case of connection load and that the main IO loop along + * with SSL handshake is non-blocking with some internal timeout mechanism. */ -public class NioTest extends TestCase { +public class NioTest { + + private static final Logger LOGGER = Logger.getLogger(NioTest.class); - private static final Logger s_logger = Logger.getLogger(NioTest.class); + // Test should fail in due time instead of looping forever + private static final int TESTTIMEOUT = 60000; - private NioServer _server; - private NioClient _client; + final private int totalTestCount = 2; + private int completedTestCount = 0; - private Link _clientLink; + private NioServer server; + private List clients = new ArrayList<>(); + private List maliciousClients = new ArrayList<>(); - private int _testCount; - private int _completedCount; + private ExecutorService clientExecutor = Executors.newFixedThreadPool(totalTestCount, new NamedThreadFactory("NioClientHandler"));; + private ExecutorService maliciousExecutor = Executors.newFixedThreadPool(totalTestCount, new NamedThreadFactory("MaliciousNioClientHandler"));; + + private Random randomGenerator = new Random(); + private byte[] testBytes; private boolean isTestsDone() { boolean result; synchronized (this) { - result = _testCount == _completedCount; + result = totalTestCount == completedTestCount; } return result; } - private void getOneMoreTest() { - synchronized (this) { - _testCount++; - } - } - private void oneMoreTestDone() { synchronized (this) { - _completedCount++; + completedTestCount++; } } - @Override + @Before public void setUp() { - s_logger.info("Test"); + LOGGER.info("Setting up Benchmark Test"); - _testCount = 0; - _completedCount = 0; + completedTestCount = 0; + testBytes = new byte[1000000]; + randomGenerator.nextBytes(testBytes); - _server = new NioServer("NioTestServer", 7777, 5, new NioTestServer()); + server = new NioServer("NioTestServer", 0, 1, new NioTestServer()); try { - _server.start(); + server.start(); } catch (final NioConnectionException e) { - fail(e.getMessage()); + Assert.fail(e.getMessage()); } - _client = new NioClient("NioTestServer", "127.0.0.1", 7777, 5, new NioTestClient()); - try { - _client.start(); - } catch (final NioConnectionException e) { - fail(e.getMessage()); - } + for (int i = 0; i < totalTestCount; i++) { + final NioClient maliciousClient = new NioMaliciousClient("NioMaliciousTestClient-" + i, "127.0.0.1", server.getPort(), 1, new NioMaliciousTestClient()); + maliciousClients.add(maliciousClient); + maliciousExecutor.submit(new ThreadedNioClient(maliciousClient)); - while (_clientLink == null) { - try { - s_logger.debug("Link is not up! Waiting ..."); - Thread.sleep(1000); - } catch (final InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + final NioClient client = new NioClient("NioTestClient-" + i, "127.0.0.1", server.getPort(), 1, new NioTestClient()); + clients.add(client); + clientExecutor.submit(new ThreadedNioClient(client)); } } - @Override + @After public void tearDown() { + stopClient(); + stopServer(); + } + + protected void stopClient() { + for (NioClient client : clients) { + client.stop(); + } + for (NioClient maliciousClient : maliciousClients) { + maliciousClient.stop(); + } + LOGGER.info("Clients stopped."); + } + + protected void stopServer() { + server.stop(); + LOGGER.info("Server stopped."); + } + + @Test(timeout=TESTTIMEOUT) + public void testConnection() { while (!isTestsDone()) { try { - s_logger.debug(_completedCount + "/" + _testCount + " tests done. Waiting for completion"); + LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done. Waiting for completion"); Thread.sleep(1000); } catch (final InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + Assert.fail(e.getMessage()); } } - stopClient(); - stopServer(); + LOGGER.debug(completedTestCount + "/" + totalTestCount + " tests done."); } - protected void stopClient() { - _client.stop(); - s_logger.info("Client stopped."); + protected void doServerProcess(final byte[] data) { + oneMoreTestDone(); + Assert.assertArrayEquals(testBytes, data); + LOGGER.info("Verify data received by server done."); } - protected void stopServer() { - _server.stop(); - s_logger.info("Server stopped."); + public byte[] getTestBytes() { + return testBytes; } - protected void setClientLink(final Link link) { - _clientLink = link; + public class ThreadedNioClient implements Runnable { + final private NioClient client; + ThreadedNioClient(final NioClient client) { + this.client = client; + } + + @Override + public void run() { + try { + client.start(); + } catch (NioConnectionException e) { + Assert.fail(e.getMessage()); + } + } } - Random randomGenerator = new Random(); + public class NioMaliciousClient extends NioClient { - byte[] _testBytes; + public NioMaliciousClient(String name, String host, int port, int workers, HandlerFactory factory) { + super(name, host, port, workers, factory); + } - public void testConnection() { - _testBytes = new byte[1000000]; - randomGenerator.nextBytes(_testBytes); - try { - getOneMoreTest(); - _clientLink.send(_testBytes); - s_logger.info("Client: Data sent"); - getOneMoreTest(); - _clientLink.send(_testBytes); - s_logger.info("Client: Data sent"); - } catch (final ClosedChannelException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + @Override + protected void init() throws IOException { + _selector = Selector.open(); + try { + _clientConnection = SocketChannel.open(); + LOGGER.info("Connecting to " + _host + ":" + _port); + final InetSocketAddress peerAddr = new InetSocketAddress(_host, _port); + _clientConnection.connect(peerAddr); + // This is done on purpose, the malicious client would connect + // to the server and then do nothing, hence using a large sleep value + Thread.sleep(Long.MAX_VALUE); + } catch (final IOException e) { + _selector.close(); + throw e; + } catch (InterruptedException e) { + LOGGER.debug(e.getMessage()); + } } } - protected void doServerProcess(final byte[] data) { - oneMoreTestDone(); - Assert.assertArrayEquals(_testBytes, data); - s_logger.info("Verify done."); + public class NioMaliciousTestClient implements HandlerFactory { + + @Override + public Task create(final Type type, final Link link, final byte[] data) { + return new NioMaliciousTestClientHandler(type, link, data); + } + + public class NioMaliciousTestClientHandler extends Task { + + public NioMaliciousTestClientHandler(final Type type, final Link link, final byte[] data) { + super(type, link, data); + } + + @Override + public void doTask(final Task task) { + LOGGER.info("Malicious Client: Received task " + task.getType().toString()); + } + } } public class NioTestClient implements HandlerFactory { @@ -177,18 +238,23 @@ public class NioTest extends TestCase { @Override public void doTask(final Task task) { if (task.getType() == Task.Type.CONNECT) { - s_logger.info("Client: Received CONNECT task"); - setClientLink(task.getLink()); + LOGGER.info("Client: Received CONNECT task"); + try { + LOGGER.info("Sending data to server"); + task.getLink().send(getTestBytes()); + } catch (ClosedChannelException e) { + LOGGER.error(e.getMessage()); + e.printStackTrace(); + } } else if (task.getType() == Task.Type.DATA) { - s_logger.info("Client: Received DATA task"); + LOGGER.info("Client: Received DATA task"); } else if (task.getType() == Task.Type.DISCONNECT) { - s_logger.info("Client: Received DISCONNECT task"); + LOGGER.info("Client: Received DISCONNECT task"); stopClient(); } else if (task.getType() == Task.Type.OTHER) { - s_logger.info("Client: Received OTHER task"); + LOGGER.info("Client: Received OTHER task"); } } - } } @@ -208,18 +274,17 @@ public class NioTest extends TestCase { @Override public void doTask(final Task task) { if (task.getType() == Task.Type.CONNECT) { - s_logger.info("Server: Received CONNECT task"); + LOGGER.info("Server: Received CONNECT task"); } else if (task.getType() == Task.Type.DATA) { - s_logger.info("Server: Received DATA task"); + LOGGER.info("Server: Received DATA task"); doServerProcess(task.getData()); } else if (task.getType() == Task.Type.DISCONNECT) { - s_logger.info("Server: Received DISCONNECT task"); + LOGGER.info("Server: Received DISCONNECT task"); stopServer(); } else if (task.getType() == Task.Type.OTHER) { - s_logger.info("Server: Received OTHER task"); + LOGGER.info("Server: Received OTHER task"); } } - } } }