Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 212532004A1 for ; Thu, 24 Aug 2017 14:17:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 15AD716ABAE; Thu, 24 Aug 2017 12:17:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1858016ABA3 for ; Thu, 24 Aug 2017 14:17:17 +0200 (CEST) Received: (qmail 58830 invoked by uid 500); 24 Aug 2017 12:17:16 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 58821 invoked by uid 99); 24 Aug 2017 12:17:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Aug 2017 12:17:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5926AE0395; Thu, 24 Aug 2017 12:17:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-4003 Changes in GridNioServer for async connect Date: Thu, 24 Aug 2017 12:17:14 +0000 (UTC) archived-at: Thu, 24 Aug 2017 12:17:20 -0000 Repository: ignite Updated Branches: refs/heads/ignite-4003-1 [created] eaf16c99e ignite-4003 Changes in GridNioServer for async connect Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eaf16c99 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eaf16c99 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eaf16c99 Branch: refs/heads/ignite-4003-1 Commit: eaf16c99ef88635e2b950ef5964d9a0bd1299dd1 Parents: e1bf8d7 Author: sboikov Authored: Thu Aug 24 15:17:04 2017 +0300 Committer: sboikov Committed: Thu Aug 24 15:17:04 2017 +0300 ---------------------------------------------------------------------- .../GridClientConnectionManagerAdapter.java | 1 - .../connection/GridClientNioTcpConnection.java | 2 +- .../internal/util/GridSpinReadWriteLock.java | 2 +- .../internal/util/nio/GridNioKeyAttachment.java | 32 +++ .../util/nio/GridNioRecoveryDescriptor.java | 3 +- .../ignite/internal/util/nio/GridNioServer.java | 248 +++++++++++++++---- .../util/nio/GridSelectorNioSessionImpl.java | 28 +-- .../internal/util/nio/ssl/GridNioSslFilter.java | 12 +- .../communication/tcp/TcpCommunicationSpi.java | 4 +- .../IgniteCacheMessageWriteTimeoutTest.java | 4 +- .../internal/util/nio/GridNioSelfTest.java | 2 +- .../spi/GridTcpSpiForwardingSelfTest.java | 1 + .../GridAbstractCommunicationSelfTest.java | 27 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 28 ++- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 39 ++- ...GridTcpCommunicationSpiRecoverySelfTest.java | 47 +++- ...CommunicationRecoveryAckClosureSelfTest.java | 36 ++- .../tcp/TcpCommunicationSpiDropNodesTest.java | 3 +- .../HadoopExternalCommunication.java | 5 +- 19 files changed, 427 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java index e325897..aa06322 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java @@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog); sslFilter.directMode(false); - sslFilter.clientMode(true); filters = new GridNioFilter[]{codecFilter, sslFilter}; } http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java index 18ce6e4..f72a009 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java @@ -235,7 +235,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut); } - ses = (GridNioSession)srv.createSession(ch, meta).get(); + ses = (GridNioSession)srv.createSession(ch, meta, false, null).get(); if (sslHandshakeFut != null) sslHandshakeFut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java index 4f23979..8fef887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinReadWriteLock.java @@ -70,7 +70,7 @@ public class GridSpinReadWriteLock { private int writeLockEntryCnt; /** - * Acquires write lock. + * Acquires read lock. */ @SuppressWarnings("BusyWait") public void readLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java new file mode 100644 index 0000000..c3f6bb2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioKeyAttachment.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.jetbrains.annotations.Nullable; + +/** + * + */ +interface GridNioKeyAttachment { + boolean hasSession(); + + /** + * @return Session if it was created. + */ + @Nullable GridSelectorNioSessionImpl session(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 7496728..af7b757 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -31,7 +31,6 @@ import org.jetbrains.annotations.Nullable; /** * Recovery information for single node. */ -@Deprecated // To be splitted into separate classes for in/out data when do not need maintain backward compatibility. public class GridNioRecoveryDescriptor { /** Number of acknowledged messages. */ private long acked; @@ -260,7 +259,7 @@ public class GridNioRecoveryDescriptor { /** * @param node Node. - * @return {@code True} if node is not null and has the same order as initial remtoe node. + * @return {@code True} if node is not null and has the same order as initial remote node. */ public boolean nodeAlive(@Nullable ClusterNode node) { return node != null && node.order() == this.node.order(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index a04a735..0dd7dd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -113,6 +113,12 @@ public class GridNioServer { /** SSL write buf limit. */ private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey(); + /** Session future meta key. */ + public static final int RECOVERY_DESC_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Selection key meta key. */ + private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** */ private static final boolean DISABLE_KEYSET_OPTIMIZATION = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS); @@ -465,7 +471,7 @@ public class GridNioServer { * @return Future for operation. */ public GridNioFuture close(GridNioSession ses) { - assert ses instanceof GridSelectorNioSessionImpl; + assert ses instanceof GridSelectorNioSessionImpl : ses; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; @@ -811,17 +817,32 @@ public class GridNioServer { * * @param ch Channel to register within the server and create session for. * @param meta Optional meta for new session. + * @param async Async connection. + * @param lsnr Listener that should be invoked in NIO thread. * @return Future to get session. */ - public GridNioFuture createSession(final SocketChannel ch, - @Nullable Map meta) { + public GridNioFuture createSession( + final SocketChannel ch, + @Nullable Map meta, + boolean async, + @Nullable IgniteInClosure> lsnr + ) { try { if (!closed) { ch.configureBlocking(false); NioOperationFuture req = new NioOperationFuture<>(ch, false, meta); - offerBalanced(req); + if (async) { + assert meta != null; + + req.op = NioOperation.CONNECT; + } + + if (lsnr != null) + req.listen(lsnr); + + offerBalanced(req, meta); return req; } @@ -835,6 +856,29 @@ public class GridNioServer { } /** + * @param ch Channel. + * @param meta Session meta. + */ + public GridNioFuture cancelConnect(final SocketChannel ch, Map meta) { + if (!closed) { + NioOperationFuture req = new NioOperationFuture<>(ch, false, meta); + + req.op = NioOperation.CANCEL_CONNECT; + + Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY); + + assert idx != null : meta; + + clientWorkers.get(idx).offer(req); + + return req; + } + else + return new GridNioFinishedFuture<>( + new IgniteCheckedException("Failed to cancel connection, server is stopped.")); + } + + /** * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}. * * @return Write timeout in milliseconds. @@ -920,9 +964,10 @@ public class GridNioServer { /** * @param req Request to balance. + * @param meta Session metadata. */ - private synchronized void offerBalanced(NioOperationFuture req) { - assert req.operation() == NioOperation.REGISTER : req; + private synchronized void offerBalanced(NioOperationFuture req, @Nullable Map meta) { + assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT : req; assert req.socketChannel() != null : req; int workers = clientWorkers.size(); @@ -960,6 +1005,9 @@ public class GridNioServer { else balanceIdx = 0; + if (meta != null) + meta.put(WORKER_IDX_META_KEY, balanceIdx); + clientWorkers.get(balanceIdx).offer(req); } @@ -1788,6 +1836,38 @@ public class GridNioServer { while ((req0 = changeReqs.poll()) != null) { switch (req0.operation()) { + case CONNECT: { + NioOperationFuture fut = (NioOperationFuture)req0; + + SocketChannel ch = fut.socketChannel(); + + try { + ch.register(selector, SelectionKey.OP_CONNECT, fut); + } + catch (IOException e) { + fut.onDone(new IgniteCheckedException("Failed to register channel on selector", e)); + } + + break; + } + + case CANCEL_CONNECT: { + NioOperationFuture req = (NioOperationFuture)req0; + + SocketChannel ch = req.socketChannel(); + + SelectionKey key = ch.keyFor(selector); + + if (key != null) + key.cancel(); + + U.closeQuiet(ch); + + req.onDone(); + + break; + } + case REGISTER: { register((NioOperationFuture)req0); @@ -1998,8 +2078,12 @@ public class GridNioServer { log.debug("Closing all connected client sockets."); // Close all channels registered with selector. - for (SelectionKey key : selector.keys()) - close((GridSelectorNioSessionImpl)key.attachment(), null); + for (SelectionKey key : selector.keys()) { + GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); + + if (attach != null && attach.hasSession()) + close(attach.session(), null); + } if (log.isDebugEnabled()) log.debug("Closing NIO selector."); @@ -2173,11 +2257,17 @@ public class GridNioServer { if (!key.isValid()) continue; - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); - assert ses != null; + assert attach != null; try { + if (!attach.hasSession() && key.isConnectable()) { + processConnect(key); + + continue; + } + if (key.isReadable()) processRead(key); @@ -2196,7 +2286,12 @@ public class GridNioServer { // No-op. } - U.warn(log, "Failed to process selector key (will close): " + ses, e); + GridSelectorNioSessionImpl ses = attach.session(); + + if (!closed) + U.error(log, "Failed to process selector key [ses=" + ses + ']', e); + else if (log.isDebugEnabled()) + log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']'); close(ses, new GridNioException(e)); } @@ -2225,11 +2320,17 @@ public class GridNioServer { if (!key.isValid()) continue; - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); - assert ses != null; + assert attach != null; try { + if (!attach.hasSession() && key.isConnectable()) { + processConnect(key); + + continue; + } + if (key.isReadable()) processRead(key); @@ -2248,10 +2349,12 @@ public class GridNioServer { // No-op. } - if (!closed) - U.warn(log, "Failed to process selector key (will close): " + ses, e); + GridSelectorNioSessionImpl ses = attach.session(); - close(ses, new GridNioException(e)); + if (!closed) + U.error(log, "Failed to process selector key [ses=" + ses + ']', e); + else if (log.isDebugEnabled()) + log.debug("Failed to process selector key [ses=" + ses + ", err=" + e + ']'); } } } @@ -2265,7 +2368,12 @@ public class GridNioServer { long now = U.currentTimeMillis(); for (SelectionKey key : keys) { - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); + + if (attach == null || !attach.hasSession()) + continue; + + GridSelectorNioSessionImpl ses = attach.session(); try { long writeTimeout0 = writeTimeout; @@ -2303,12 +2411,12 @@ public class GridNioServer { /** * Registers given socket channel to the selector, creates a session and notifies the listener. * - * @param req Registration request. + * @param fut Registration future. */ - private void register(NioOperationFuture req) { - assert req != null; + private void register(NioOperationFuture fut) { + assert fut != null; - SocketChannel sockCh = req.socketChannel(); + SocketChannel sockCh = fut.socketChannel(); assert sockCh != null; @@ -2334,24 +2442,53 @@ public class GridNioServer { filterChain, (InetSocketAddress)sockCh.getLocalAddress(), (InetSocketAddress)sockCh.getRemoteAddress(), - req.accepted(), + fut.accepted(), sndQueueLimit, writeBuf, readBuf); - Map meta = req.meta(); + Map meta = fut.meta(); if (meta != null) { for (Entry e : meta.entrySet()) ses.addMeta(e.getKey(), e.getValue()); + + if (!ses.accepted()) { + GridNioRecoveryDescriptor desc = + (GridNioRecoveryDescriptor)meta.get(RECOVERY_DESC_META_KEY); + + if (desc != null) { + ses.outRecoveryDescriptor(desc); + + if (!desc.pairedConnections()) + ses.inRecoveryDescriptor(desc); + } + } } - SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses); + SelectionKey key; - ses.key(key); + if (!sockCh.isRegistered()) { + assert fut.op == NioOperation.REGISTER : fut.op; + + key = sockCh.register(selector, SelectionKey.OP_READ, ses); + + ses.key(key); - if (!ses.accepted()) resend(ses); + } + else { + assert fut.op == NioOperation.CONNECT : fut.op; + + key = sockCh.keyFor(selector); + + key.attach(ses); + + key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT)); + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + + ses.key(key); + } sessions.add(ses); workerSessions.add(ses); @@ -2359,12 +2496,12 @@ public class GridNioServer { try { filterChain.onSessionOpened(ses); - req.onDone(ses); + fut.onDone(ses); } catch (IgniteCheckedException e) { close(ses, e); - req.onDone(e); + fut.onDone(e); } if (closed) @@ -2486,6 +2623,31 @@ public class GridNioServer { } /** + * @param key Key. + * @throws IOException If failed. + */ + @SuppressWarnings("unchecked") + private void processConnect(SelectionKey key) throws IOException { + SocketChannel ch = (SocketChannel)key.channel(); + + NioOperationFuture sesFut = (NioOperationFuture)key.attachment(); + + assert sesFut != null; + + try { + if (ch.finishConnect()) + register(sesFut); + } + catch (IOException e) { + U.closeQuiet(ch); + + sesFut.onDone(new GridNioException("Failed to connect to node", e)); + + throw e; + } + } + + /** * Processes read-available event on the key. * * @param key Key that is ready to be read. @@ -2690,7 +2852,7 @@ public class GridNioServer { if (log.isDebugEnabled()) log.debug("Accepted new client connection: " + sockCh.socket().getRemoteSocketAddress()); - addRegistrationReq(sockCh); + addRegistrationRequest(sockCh); } } } @@ -2701,15 +2863,21 @@ public class GridNioServer { * * @param sockCh Socket channel to be registered on one of the selectors. */ - private void addRegistrationReq(SocketChannel sockCh) { - offerBalanced(new NioOperationFuture(sockCh)); + private void addRegistrationRequest(SocketChannel sockCh) { + offerBalanced(new NioOperationFuture<>(sockCh, true, null), null); } } /** * Asynchronous operation that may be requested on selector. */ - enum NioOperation { + private enum NioOperation { + /** Register connect key selection. */ + CONNECT, + + /** Cancel connect. */ + CANCEL_CONNECT, + /** Register read key selection. */ REGISTER, @@ -2914,7 +3082,7 @@ public class GridNioServer { * Class for requesting write and session close operations. */ private static class NioOperationFuture extends GridNioFutureImpl implements SessionWriteRequest, - SessionChangeRequest { + SessionChangeRequest, GridNioKeyAttachment { /** Socket channel in register request. */ @GridToStringExclude private SocketChannel sockCh; @@ -2942,15 +3110,6 @@ public class GridNioServer { private boolean skipRecovery; /** - * Creates registration request for a given socket channel. - * - * @param sockCh Socket channel to register on selector. - */ - NioOperationFuture(SocketChannel sockCh) { - this(sockCh, true, null); - } - - /** * @param sockCh Socket channel. * @param accepted {@code True} if socket has been accepted. * @param meta Optional meta. @@ -3038,6 +3197,11 @@ public class GridNioServer { } /** {@inheritDoc} */ + @Override public boolean hasSession() { + return ses != null; + } + + /** {@inheritDoc} */ public NioOperation operation() { return op; } http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 66f9176..71a3b8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -37,7 +37,7 @@ import org.jsr166.ConcurrentLinkedDeque8; * Note that this implementation requires non-null values for local and remote * socket addresses. */ -class GridSelectorNioSessionImpl extends GridNioSessionImpl { +class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKeyAttachment { /** Pending write requests. */ private final ConcurrentLinkedDeque8 queue = new ConcurrentLinkedDeque8<>(); @@ -129,6 +129,16 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { } } + /** {@inheritDoc} */ + @Override public boolean hasSession() { + return true; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridSelectorNioSessionImpl session() { + return this; + } + /** * @return Worker. */ @@ -385,22 +395,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { return inRecovery; } - /** {@inheritDoc} */ - @Override public T addMeta(int key, @Nullable T val) { - if (!accepted() && val instanceof GridNioRecoveryDescriptor) { - outRecovery = (GridNioRecoveryDescriptor)val; - - if (!outRecovery.pairedConnections()) - inRecovery = outRecovery; - - outRecovery.onConnected(); - - return null; - } - else - return super.addMeta(key, val); - } - /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java index b4bd34a..f8a0dce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java @@ -69,9 +69,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter { /** Allocate direct buffer or heap buffer. */ private boolean directBuf; - /** Whether SSLEngine should use client mode. */ - private boolean clientMode; - /** Whether direct mode is used. */ private boolean directMode; @@ -93,13 +90,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter { } /** - * @param clientMode Flag indicating whether SSLEngine should use client mode.. - */ - public void clientMode(boolean clientMode) { - this.clientMode = clientMode; - } - - /** * * @param directMode Flag indicating whether direct mode is used. */ @@ -164,6 +154,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter { if (sslMeta == null) { engine = sslCtx.createSSLEngine(); + boolean clientMode = !ses.accepted(); + engine.setUseClientMode(clientMode); if (!clientMode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index bab9cfa..0e712ae 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3162,10 +3162,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (recoveryDesc != null) { recoveryDesc.onHandshake(rcvCnt); - meta.put(-1, recoveryDesc); + meta.put(GridNioServer.RECOVERY_DESC_META_KEY, recoveryDesc); } - GridNioSession ses = nioSrvr.createSession(ch, meta).get(); + GridNioSession ses = nioSrvr.createSession(ch, meta, false, null).get(); client = new GridTcpNioCommunicationClient(connIdx, ses, log); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java index b3b5d1a..3ba319b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java @@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { // Try provoke connection close on socket writeTimeout. commSpi.setSharedMemoryPort(-1); commSpi.setMessageQueueLimit(10); - commSpi.setSocketReceiveBuffer(40); - commSpi.setSocketSendBuffer(40); + commSpi.setSocketReceiveBuffer(64); + commSpi.setSocketSendBuffer(64); commSpi.setSocketWriteTimeout(100); commSpi.setUnacknowledgedMessagesBufferSize(1000); commSpi.setConnectTimeout(10_000); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index 4dbb7ce..e623467 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { try { SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port())); - GridNioFuture fut = srvr1.createSession(ch, null); + GridNioFuture fut = srvr1.createSession(ch, null, false, null); ses = fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 1e25003..5d8e316 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.BasicAddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteCallable; http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 88276c2..e51dac8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(true); + for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi spi = getSpi(i); @@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest>> Initialized context: nodeId=" + ctx.localNode().id()); spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest spi : spis.values()) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 78bf869..ce96c55 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest nodes = new ArrayList<>(); /** */ + private static GridTimeoutProcessor timeoutProcessor; + + /** */ private static int port = 60_000; /** Use ssl. */ @@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(true); + for (int i = 0; i < SPI_CNT; i++) { CommunicationSpi spi = createSpi(); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false); + node.order(i + 1); GridSpiTestContext ctx = initSpiContext(); ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + info(">>> Initialized context: nodeId=" + ctx.localNode().id()); spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index feaae11..1467c29 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.apache.ignite.testframework.junits.spi.GridSpiTest; @@ -64,6 +66,9 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest nodes = new ArrayList<>(); /** */ + private static GridTimeoutProcessor timeoutProcessor; + + /** */ private static final int SPI_CNT = 2; /** @@ -159,6 +164,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest sessions = GridTestUtils.getFieldValue(srv, "sessions"); + final Collection sessions = GridTestUtils.getFieldValue(srv, "sessions"); - assertFalse(sessions.isEmpty()); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !sessions.isEmpty(); + } + }, 5_000); boolean found = false; @@ -282,7 +293,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(true); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); @@ -392,14 +407,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index 2a043ee..c722577 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; @@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest /** Use ssl. */ protected boolean useSsl; + /** */ + private static GridTimeoutProcessor timeoutProcessor; + /** * */ @@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest * @return Timeout. */ protected long awaitForSocketWriteTimeout() { - return 8000; + return 20_000; } /** @@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + final AtomicInteger sentCnt = new AtomicInteger(1); int errCnt = 0; @@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + expCnt1.incrementAndGet(); int errCnt = 0; @@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest ses1.resumeReads().get(); } catch (IgniteCheckedException ignore) { - // Can fail is ses1 was closed. + // Can fail if ses1 was closed. } // Wait when session is closed, then try to open new connection from node1. @@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + final AtomicInteger sentCnt = new AtomicInteger(1); int errCnt = 0; @@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest Map ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(true); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(i); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); @@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest * @throws Exception If failed. */ private void stopSpis() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index 3f58055..d1689bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.apache.ignite.testframework.junits.spi.GridSpiTest; @@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest= 1 && lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + } } expMsgs += msgPerIter; @@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(true); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); @@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java index e215a34..842f283 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java @@ -202,8 +202,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(1); grid(0).events().localListen(new IgnitePredicate() { - @Override - public boolean apply(Event event) { + @Override public boolean apply(Event evt) { latch.countDown(); return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf16c99/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java index 8a20eec..a241a04 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java @@ -1004,7 +1004,10 @@ public class HadoopExternalCommunication { HandshakeFinish fin = new HandshakeFinish(); - GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get(); + GridNioFuture sesFut = + nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin), false, null); + + GridNioSession ses = sesFut.get(); client = new HadoopTcpNioCommunicationClient(ses);