Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E8F7B200BAD for ; Mon, 10 Oct 2016 14:13:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E7A8D160ACA; Mon, 10 Oct 2016 12:13:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 01503160AFE for ; Mon, 10 Oct 2016 14:12:59 +0200 (CEST) Received: (qmail 48169 invoked by uid 500); 10 Oct 2016 12:12:58 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 47338 invoked by uid 99); 10 Oct 2016 12:12:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Oct 2016 12:12:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C1164EEE21; Mon, 10 Oct 2016 12:12:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 10 Oct 2016 12:13:39 -0000 Message-Id: <4f10a590ac3e4a6ea2fa55a64b41bd21@git.apache.org> In-Reply-To: <1c2b871572b547e0bad93846577cf97b@git.apache.org> References: <1c2b871572b547e0bad93846577cf97b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [43/50] [abbrv] ignite git commit: io archived-at: Mon, 10 Oct 2016 12:13:02 -0000 io Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a995d5b3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a995d5b3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a995d5b3 Branch: refs/heads/ignite-gg-8-io2-selNow Commit: a995d5b3b6581e1b203d82043f1aca9941d5a4ca Parents: 8b9c54d Author: sboikov Authored: Wed Oct 5 08:29:18 2016 +0300 Committer: sboikov Committed: Wed Oct 5 08:29:18 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/nio/GridNioServer.java | 24 ++-- .../communication/tcp/TcpCommunicationSpi.java | 126 ++++++++++++------- 2 files changed, 93 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a995d5b3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index ccaacf9..5efca2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -414,7 +414,7 @@ public class GridNioServer { * @param msg Message. * @return Future for operation. */ - GridNioFuture send(GridNioSession ses, ByteBuffer msg, boolean createFut) { + GridNioFuture send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl : ses; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; @@ -440,7 +440,7 @@ public class GridNioServer { * @param msg Message. * @return Future for operation. */ - GridNioFuture send(GridNioSession ses, Message msg, boolean createFut) { + GridNioFuture send(GridNioSession ses, Message msg, boolean createFut) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; @@ -467,7 +467,7 @@ public class GridNioServer { * @param fut Future. * @param sys System message flag. */ - private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest fut, boolean sys) { + private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest fut, boolean sys) throws IgniteCheckedException { assert ses != null; assert fut != null; @@ -479,8 +479,14 @@ public class GridNioServer { fut.ackClosure(ackC); if (ses.closed()) { - if (ses.removeFuture(fut)) + if (ses.removeFuture(fut)) { + IOException err = new IOException("Failed to send message (connection was closed): " + ses); + fut.connectionClosed(); + + if (!(fut instanceof GridNioFuture)) + throw new IgniteCheckedException(err); + } } else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)fut); @@ -495,7 +501,7 @@ public class GridNioServer { * @param ses Session. * @param msg Message. */ - public void sendSystem(GridNioSession ses, Message msg) { + public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException { sendSystem(ses, msg, null); } @@ -508,7 +514,7 @@ public class GridNioServer { */ public void sendSystem(GridNioSession ses, Message msg, - @Nullable IgniteInClosure> lsnr) { + @Nullable IgniteInClosure> lsnr) throws IgniteCheckedException { assert ses instanceof GridSelectorNioSessionImpl; GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses; @@ -551,13 +557,13 @@ public class GridNioServer { for (SessionWriteRequest fut : futs) { fut.messageThread(true); - ((NioOperationFuture)fut).resetSession(ses0); + fut.resetSession(ses0); } ses0.resend(futs); // Wake up worker. - clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0)); + clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0)); } } @@ -2638,7 +2644,7 @@ public class GridNioServer { } /** {@inheritDoc} */ - @Override public GridNioFuture onSessionWrite(GridNioSession ses, Object msg, boolean fut) { + @Override public GridNioFuture onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { if (directMode) { boolean sslSys = sslFilter != null && msg instanceof ByteBuffer; http://git-wip-us.apache.org/repos/asf/ignite/blob/a995d5b3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index b5b9d40..79eeb4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -691,8 +691,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.resend(ses); - if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + try { + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } recovery.onConnected(); @@ -716,12 +721,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor recovery, GridNioSession ses, boolean sndRes) { - ses.inRecoveryDescriptor(recovery); + try { + ses.inRecoveryDescriptor(recovery); - if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); - recovery.onConnected(); + recovery.onConnected(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } /** @@ -758,30 +768,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void apply(Boolean success) { - failed = !success; + try { + failed = !success; - if (success) { - IgniteInClosure> lsnr = new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture msgFut) { - try { - msgFut.get(); + if (success) { + IgniteInClosure> lsnr = new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture msgFut) { + try { + msgFut.get(); - connectedNew(recoveryDesc, ses, false); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + connectedNew(recoveryDesc, ses, false); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); - recoveryDesc.release(); + recoveryDesc.release(); + } } - } - }; + }; - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + } + else + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1)); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); } - else - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1)); } } @@ -842,32 +857,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void apply(Boolean success) { if (success) { - IgniteInClosure> lsnr = new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture msgFut) { - try { - msgFut.get(); + try { + IgniteInClosure> lsnr = new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture msgFut) { + try { + msgFut.get(); - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); - fut.onDone(client); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + fut.onDone(client); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); - recoveryDesc.release(); + recoveryDesc.release(); - fut.onDone(); - } - finally { - clientFuts.remove(connKey, fut); + fut.onDone(); + } + finally { + clientFuts.remove(connKey, fut); + } } - } - }; + }; - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } else { try { @@ -3511,9 +3531,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + ", rcvCnt=" + msg.received() + ']'); - nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + try { + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); - recovery.lastAcknowledged(msg.received()); + recovery.lastAcknowledged(msg.received()); + } + catch (IgniteCheckedException err) { + U.error(log, "Failed to send message: " + err, err); + } continue; } @@ -3569,9 +3594,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", lastAcked=" + recovery.lastAcknowledged() + ']'); } - nioSrvr.sendSystem(ses, msg); + try { + nioSrvr.sendSystem(ses, msg); - recovery.lastAcknowledged(msg.received()); + recovery.lastAcknowledged(msg.received()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } }