ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [43/50] [abbrv] ignite git commit: io
Date Mon, 10 Oct 2016 12:13:39 GMT
io


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a995d5b3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a995d5b3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a995d5b3

Branch: refs/heads/ignite-gg-8-io2-selNow
Commit: a995d5b3b6581e1b203d82043f1aca9941d5a4ca
Parents: 8b9c54d
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 5 08:29:18 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Oct 5 08:29:18 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java |  24 ++--
 .../communication/tcp/TcpCommunicationSpi.java  | 126 ++++++++++++-------
 2 files changed, 93 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a995d5b3/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index ccaacf9..5efca2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -414,7 +414,7 @@ public class GridNioServer<T> {
      * @param msg Message.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) {
+    GridNioFuture<?> send(GridNioSession ses, ByteBuffer msg, boolean createFut) throws
IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl : ses;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -440,7 +440,7 @@ public class GridNioServer<T> {
      * @param msg Message.
      * @return Future for operation.
      */
-    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) {
+    GridNioFuture<?> send(GridNioSession ses, Message msg, boolean createFut) throws
IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -467,7 +467,7 @@ public class GridNioServer<T> {
      * @param fut Future.
      * @param sys System message flag.
      */
-    private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest fut, boolean sys)
{
+    private void send0(GridSelectorNioSessionImpl ses, SessionWriteRequest fut, boolean sys)
throws IgniteCheckedException {
         assert ses != null;
         assert fut != null;
 
@@ -479,8 +479,14 @@ public class GridNioServer<T> {
             fut.ackClosure(ackC);
 
         if (ses.closed()) {
-            if (ses.removeFuture(fut))
+            if (ses.removeFuture(fut)) {
+                IOException err = new IOException("Failed to send message (connection was
closed): " + ses);
+
                 fut.connectionClosed();
+
+                if (!(fut instanceof GridNioFuture))
+                    throw new IgniteCheckedException(err);
+            }
         }
         else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true))
             clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)fut);
@@ -495,7 +501,7 @@ public class GridNioServer<T> {
      * @param ses Session.
      * @param msg Message.
      */
-    public void sendSystem(GridNioSession ses, Message msg) {
+    public void sendSystem(GridNioSession ses, Message msg) throws IgniteCheckedException
{
         sendSystem(ses, msg, null);
     }
 
@@ -508,7 +514,7 @@ public class GridNioServer<T> {
      */
     public void sendSystem(GridNioSession ses,
         Message msg,
-        @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
+        @Nullable IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) throws
IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
 
         GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
@@ -551,13 +557,13 @@ public class GridNioServer<T> {
             for (SessionWriteRequest fut : futs) {
                 fut.messageThread(true);
 
-                ((NioOperationFuture)fut).resetSession(ses0);
+                fut.resetSession(ses0);
             }
 
             ses0.resend(futs);
 
             // Wake up worker.
-            clientWorkers.get(ses0.selectorIndex()).offer(((NioOperationFuture)fut0));
+            clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0));
         }
     }
 
@@ -2638,7 +2644,7 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object
msg, boolean fut) {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object
msg, boolean fut) throws IgniteCheckedException {
             if (directMode) {
                 boolean sslSys = sslFilter != null && msg instanceof ByteBuffer;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a995d5b3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b5b9d40..79eeb4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -691,8 +691,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 nioSrvr.resend(ses);
 
-                if (sndRes)
-                    nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                try {
+                    if (sndRes)
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
 
                 recovery.onConnected();
 
@@ -716,12 +721,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 GridNioRecoveryDescriptor recovery,
                 GridNioSession ses,
                 boolean sndRes) {
-                ses.inRecoveryDescriptor(recovery);
+                try {
+                    ses.inRecoveryDescriptor(recovery);
 
-                if (sndRes)
-                    nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
+                    if (sndRes)
+                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
 
-                recovery.onConnected();
+                    recovery.onConnected();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
             }
 
             /**
@@ -758,30 +768,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
-                    failed = !success;
+                    try {
+                        failed = !success;
 
-                    if (success) {
-                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>()
{
-                            @Override public void apply(IgniteInternalFuture<?> msgFut)
{
-                                try {
-                                    msgFut.get();
+                        if (success) {
+                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new
IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?>
msgFut) {
+                                    try {
+                                        msgFut.get();
 
-                                    connectedNew(recoveryDesc, ses, false);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to send recovery handshake " +
-                                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+                                        connectedNew(recoveryDesc, ses, false);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake
" +
+                                                    "[rmtNode=" + rmtNode.id() + ", err="
+ e + ']');
 
-                                    recoveryDesc.release();
+                                        recoveryDesc.release();
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                        }
+                        else
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1));
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send message: " + e, e);
                     }
-                    else
-                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1));
                 }
             }
 
@@ -842,32 +857,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** {@inheritDoc} */
                 @Override public void apply(Boolean success) {
                     if (success) {
-                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>()
{
-                            @Override public void apply(IgniteInternalFuture<?> msgFut)
{
-                                try {
-                                    msgFut.get();
+                        try {
+                            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new
IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?>
msgFut) {
+                                    try {
+                                        msgFut.get();
 
-                                    GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, msg.received(),
false, createClient);
+                                        GridTcpNioCommunicationClient client =
+                                                connected(recoveryDesc, ses, rmtNode, msg.received(),
false, createClient);
 
-                                    fut.onDone(client);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to send recovery handshake " +
-                                            "[rmtNode=" + rmtNode.id() + ", err=" + e + ']');
+                                        fut.onDone(client);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to send recovery handshake
" +
+                                                    "[rmtNode=" + rmtNode.id() + ", err="
+ e + ']');
 
-                                    recoveryDesc.release();
+                                        recoveryDesc.release();
 
-                                    fut.onDone();
-                                }
-                                finally {
-                                    clientFuts.remove(connKey, fut);
+                                        fut.onDone();
+                                    }
+                                    finally {
+                                        clientFuts.remove(connKey, fut);
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()),
lsnr);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send message: " + e, e);
+                        }
                     }
                     else {
                         try {
@@ -3511,9 +3531,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 log.debug("Send recovery acknowledgement on timeout [rmtNode="
+ nodeId +
                                     ", rcvCnt=" + msg.received() + ']');
 
-                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(),
msg);
+                            try {
+                                nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(),
msg);
 
-                            recovery.lastAcknowledged(msg.received());
+                                recovery.lastAcknowledged(msg.received());
+                            }
+                            catch (IgniteCheckedException err) {
+                                U.error(log, "Failed to send message: " + err, err);
+                            }
 
                             continue;
                         }
@@ -3569,9 +3594,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         ", lastAcked=" + recovery.lastAcknowledged() + ']');
                 }
 
-                nioSrvr.sendSystem(ses, msg);
+                try {
+                    nioSrvr.sendSystem(ses, msg);
 
-                recovery.lastAcknowledged(msg.received());
+                    recovery.lastAcknowledged(msg.received());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send message: " + e, e);
+                }
             }
         }
 


Mime
View raw message