ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [1/2] incubator-ignite git commit: IGNITE-323 Added ssl for TcpCommunication.
Date Mon, 20 Jul 2015 12:26:21 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-323 [created] c84784525


IGNITE-323 Added ssl for TcpCommunication.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/730b1046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/730b1046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/730b1046

Branch: refs/heads/ignite-323
Commit: 730b1046e32651d953587fb1f5851c299baa4902
Parents: 13e55b2
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Mon Jul 20 13:56:47 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Mon Jul 20 15:20:09 2015 +0300

----------------------------------------------------------------------
 .../configuration/IgniteConfiguration.java      |  27 ++
 .../ignite/internal/util/lang/GridFunc.java     |  32 ++
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../util/nio/ssl/BlockingSslHandler.java        | 473 +++++++++++++++++++
 .../internal/util/nio/ssl/GridNioSslFilter.java |   2 +
 .../util/nio/ssl/GridNioSslHandler.java         |  12 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 166 ++++++-
 .../GridAbstractCommunicationSelfTest.java      |  16 +
 .../tcp/GridTcpCommunicationSpiSslSelfTest.java |  38 ++
 .../ignite/testframework/junits/IgniteMock.java |  13 +
 10 files changed, 761 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2d36c7a..73db5fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.client.ssl.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -400,6 +401,9 @@ public class IgniteConfiguration {
     /** Cache store session listeners. */
     private Factory<CacheStoreSessionListener>[] storeSesLsnrs;
 
+    /** SSL connection factory. */
+    private GridSslContextFactory sslCtxFactory;
+
     /**
      * Creates valid grid configuration with all default values.
      */
@@ -480,6 +484,7 @@ public class IgniteConfiguration {
         segResolvers = cfg.getSegmentationResolvers();
         sndRetryCnt = cfg.getNetworkSendRetryCount();
         sndRetryDelay = cfg.getNetworkSendRetryDelay();
+        sslCtxFactory = cfg.getSslContextFactory();
         storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
         svcCfgs = cfg.getServiceConfiguration();
         sysPoolSize = cfg.getSystemThreadPoolSize();
@@ -1280,6 +1285,28 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Sets SSL context factory that will be used for creating a secure socket  layer.
+     *
+     * @param sslCtxFactory Ssl context factory.
+     * @see GridSslContextFactory
+     */
+    public IgniteConfiguration setSslContextFactory(GridSslContextFactory sslCtxFactory)
{
+        this.sslCtxFactory = sslCtxFactory;
+
+        return this;
+    }
+
+    /**
+     * Returns SSL context factory that will be used for creating a secure socket layer.
+     *
+     * @return SSL connection factory.
+     * @see GridSslContextFactory
+     */
+    public GridSslContextFactory getSslContextFactory() {
+        return sslCtxFactory;
+    }
+
+    /**
      * Returns a collection of life-cycle beans. These beans will be automatically
      * notified of grid life-cycle events. Use life-cycle beans whenever you
      * want to perform certain logic before and after grid startup and stopping

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..a202e9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3965,6 +3965,38 @@ public class GridFunc {
     }
 
     /**
+     * Creates map with given values.
+     *
+     * @param k1 Key 1.
+     * @param v1 Value 1.
+     * @param k2 Key 2.
+     * @param v2 Value 2.
+     * @param k3 Key 3.
+     * @param v3 Value 3.
+     * @param k4 Key 4.
+     * @param v4 Value 4.
+     * @param k5 Key 5.
+     * @param v5 Value 5.
+     * @param k6 Key 6.
+     * @param v6 Value 6.
+     * @param <K> Key's type.
+     * @param <V> Value's type.
+     * @return Created map.
+     */
+    public static <K, V> Map<K, V> asMap(K k1, V v1, K k2, V v2, K k3, V v3,
K k4, V v4, K k5, V v5, K k6, V v6) {
+        Map<K, V> map = new GridLeanMap<>(5);
+
+        map.put(k1, v1);
+        map.put(k2, v2);
+        map.put(k3, v3);
+        map.put(k4, v4);
+        map.put(k5, v5);
+        map.put(k6, v6);
+
+        return map;
+    }
+
+    /**
      * Convenience method to convert multiple elements into array.
      *
      * @param t Elements to convert into array.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index d7eb2f3..a4435f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -42,7 +42,10 @@ public enum GridNioSessionMetaKey {
     MARSHALLER_ID,
 
     /** Message writer. */
-    MSG_WRITER;
+    MSG_WRITER,
+
+    /** Ssl engine. */
+    SSL_ENGINE;
 
     /** Maximum count of NIO session keys in system. */
     public static final int MAX_KEYS_CNT = 64;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
new file mode 100644
index 0000000..fd4dc43
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio.ssl;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.net.ssl.*;
+import javax.net.ssl.SSLEngineResult.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.*;
+
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
+import static javax.net.ssl.SSLEngineResult.Status.*;
+import static org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.*;
+
+/**
+ *
+ */
+public class BlockingSslHandler {
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** */
+    private SocketChannel ch;
+
+    /** */
+    private GridFutureAdapter<ByteBuffer> fut;
+
+    /** SSL engine. */
+    private SSLEngine sslEngine;
+
+    /** Handshake completion flag. */
+    private boolean handshakeFinished;
+
+    /** Engine handshake status. */
+    private HandshakeStatus handshakeStatus;
+
+    /** Output buffer into which encrypted data will be written. */
+    private ByteBuffer outNetBuf;
+
+    /** Input buffer from which SSL engine will decrypt data. */
+    private ByteBuffer inNetBuf;
+
+    /** Empty buffer used in handshake procedure.  */
+    private ByteBuffer handshakeBuf = ByteBuffer.allocate(0);
+
+    /** Application buffer. */
+    private ByteBuffer appBuf;
+
+    /**
+     * @param sslEngine SSLEngine.
+     * @param ch Socket channel.
+     * @param fut Future.
+     * @param log Logger.
+     */
+    public BlockingSslHandler(SSLEngine sslEngine, SocketChannel ch, GridFutureAdapter<ByteBuffer>
fut,
+        IgniteLogger log) throws SSLException {
+        this.ch = ch;
+        this.fut = fut;
+        this.log = log;
+
+        this.sslEngine = sslEngine;
+
+        // Allocate a little bit more so SSL engine would not return buffer overflow status.
+        int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
+
+        outNetBuf = ByteBuffer.allocate(netBufSize);
+        inNetBuf = ByteBuffer.allocate(netBufSize);
+
+        // Initially buffer is empty.
+        outNetBuf.position(0);
+        outNetBuf.limit(0);
+
+        appBuf = allocateAppBuff();
+
+        handshakeStatus = sslEngine.getHandshakeStatus();
+
+        sslEngine.setUseClientMode(true);
+
+        if (log.isDebugEnabled())
+            log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" +
appBuf.capacity() + ']');
+    }
+
+    /**
+     * Performs handshake procedure with remote peer.
+     *
+     * @throws GridNioException If filter processing has thrown an exception.
+     * @throws SSLException If failed to process SSL data.
+     */
+    public boolean handshake() throws IgniteCheckedException, SSLException {
+        if (log.isDebugEnabled())
+            log.debug("Entered handshake(): [handshakeStatus=" + handshakeStatus + ']');
+
+        sslEngine.beginHandshake();
+
+        handshakeStatus = sslEngine.getHandshakeStatus();
+
+        boolean loop = true;
+
+        while (loop) {
+            switch (handshakeStatus) {
+                case NOT_HANDSHAKING:
+                case FINISHED: {
+                    handshakeFinished = true;
+
+                    if (fut != null) {
+                        appBuf.flip();
+
+                        fut.onDone(appBuf);
+                    }
+
+                    loop = false;
+
+                    break;
+                }
+
+                case NEED_TASK: {
+                    handshakeStatus = runTasks();
+
+                    break;
+                }
+
+                case NEED_UNWRAP: {
+                    Status status = unwrapHandshake();
+
+                    handshakeStatus = sslEngine.getHandshakeStatus();
+
+                    if (status == BUFFER_UNDERFLOW && sslEngine.isInboundDone())
+                        // Either there is no enough data in buffer or session was closed.
+                        loop = false;
+
+                    break;
+                }
+
+                case NEED_WRAP: {
+                    // If the output buffer has remaining data, clear it.
+                    if (outNetBuf.hasRemaining())
+                        U.warn(log, "Output net buffer has unsent bytes during handshake
(will clear). ");
+
+                    outNetBuf.clear();
+
+                    SSLEngineResult res = sslEngine.wrap(handshakeBuf, outNetBuf);
+
+                    outNetBuf.flip();
+
+                    handshakeStatus = res.getHandshakeStatus();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Wrapped handshake data [status=" + res.getStatus() + ",
handshakeStatus=" +
+                        handshakeStatus + ']');
+
+                    writeNetBuffer();
+
+                    break;
+                }
+
+                default: {
+                    throw new IllegalStateException("Invalid handshake status in handshake
method [handshakeStatus=" +
+                        handshakeStatus + ']');
+                }
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Leaved handshake(): [handshakeStatus=" + handshakeStatus + ']');
+
+        return handshakeFinished;
+    }
+
+    /**
+     * Encrypts data to be written to the network.
+     *
+     * @param src data to encrypt.
+     * @throws SSLException on errors.
+     * @return Output buffer with encrypted data.
+     */
+    public ByteBuffer encrypt(ByteBuffer src) throws SSLException {
+        assert handshakeFinished;
+
+        // The data buffer is (must be) empty, we can reuse the entire
+        // buffer.
+        outNetBuf.clear();
+
+        // Loop until there is no more data in src
+        while (src.hasRemaining()) {
+            int outNetRemaining = outNetBuf.capacity() - outNetBuf.position();
+
+            if (outNetRemaining < src.remaining() * 2) {
+                outNetBuf = expandBuffer(outNetBuf, Math.max(
+                    outNetBuf.position() + src.remaining() * 2, outNetBuf.capacity() * 2));
+
+                if (log.isDebugEnabled())
+                    log.debug("Expanded output net buffer: " + outNetBuf.capacity());
+            }
+
+            SSLEngineResult res = sslEngine.wrap(src, outNetBuf);
+
+            if (log.isDebugEnabled())
+                log.debug("Encrypted data [status=" + res.getStatus() + ", handshakeStaus="
+
+                    res.getHandshakeStatus() + ']');
+
+            if (res.getStatus() == OK) {
+                if (res.getHandshakeStatus() == NEED_TASK)
+                    runTasks();
+            }
+            else
+                throw new SSLException("Failed to encrypt data (SSL engine error) [status="
+ res.getStatus() +
+                    ", handshakeStatus=" + res.getHandshakeStatus() + ']');
+        }
+
+        outNetBuf.flip();
+
+        return outNetBuf;
+    }
+
+    /**
+     * Called by SSL filter when new message was received.
+     *
+     * @param buf Received message.
+     * @throws GridNioException If exception occurred while forwarding events to underlying
filter.
+     * @throws SSLException If failed to process SSL data.
+     */
+    public ByteBuffer decode(ByteBuffer buf) throws IgniteCheckedException, SSLException
{
+        inNetBuf.clear();
+
+        if (buf.limit() > inNetBuf.remaining()) {
+            inNetBuf = expandBuffer(inNetBuf, inNetBuf.capacity() + buf.limit() * 2);
+
+            appBuf = expandBuffer(appBuf, inNetBuf.capacity() * 2);
+
+            if (log.isDebugEnabled())
+                log.debug("Expanded buffers [inNetBufCapacity=" + inNetBuf.capacity() + ",
appBufCapacity=" +
+                    appBuf.capacity() + ']');
+        }
+
+        // append buf to inNetBuffer
+        inNetBuf.put(buf);
+
+        if (!handshakeFinished)
+            handshake();
+        else
+            unwrapData();
+
+        if (isInboundDone()) {
+            int newPosition = buf.position() - inNetBuf.position();
+
+            if (newPosition >= 0) {
+                buf.position(newPosition);
+
+                // If we received close_notify but not all bytes has been read by SSL engine,
print a warning.
+                if (buf.hasRemaining())
+                    U.warn(log, "Got unread bytes after receiving close_notify message (will
ignore).");
+            }
+
+            inNetBuf.clear();
+        }
+
+        appBuf.flip();
+
+        return appBuf;
+    }
+
+    /**
+     * @return {@code True} if inbound data stream has ended, i.e. SSL engine received
+     * <tt>close_notify</tt> message.
+     */
+    boolean isInboundDone() {
+        return sslEngine.isInboundDone();
+    }
+
+    /**
+     * Unwraps user data to the application buffer.
+     *
+     * @throws SSLException If failed to process SSL data.
+     * @throws GridNioException If failed to pass events to the next filter.
+     */
+    private void unwrapData() throws IgniteCheckedException, SSLException {
+        if (log.isDebugEnabled())
+            log.debug("Unwrapping received data.");
+
+        // Flip buffer so we can read it.
+        inNetBuf.flip();
+
+        SSLEngineResult res = unwrap0();
+
+        // prepare to be written again
+        inNetBuf.compact();
+
+        checkStatus(res);
+
+        renegotiateIfNeeded(res);
+    }
+
+    /**
+     * Runs all tasks needed to continue SSL work.
+     *
+     * @return Handshake status after running all tasks.
+     */
+    private HandshakeStatus runTasks() {
+        Runnable runnable;
+
+        while ((runnable = sslEngine.getDelegatedTask()) != null) {
+            if (log.isDebugEnabled())
+                log.debug("Running SSL engine task: " + runnable + '.');
+
+            runnable.run();
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Finished running SSL engine tasks. HandshakeStatus: " + sslEngine.getHandshakeStatus());
+
+        return sslEngine.getHandshakeStatus();
+    }
+
+
+    /**
+     * Unwraps handshake data and processes it.
+     *
+     * @return Status.
+     * @throws SSLException If SSL exception occurred while unwrapping.
+     * @throws GridNioException If failed to pass event to the next filter.
+     */
+    private Status unwrapHandshake() throws SSLException, IgniteCheckedException {
+        // Flip input buffer so we can read the collected data.
+        readFromNet();
+
+        inNetBuf.flip();
+
+        SSLEngineResult res = unwrap0();
+        handshakeStatus = res.getHandshakeStatus();
+
+        checkStatus(res);
+
+        // If handshake finished, no data was produced, and the status is still ok,
+        // try to unwrap more
+        if (handshakeStatus == FINISHED && res.getStatus() == OK && inNetBuf.hasRemaining())
{
+            res = unwrap0();
+
+            handshakeStatus = res.getHandshakeStatus();
+
+            // prepare to be written again
+            inNetBuf.compact();
+
+            renegotiateIfNeeded(res);
+        }
+        else
+            // prepare to be written again
+            inNetBuf.compact();
+
+        return res.getStatus();
+    }
+
+    /**
+     * Performs raw unwrap from network read buffer.
+     *
+     * @return Result.
+     * @throws SSLException If SSL exception occurs.
+     */
+    private SSLEngineResult unwrap0() throws SSLException {
+        SSLEngineResult res;
+
+        do {
+            res = sslEngine.unwrap(inNetBuf, appBuf);
+
+            if (log.isDebugEnabled())
+                log.debug("Unwrapped raw data [status=" + res.getStatus() + ", handshakeStatus="
+
+                    res.getHandshakeStatus() + ']');
+
+            if (res.getStatus() == Status.BUFFER_OVERFLOW)
+                appBuf = expandBuffer(appBuf, appBuf.capacity() * 2);
+        }
+        while ((res.getStatus() == OK || res.getStatus() == Status.BUFFER_OVERFLOW) &&
+            (handshakeFinished && res.getHandshakeStatus() == NOT_HANDSHAKING
+                || res.getHandshakeStatus() == NEED_UNWRAP));
+
+        return res;
+    }
+
+    /**
+     * @param res SSL engine result.
+     * @throws SSLException If status is not acceptable.
+     */
+    private void checkStatus(SSLEngineResult res)
+        throws SSLException {
+
+        Status status = res.getStatus();
+
+        if (status != OK && status != CLOSED && status != BUFFER_UNDERFLOW)
+            throw new SSLException("Failed to unwrap incoming data (SSL engine error). Status:
" + status);
+    }
+
+    /**
+     * Check status and retry the negotiation process if needed.
+     *
+     * @param res Result.
+     * @throws GridNioException If exception occurred during handshake.
+     * @throws SSLException If failed to process SSL data
+     */
+    private void renegotiateIfNeeded(SSLEngineResult res) throws IgniteCheckedException,
SSLException {
+        if (res.getStatus() != CLOSED && res.getStatus() != BUFFER_UNDERFLOW
+            && res.getHandshakeStatus() != NOT_HANDSHAKING) {
+            // Renegotiation required.
+            handshakeStatus = res.getHandshakeStatus();
+
+            if (log.isDebugEnabled())
+                log.debug("Renegotiation requested [status=" + res.getStatus() + ", handshakeStatus
= " +
+                    handshakeStatus + ']');
+
+            handshakeFinished = false;
+
+            handshake();
+        }
+    }
+
+    /**
+     * Allocate application buffer.
+     */
+    private ByteBuffer allocateAppBuff() {
+        int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;
+
+        int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50,
netBufSize * 2);
+
+        return ByteBuffer.allocate(appBufSize);
+    }
+
+    /**
+     * Read data from net buffer.
+     */
+    private void readFromNet() {
+        try {
+            inNetBuf.clear();
+
+            ch.read(inNetBuf);
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Copies data from out net buffer and passes it to the underlying chain.
+     *
+     * @return Nothing.
+     * @throws GridNioException If send failed.
+     */
+    private void writeNetBuffer() throws IgniteCheckedException {
+        try {
+            ch.write(outNetBuf);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to write byte to socket.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index be8a4e8..a05135f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -393,6 +393,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     public static ByteBuffer expandBuffer(ByteBuffer original, int cap) {
         ByteBuffer res = ByteBuffer.allocate(cap);
 
+        res.order(ByteOrder.nativeOrder());
+
         original.flip();
 
         res.put(original);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index ac22d74..dc3d870 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.*;
 import static javax.net.ssl.SSLEngineResult.*;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
 import static javax.net.ssl.SSLEngineResult.Status.*;
+import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
 import static org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter.*;
 
 /**
@@ -96,7 +97,14 @@ class GridNioSslHandler extends ReentrantLock {
 
         sslEngine = engine;
 
-        sslEngine.beginHandshake();
+        if (ses.meta(SSL_ENGINE.ordinal()) == null)
+            sslEngine.beginHandshake();
+        else {
+            sslEngine = ses.meta(SSL_ENGINE.ordinal());
+
+            handshakeFinished = true;
+            initHandshakeComplete = true;
+        }
 
         handshakeStatus = sslEngine.getHandshakeStatus();
 
@@ -114,6 +122,8 @@ class GridNioSslHandler extends ReentrantLock {
 
         appBuf = ByteBuffer.allocate(appBufSize);
 
+        appBuf.order(ByteOrder.nativeOrder());
+
         if (log.isDebugEnabled())
             log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" +
appBufSize + ']');
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4ca2995..52eeb65 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.client.ssl.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
@@ -29,6 +30,7 @@ import org.apache.ignite.internal.util.ipc.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.nio.ssl.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
@@ -41,6 +43,7 @@ import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import javax.net.ssl.*;
 import java.io.*;
 import java.net.*;
 import java.nio.*;
@@ -146,6 +149,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory
segment " +
         "(switching to TCP, may be slower).";
 
+    /** Node attribute that is set if using SSL (value is <tt>comm.tcp.ssl</tt>).
*/
+    public static final String ATTR_SSL = "comm.tcp.ssl";
+
     /** Node attribute that is mapped to node IP addresses (value is <tt>comm.tcp.addrs</tt>).
*/
     public static final String ATTR_ADDRS = "comm.tcp.addrs";
 
@@ -747,6 +753,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     };
 
     /**
+     * @return {@code True} if ssl enabled.
+     */
+    private boolean isSslEnabled() {
+        return ignite.configuration().getSslContextFactory() != null;
+    }
+
+    /**
      * Sets address resolver.
      *
      * @param addrRslvr Address resolver.
@@ -1298,7 +1311,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(),
                 createSpiAttributeName(ATTR_PORT), boundTcpPort,
                 createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort
: null,
-                createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs);
+                createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs,
+                createSpiAttributeName(ATTR_SSL), isSslEnabled());
         }
         catch (IOException | IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to resolve local host to addresses: " +
locHost, e);
@@ -1465,6 +1479,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     } :
                     null;
 
+                GridNioFilter[] filters;
+
+                if (isSslEnabled()) {
+                    GridNioSslFilter sslFilter =
+                        new GridNioSslFilter(ignite.configuration().getSslContextFactory().createSslContext(),
log);
+
+                    sslFilter.directMode(true);
+
+                    filters = new GridNioFilter[] {
+                        new GridNioCodecFilter(parser, log, true),
+                        new GridConnectionBytesVerifyFilter(log),
+                        sslFilter
+                    };
+                }
+                else
+                    filters = new GridNioFilter[] {
+                        new GridNioCodecFilter(parser, log, true),
+                        new GridConnectionBytesVerifyFilter(log)
+                    };
+
                 GridNioServer<Message> srvr =
                     GridNioServer.<Message>builder()
                         .address(locHost)
@@ -1482,8 +1516,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .directMode(true)
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
-                        .filters(new GridNioCodecFilter(parser, log, true),
-                            new GridConnectionBytesVerifyFilter(log))
+                        .filters(filters)
                         .messageFormatter(msgFormatter)
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .messageQueueSizeListener(queueSizeMonitor)
@@ -1510,6 +1543,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 onException("Failed to bind to local port (will try next port within range)
[port=" + port +
                     ", locHost=" + locHost + ']', e);
             }
+            catch (SSLException e) {
+                throw new IgniteCheckedException("Failed to create SSL context. SSL factory:
"
+                    + ignite.configuration().getSslContextFactory() + '.', e);
+            }
         }
 
         // If free port wasn't found.
@@ -1872,7 +1909,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), connTimeout0);
+                safeHandshake(client, null, node.id(), connTimeout0, null);
             }
             catch (HandshakeTimeoutException e) {
                 if (log.isDebugEnabled())
@@ -2019,10 +2056,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     long rcvCnt = -1;
 
+                    GridTuple<SSLEngine> ssl = new GridTuple<>();
+
                     try {
                         ch.socket().connect(addr, (int)connTimeout);
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0);
+                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0,
ssl);
 
                         if (rcvCnt == -1)
                             return null;
@@ -2037,6 +2076,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         meta.put(NODE_ID_META, node.id());
 
+                        if (isSslEnabled()) {
+                            assert ssl != null;
+                            assert ssl.get() != null;
+
+                            meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ssl.get());
+                        }
+
                         if (recoveryDesc != null) {
                             recoveryDesc.onHandshake(rcvCnt);
 
@@ -2161,6 +2207,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param recovery Recovery descriptor if use recovery handshake, otherwise {@code null}.
      * @param rmtNodeId Remote node.
      * @param timeout Timeout for handshake.
+     * @param ssl SSL engine.
      * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout.
      * @return Handshake response.
      */
@@ -2169,7 +2216,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         T client,
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
-        long timeout
+        long timeout,
+        @Nullable GridTuple<SSLEngine> ssl
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client,
U.currentTimeMillis() + timeout);
 
@@ -2186,15 +2234,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 boolean success = false;
 
                 try {
-                    ByteBuffer buf = ByteBuffer.allocate(17);
+                    BlockingSslHandler sslHnd = null;
+
+                    ByteBuffer buf;
+
+                    if (isSslEnabled()) {
+                        GridFutureAdapter<ByteBuffer> handFut = new GridFutureAdapter<>();
+
+                        SSLEngine sslEngine = ignite.configuration().getSslContextFactory()
+                            .createSslContext().createSSLEngine();
+
+                        sslEngine.setUseClientMode(true);
+
+                        sslHnd = new BlockingSslHandler(sslEngine, ch, handFut, log);
+
+                        if (!sslHnd.handshake())
+                            throw new IgniteCheckedException("SSL handshake isn't completed.");
+
+                        ssl.set(sslEngine);
+
+                        ByteBuffer handBuff = handFut.get();
+
+                        if (handBuff.limit() < 17) {
+                            buf = ByteBuffer.allocate(1000);
+
+                            int read = ch.read(buf);
+
+                            if (read == -1)
+                                throw new IgniteCheckedException("Failed to read remote node
ID (connection closed).");
+
+                            buf.flip();
+
+                            buf = sslHnd.decode(buf);
+                        }
+                        else
+                            buf = handBuff;
+                    }
+                    else {
+                        buf = ByteBuffer.allocate(17);
 
-                    for (int i = 0; i < 17; ) {
-                        int read = ch.read(buf);
+                        for (int i = 0; i < 17; ) {
+                            int read = ch.read(buf);
 
-                        if (read == -1)
-                            throw new IgniteCheckedException("Failed to read remote node
ID (connection closed).");
+                            if (read == -1)
+                                throw new IgniteCheckedException("Failed to read remote node
ID (connection closed).");
 
-                        i += read;
+                            i += read;
+                        }
                     }
 
                     UUID rmtNodeId0 = U.bytesToUuid(buf.array(), 1);
@@ -2205,7 +2291,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     else if (log.isDebugEnabled())
                         log.debug("Received remote node ID: " + rmtNodeId0);
 
-                    ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
+                    if (isSslEnabled() ) {
+                        assert sslHnd != null;
+
+                        ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+                    }
+                    else
+                        ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
 
                     if (recovery != null) {
                         HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(),
@@ -2225,30 +2317,62 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         buf.flip();
 
-                        ch.write(buf);
+                        if (isSslEnabled()) {
+                            assert sslHnd != null;
+
+                            ch.write(sslHnd.encrypt(buf));
+                        }
+                        else
+                            ch.write(buf);
+                    }
+                    else {
+                        if (isSslEnabled()) {
+                            assert sslHnd != null;
+
+                            ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)));
+                        }
+                        else
+                            ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
                     }
-                    else
-                        ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType));
 
                     if (recovery != null) {
                         if (log.isDebugEnabled())
                             log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
 
-                        buf = ByteBuffer.allocate(9);
+                        if (isSslEnabled()) {
+                            assert sslHnd != null;
 
-                        buf.order(ByteOrder.nativeOrder());
+                            buf = ByteBuffer.allocate(1000);
+
+                            buf.order(ByteOrder.nativeOrder());
 
-                        for (int i = 0; i < 9; ) {
                             int read = ch.read(buf);
 
                             if (read == -1)
                                 throw new IgniteCheckedException("Failed to read remote node
recovery handshake " +
                                     "(connection closed).");
 
-                            i += read;
+                            buf.flip();
+
+                            rcvCnt = sslHnd.decode(buf).getLong(1);
                         }
+                        else {
+                            buf = ByteBuffer.allocate(9);
+
+                            buf.order(ByteOrder.nativeOrder());
+
+                            for (int i = 0; i < 9; ) {
+                                int read = ch.read(buf);
+
+                                if (read == -1)
+                                    throw new IgniteCheckedException("Failed to read remote
node recovery handshake " +
+                                        "(connection closed).");
 
-                        rcvCnt = buf.getLong(1);
+                                i += read;
+                            }
+
+                            rcvCnt = buf.getLong(1);
+                        }
 
                         if (log.isDebugEnabled())
                             log.debug("Received handshake message [rmtNode=" + rmtNodeId
+ ", rcvCnt=" + rcvCnt + ']');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index bfed977..d3a2521 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.communication;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -32,6 +33,7 @@ import org.apache.ignite.testframework.junits.spi.*;
 import java.net.*;
 import java.util.*;
 import java.util.Map.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 
@@ -59,6 +61,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     /** */
     private static final Object mux = new Object();
 
+    /** */
+    protected boolean useSsl = false;
+
     /**
      *
      */
@@ -181,6 +186,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     public void testSendToManyNodes() throws Exception {
         msgDestMap.clear();
 
+        int cnt = 0;
+
         // Send message from each SPI to all SPI's, including itself.
         for (Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet())
{
             UUID sndId = entry.getKey();
@@ -299,6 +306,15 @@ public abstract class GridAbstractCommunicationSelfTest<T extends
CommunicationS
 
             rsrcs.inject(spi);
 
+            if (useSsl) {
+                IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class,
"ignite");
+
+                IgniteConfiguration cfg = ignite.configuration()
+                    .setSslContextFactory(GridTestUtils.sslContextFactory());
+
+                ignite.setStaticCfg(cfg);
+            }
+
             spi.setListener(new MessageListener(rsrcs.getNodeId()));
 
             node.setAttributes(spi.getNodeAttributes());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java
new file mode 100644
index 0000000..e5f8bb3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiSslSelfTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.testframework.junits.spi.*;
+
+/**
+ *
+ */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
+public class GridTcpCommunicationSpiSslSelfTest extends GridTcpCommunicationSpiAbstractTest
{
+    /** */
+    public GridTcpCommunicationSpiSslSelfTest() {
+        super(false);
+
+        this.useSsl = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean tcpNoDelay() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/730b1046/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 2451f59..1471faa 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -52,6 +52,9 @@ public class IgniteMock implements Ignite {
     /** */
     private final String home;
 
+    /** */
+    private IgniteConfiguration staticCfg;
+
     /**
      * Mock values
      *
@@ -84,6 +87,9 @@ public class IgniteMock implements Ignite {
 
     /** {@inheritDoc} */
     @Override public IgniteConfiguration configuration() {
+        if (staticCfg != null)
+            return staticCfg;
+
         IgniteConfiguration cfg = new IgniteConfiguration();
 
         cfg.setMarshaller(marshaller);
@@ -298,4 +304,11 @@ public class IgniteMock implements Ignite {
     @Override public <K> Affinity<K> affinity(String cacheName) {
         return null;
     }
+
+    /**
+     * @param staticCfg Configuration.
+     */
+    public void setStaticCfg(IgniteConfiguration staticCfg) {
+        this.staticCfg = staticCfg;
+    }
 }



Mime
View raw message