Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6DB4C200C47 for ; Thu, 30 Mar 2017 16:38:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6C2DE160B78; Thu, 30 Mar 2017 14:38:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3B547160BA4 for ; Thu, 30 Mar 2017 16:38:16 +0200 (CEST) Received: (qmail 15577 invoked by uid 500); 30 Mar 2017 14:38:15 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 15451 invoked by uid 99); 30 Mar 2017 14:38:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Mar 2017 14:38:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 34A3BE69B9; Thu, 30 Mar 2017 14:38:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agura@apache.org To: commits@ignite.apache.org Date: Thu, 30 Mar 2017 14:38:23 -0000 Message-Id: <9e10416029234feb8f46088bbc944128@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [9/9] ignite git commit: ignite-4003 Async outgoing connections for communication SPI archived-at: Thu, 30 Mar 2017 14:38:17 -0000 ignite-4003 Async outgoing connections for communication SPI Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04aca2e6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04aca2e6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04aca2e6 Branch: refs/heads/ignite-4003 Commit: 04aca2e62f6ac9c50fb0223fdd3d328a98ec81e0 Parents: 44cf1d2 Author: agura Authored: Tue Feb 7 14:45:57 2017 +0300 Committer: agura Committed: Thu Mar 30 17:31:42 2017 +0300 ---------------------------------------------------------------------- .../GridClientConnectionManagerAdapter.java | 1 - .../connection/GridClientNioTcpConnection.java | 2 +- .../managers/communication/GridIoManager.java | 4 + .../internal/util/GridSpinReadWriteLock.java | 2 +- .../util/nio/GridNioRecoveryDescriptor.java | 2 +- .../ignite/internal/util/nio/GridNioServer.java | 218 +- .../util/nio/GridSelectorNioSessionImpl.java | 2 - .../internal/util/nio/ssl/GridNioSslFilter.java | 12 +- .../communication/tcp/TcpCommunicationSpi.java | 1902 ++++++++++++------ .../IgniteCacheMessageWriteTimeoutTest.java | 4 +- .../internal/util/nio/GridNioSelfTest.java | 2 +- .../spi/GridTcpSpiForwardingSelfTest.java | 3 +- .../GridAbstractCommunicationSelfTest.java | 27 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 28 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 51 +- ...GridTcpCommunicationSpiRecoverySelfTest.java | 49 +- ...CommunicationRecoveryAckClosureSelfTest.java | 36 +- .../tcp/TcpCommunicationSpiDropNodesTest.java | 12 +- .../TcpCommunicationSpiFaultyClientTest.java | 5 +- .../HadoopExternalCommunication.java | 5 +- 20 files changed, 1659 insertions(+), 708 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java index e325897..aa06322 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java @@ -183,7 +183,6 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog); sslFilter.directMode(false); - sslFilter.clientMode(true); filters = new GridNioFilter[]{codecFilter, sslFilter}; } http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java index d3a30fb..73487db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java @@ -234,7 +234,7 @@ public class GridClientNioTcpConnection extends GridClientConnection { meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut); } - ses = (GridNioSession)srv.createSession(ch, meta).get(); + ses = (GridNioSession)srv.createSession(ch, meta, false, null).get(); if (sslHandshakeFut != null) sslHandshakeFut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 23738d7..40905f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.GridManagerAdapter; @@ -1298,6 +1299,9 @@ public class GridIoManager extends GridManagerAdapter { /** SSL write buf limit. */ private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey(); + /** Session future meta key. */ + private static final int SESSION_FUT_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Selection key meta key. */ + private static final int WORKER_IDX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + /** */ private static final boolean DISABLE_KEYSET_OPTIMIZATION = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS); @@ -462,7 +468,7 @@ public class GridNioServer { * @return Future for operation. */ public GridNioFuture close(GridNioSession ses) { - assert ses instanceof GridSelectorNioSessionImpl; + assert ses instanceof GridSelectorNioSessionImpl : ses; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; @@ -706,6 +712,7 @@ public class GridNioServer { /** * */ + @SuppressWarnings("ForLoopReplaceableByForEach") public void dumpStats() { U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() + ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']'); @@ -719,17 +726,34 @@ public class GridNioServer { * * @param ch Channel to register within the server and create session for. * @param meta Optional meta for new session. + * @param async Async connection. + * @param lsnr Listener that should be invoked in NIO thread. * @return Future to get session. */ - public GridNioFuture createSession(final SocketChannel ch, - @Nullable Map meta) { + public GridNioFuture createSession( + final SocketChannel ch, + @Nullable Map meta, + boolean async, + @Nullable IgniteInClosure> lsnr + ) { try { if (!closed) { ch.configureBlocking(false); NioOperationFuture req = new NioOperationFuture<>(ch, false, meta); - offerBalanced(req); + if (async) { + assert meta != null; + + req.op = NioOperation.CONNECT; + + meta.put(SESSION_FUT_META_KEY, req); + } + + if (lsnr != null) + req.listen(lsnr); + + offerBalanced(req, meta); return req; } @@ -743,6 +767,29 @@ public class GridNioServer { } /** + * @param ch Channel. + * @param meta Session meta. + */ + public GridNioFuture cancelConnect(final SocketChannel ch, Map meta) { + if (!closed) { + NioOperationFuture req = new NioOperationFuture<>(ch, false, meta); + + req.op = NioOperation.CANCEL_CONNECT; + + Integer idx = (Integer)meta.get(WORKER_IDX_META_KEY); + + assert idx != null : meta; + + clientWorkers.get(idx).offer(req); + + return req; + } + else + return new GridNioFinishedFuture<>( + new IgniteCheckedException("Failed to cancel connection, server is stopped.")); + } + + /** * Gets configurable write timeout for this session. If not set, default value is {@link #DFLT_SES_WRITE_TIMEOUT}. * * @return Write timeout in milliseconds. @@ -828,9 +875,11 @@ public class GridNioServer { /** * @param req Request to balance. + * @param meta Session metadata. + * @return Worker index. */ - private synchronized void offerBalanced(NioOperationFuture req) { - assert req.operation() == NioOperation.REGISTER : req; + private synchronized int offerBalanced(NioOperationFuture req, @Nullable Map meta) { + assert req.operation() == NioOperation.REGISTER || req.operation() == NioOperation.CONNECT: req; assert req.socketChannel() != null : req; int workers = clientWorkers.size(); @@ -868,7 +917,12 @@ public class GridNioServer { else balanceIdx = 0; + if (meta != null) + meta.put(WORKER_IDX_META_KEY, balanceIdx); + clientWorkers.get(balanceIdx).offer(req); + + return balanceIdx; } /** {@inheritDoc} */ @@ -1692,6 +1746,38 @@ public class GridNioServer { while ((req0 = changeReqs.poll()) != null) { switch (req0.operation()) { + case CONNECT: { + NioOperationFuture req = (NioOperationFuture)req0; + + SocketChannel ch = req.socketChannel(); + + try { + ch.register(selector, SelectionKey.OP_CONNECT, req.meta()); + } + catch (IOException e) { + req.onDone(new IgniteCheckedException("Failed to register channel on selector", e)); + } + + break; + } + + case CANCEL_CONNECT: { + NioOperationFuture req = (NioOperationFuture)req0; + + SocketChannel ch = req.socketChannel(); + + SelectionKey key = ch.keyFor(selector); + + if (key != null) + key.cancel(); + + U.closeQuiet(ch); + + req.onDone(); + + break; + } + case REGISTER: { register((NioOperationFuture)req0); @@ -1898,8 +1984,12 @@ public class GridNioServer { log.debug("Closing all connected client sockets."); // Close all channels registered with selector. - for (SelectionKey key : selector.keys()) - close((GridSelectorNioSessionImpl)key.attachment(), null); + for (SelectionKey key : selector.keys()) { + Object attach = key.attachment(); + + if (attach instanceof GridSelectorNioSessionImpl) + close((GridSelectorNioSessionImpl)attach, null); + } if (log.isDebugEnabled()) log.debug("Closing NIO selector."); @@ -2022,11 +2112,19 @@ public class GridNioServer { if (!key.isValid()) continue; - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - assert ses != null; + GridSelectorNioSessionImpl ses = null; try { + if (key.isConnectable()) { + processConnect(key); + + continue; + } + + ses = (GridSelectorNioSessionImpl)key.attachment(); + + assert ses != null; + if (key.isReadable()) processRead(key); @@ -2038,9 +2136,11 @@ public class GridNioServer { throw e; } catch (Exception e) { - U.warn(log, "Failed to process selector key (will close): " + ses, e); + if (!closed) + U.error(log, "Failed to process selector key [ses=" + ses + ']', e); - close(ses, new GridNioException(e)); + if (ses != null) + close(ses, new GridNioException(e)); } } } @@ -2067,11 +2167,19 @@ public class GridNioServer { if (!key.isValid()) continue; - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); - - assert ses != null; + GridSelectorNioSessionImpl ses = null; try { + if (key.isConnectable()) { + processConnect(key); + + continue; + } + + ses = (GridSelectorNioSessionImpl)key.attachment(); + + assert ses != null; + if (key.isReadable()) processRead(key); @@ -2084,9 +2192,10 @@ public class GridNioServer { } catch (Exception e) { if (!closed) - U.warn(log, "Failed to process selector key (will close): " + ses, e); + U.error(log, "Failed to process selector key [ses=" + ses + ']', e); - close(ses, new GridNioException(e)); + if (ses != null) + close(ses, new GridNioException(e)); } } } @@ -2100,7 +2209,12 @@ public class GridNioServer { long now = U.currentTimeMillis(); for (SelectionKey key : keys) { - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + Object obj = key.attachment(); + + if (!(obj instanceof GridSelectorNioSessionImpl)) + continue; + + GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)obj; try { long writeTimeout0 = writeTimeout; @@ -2181,12 +2295,32 @@ public class GridNioServer { ses.addMeta(e.getKey(), e.getValue()); } - SelectionKey key = sockCh.register(selector, SelectionKey.OP_READ, ses); + SelectionKey key; - ses.key(key); + if (!sockCh.isRegistered()) + key = sockCh.register(selector, SelectionKey.OP_READ, ses); + else { + key = sockCh.keyFor(selector); + + Map m = (Map)key.attachment(); + + NioOperationFuture fut = + (NioOperationFuture)m.remove(SESSION_FUT_META_KEY); + + assert fut != null; + + for (Entry e : m.entrySet()) + ses.addMeta(e.getKey(), e.getValue()); + + key.attach(ses); + + key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT)); + key.interestOps(key.interestOps() | SelectionKey.OP_READ); - if (!ses.accepted()) - resend(ses); + fut.onDone(ses); + } + + ses.key(key); sessions.add(ses); workerSessions.add(ses); @@ -2321,6 +2455,34 @@ public class GridNioServer { } /** + * @param key Key. + * @throws IOException If failed. + */ + @SuppressWarnings("unchecked") + private void processConnect(SelectionKey key) throws IOException { + SocketChannel ch = (SocketChannel)key.channel(); + + Map meta = (Map)key.attachment(); + + try { + if (ch.finishConnect()) + register(new NioOperationFuture(ch, false, meta)); + } + catch (IOException e) { + NioOperationFuture sesFut = + (NioOperationFuture)meta.get(SESSION_FUT_META_KEY); + + assert sesFut != null; + + U.closeQuiet(ch); + + sesFut.onDone(new GridNioException("Failed to connect to node", e)); + + throw e; + } + } + + /** * Processes read-available event on the key. * * @param key Key that is ready to be read. @@ -2537,14 +2699,20 @@ public class GridNioServer { * @param sockCh Socket channel to be registered on one of the selectors. */ private void addRegistrationReq(SocketChannel sockCh) { - offerBalanced(new NioOperationFuture(sockCh)); + offerBalanced(new NioOperationFuture(sockCh), null); } } /** * Asynchronous operation that may be requested on selector. */ - enum NioOperation { + private enum NioOperation { + /** Register connect key selection. */ + CONNECT, + + /** Cancel connect. */ + CANCEL_CONNECT, + /** Register read key selection. */ REGISTER, http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index 66f9176..2280321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -393,8 +393,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { if (!outRecovery.pairedConnections()) inRecovery = outRecovery; - outRecovery.onConnected(); - return null; } else http://git-wip-us.apache.org/repos/asf/ignite/blob/04aca2e6/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java index b4bd34a..f8a0dce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java @@ -69,9 +69,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter { /** Allocate direct buffer or heap buffer. */ private boolean directBuf; - /** Whether SSLEngine should use client mode. */ - private boolean clientMode; - /** Whether direct mode is used. */ private boolean directMode; @@ -93,13 +90,6 @@ public class GridNioSslFilter extends GridNioFilterAdapter { } /** - * @param clientMode Flag indicating whether SSLEngine should use client mode.. - */ - public void clientMode(boolean clientMode) { - this.clientMode = clientMode; - } - - /** * * @param directMode Flag indicating whether direct mode is used. */ @@ -164,6 +154,8 @@ public class GridNioSslFilter extends GridNioFilterAdapter { if (sslMeta == null) { engine = sslCtx.createSSLEngine(); + boolean clientMode = !ses.accepted(); + engine.setUseClientMode(clientMode); if (!clientMode) {