ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [2/4] ignite git commit: Implemented GridClientOptimizedMarshaller that zip content.
Date Tue, 14 Jun 2016 02:54:07 GMT
Implemented GridClientOptimizedMarshaller that zip content.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c599cf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c599cf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c599cf

Branch: refs/heads/master
Commit: 96c599cf177321f4dd2a402f696fe4f0cafd9224
Parents: 80d19f0
Author: Alexey Kuznetsov <akuznetsov@apache.org>
Authored: Tue Jun 14 09:34:34 2016 +0700
Committer: Alexey Kuznetsov <akuznetsov@apache.org>
Committed: Tue Jun 14 09:34:34 2016 +0700

----------------------------------------------------------------------
 .../GridClientConnectionManagerAdapter.java     |  25 ++-
 .../connection/GridClientNioTcpConnection.java  |   3 +
 .../GridClientOptimizedMarshaller.java          |   4 +-
 .../GridClientZipOptimizedMarshaller.java       | 167 +++++++++++++++++++
 .../impl/GridTcpRouterNioListenerAdapter.java   |  11 +-
 .../message/GridClientHandshakeRequest.java     |   4 +-
 .../protocols/tcp/GridTcpRestNioListener.java   |  19 ++-
 .../rest/protocols/tcp/GridTcpRestProtocol.java |  12 +-
 .../ignite/internal/util/nio/GridNioServer.java |  10 +-
 9 files changed, 230 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 1bea3cc..6ea7c22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -47,6 +47,8 @@ import org.apache.ignite.internal.client.GridClientProtocol;
 import org.apache.ignite.internal.client.GridServerUnreachableException;
 import org.apache.ignite.internal.client.impl.GridClientFutureAdapter;
 import org.apache.ignite.internal.client.impl.GridClientThreadFactory;
+import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.client.util.GridClientStripedLock;
 import org.apache.ignite.internal.client.util.GridClientUtils;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
@@ -460,9 +462,26 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
             GridClientConnection conn;
 
             if (cfg.getProtocol() == GridClientProtocol.TCP) {
-                conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
-                    cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
-                    cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepBinariesThreadLocal());
+                GridClientMarshaller marsh = cfg.getMarshaller();
+
+                try {
+                    conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
+                        cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+                        cfg.isTcpNoDelay(), marsh, marshId, top, cred, keepBinariesThreadLocal());
+                }
+                catch (GridClientException e) {
+                    if (marsh instanceof GridClientZipOptimizedMarshaller) {
+                        log.warning("Failed to connect with GridClientZipOptimizedMarshaller,"
+
+                            " trying to fallback to default marshaller: " + e);
+
+                        conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx,
pingExecutor,
+                            cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+                            cfg.isTcpNoDelay(), ((GridClientZipOptimizedMarshaller)marsh).defaultMarshaller(),
marshId,
+                            top, cred, keepBinariesThreadLocal());
+                    }
+                    else
+                        throw e;
+                }
             }
             else
                 throw new GridServerUnreachableException("Failed to create client (protocol
is not supported): " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index cfcb07f..8937504 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.client.impl.GridClientNodeMetricsAdapter;
 import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
 import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
 import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientAuthenticationRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
@@ -243,6 +244,8 @@ public class GridClientNioTcpConnection extends GridClientConnection {
             if (marshId != null)
                 req.marshallerId(marshId);
             // marsh != null.
+            else if (marsh instanceof GridClientZipOptimizedMarshaller)
+                req.marshallerId(GridClientZipOptimizedMarshaller.ID);
             else if (marsh instanceof GridClientOptimizedMarshaller)
                 req.marshallerId(GridClientOptimizedMarshaller.ID);
             else if (marsh instanceof GridClientJdkMarshaller)

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
index 4bc1dac..a112736 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
@@ -38,7 +38,7 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller
{
     public static final byte ID = 1;
 
     /** Optimized marshaller. */
-    private final OptimizedMarshaller opMarsh;
+    protected final OptimizedMarshaller opMarsh;
 
     /**
      * Default constructor.
@@ -136,4 +136,4 @@ public class GridClientOptimizedMarshaller implements GridClientMarshaller
{
             throw new UnsupportedOperationException();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
new file mode 100644
index 0000000..d9ce60e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientZipOptimizedMarshaller.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.marshaller.optimized;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.plugin.PluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper, that adapts {@link OptimizedMarshaller} to {@link GridClientMarshaller} interface.
+ */
+public class GridClientZipOptimizedMarshaller extends GridClientOptimizedMarshaller {
+    /** ID. */
+    public static final byte ID = 3;
+
+    /** Default buffer size. */
+    private static final int DFLT_BUFFER_SIZE = 4096;
+
+    /** Default client marshaller to fallback. */
+    private final GridClientMarshaller dfltMarsh;
+
+    /**
+     * Constructor.
+     *
+     * @param dfltMarsh Marshaller to fallback to.
+     * @param plugins Plugins.
+     */
+    public GridClientZipOptimizedMarshaller(GridClientMarshaller dfltMarsh, @Nullable List<PluginProvider>
plugins) {
+        super(plugins);
+
+        assert dfltMarsh!= null;
+
+        this.dfltMarsh = dfltMarsh;
+    }
+
+    /**
+     * Default marshaller that will be used in case of backward compatibility.
+     *
+     * @return Marshaller to fallback.
+     */
+    public GridClientMarshaller defaultMarshaller() {
+        return dfltMarsh;
+    }
+
+    /**
+     * Zips bytes.
+     *
+     * @param input Input bytes.
+     * @return Zipped byte array.
+     * @throws IOException If failed.
+     */
+    public static byte[] zipBytes(byte[] input) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+        try (ZipOutputStream zos = new ZipOutputStream(baos)) {
+            ZipEntry entry = new ZipEntry("");
+
+            try {
+                entry.setSize(input.length);
+
+                zos.putNextEntry(entry);
+                zos.write(input);
+            }
+            finally {
+                zos.closeEntry();
+            }
+        }
+
+        return baos.toByteArray();
+    }
+
+    /**
+     * Unzip bytes.
+     *
+     * @param input Zipped bytes.
+     * @return Unzipped byte array.
+     * @throws IOException
+     */
+    private static byte[] unzipBytes(byte[] input) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(input);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+        try(ZipInputStream zis = new ZipInputStream(bais)) {
+            zis.getNextEntry();
+
+            byte[] buf = new byte[DFLT_BUFFER_SIZE];
+
+            int len = zis.read(buf);
+
+            while (len > 0) {
+                baos.write(buf, 0, len);
+
+                len = zis.read(buf);
+            }
+        }
+
+        return baos.toByteArray();
+    }
+    /** {@inheritDoc} */
+    @Override public ByteBuffer marshal(Object obj, int off) throws IOException {
+        try {
+            if (!(obj instanceof GridClientMessage))
+                throw new IOException("Message serialization of given type is not supported:
" +
+                    obj.getClass().getName());
+
+            byte[] marshBytes = opMarsh.marshal(obj);
+
+            boolean zip = marshBytes.length > 512;
+
+            byte[] bytes = zip ? zipBytes(marshBytes) : marshBytes;
+
+            ByteBuffer buf = ByteBuffer.allocate(off + bytes.length + 1);
+
+            buf.position(off);
+            buf.put((byte)(zip ? 1 : 0));
+            buf.put(bytes);
+            buf.flip();
+
+            return buf;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unmarshal(byte[] bytes) throws IOException {
+        try {
+            boolean unzip = bytes[0] > 0;
+
+            byte[] marshBytes = Arrays.copyOfRange(bytes, 1, bytes.length);
+
+            return opMarsh.unmarshal(unzip ? unzipBytes(marshBytes) : marshBytes, null);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
index 02b63ad..6bcea09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.client.router.impl;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteLogger;
@@ -29,6 +30,7 @@ import org.apache.ignite.internal.client.GridClientFutureListener;
 import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
 import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
 import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
@@ -39,6 +41,7 @@ import org.apache.ignite.internal.processors.rest.client.message.GridRouterRespo
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.PluginProvider;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MARSHALLER;
@@ -78,7 +81,11 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi
 
         marshMap = new HashMap<>();
 
-        marshMap.put(GridClientOptimizedMarshaller.ID, new GridClientOptimizedMarshaller(U.allPluginProviders()));
+        List<PluginProvider> providers = U.allPluginProviders();
+        GridClientOptimizedMarshaller optdMarsh = new GridClientOptimizedMarshaller(providers);
+
+        marshMap.put(GridClientOptimizedMarshaller.ID, optdMarsh);
+        marshMap.put(GridClientZipOptimizedMarshaller.ID, new GridClientZipOptimizedMarshaller(optdMarsh,
providers));
         marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller());
 
         init();
@@ -213,4 +220,4 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi
 
         return res;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
index 3790dd0..4e1ba91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequest.java
@@ -66,7 +66,7 @@ public class GridClientHandshakeRequest extends GridClientAbstractMessage
{
      * @param marshId Marshaller ID.
      */
     public void marshallerId(byte marshId) {
-        assert marshId >= 0 && marshId <= 2;
+        assert marshId >= 0 && marshId <= 3;
 
         this.marshId = marshId;
     }
@@ -104,4 +104,4 @@ public class GridClientHandshakeRequest extends GridClientAbstractMessage
{
     @Override public String toString() {
         return getClass().getSimpleName() + " [arr=" + Arrays.toString(arr) + ']';
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
index bf177cf..2cfdb75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java
@@ -250,14 +250,17 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
                             GridNioFuture<?> sf = ses.send(res);
 
                             // Check if send failed.
-                            if (sf.isDone())
-                                try {
-                                    sf.get();
-                                }
-                                catch (Exception e) {
-                                    U.error(log, "Failed to process client request [ses="
+ ses + ", msg=" + msg + ']',
-                                        e);
+                            sf.listen(new CI1<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?>
fut) {
+                                    try {
+                                        fut.get();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        U.error(log, "Failed to process client request [ses="
+ ses +
+                                            ", msg=" + msg + ']', e);
+                                    }
                                 }
+                            });
                         }
                     });
                 else
@@ -360,4 +363,4 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli
     @Override public void onSessionIdleTimeout(GridNioSession ses) {
         ses.close();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index a4a51ea..6338fcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.client.marshaller.GridClientMarshaller;
 import org.apache.ignite.internal.client.marshaller.jdk.GridClientJdkMarshaller;
 import org.apache.ignite.internal.client.marshaller.optimized.GridClientOptimizedMarshaller;
+import org.apache.ignite.internal.client.marshaller.optimized.GridClientZipOptimizedMarshaller;
 import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
 import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler;
 import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.IgnitePortProtocol;
 import org.jetbrains.annotations.Nullable;
 
@@ -170,8 +172,12 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
 
         Map<Byte, GridClientMarshaller> marshMap = new HashMap<>();
 
-        marshMap.put(GridClientOptimizedMarshaller.ID,
-            new GridClientOptimizedMarshaller(new ArrayList<>(ctx.plugins().allProviders())));
+        ArrayList<PluginProvider> providers = new ArrayList<>(ctx.plugins().allProviders());
+
+        GridClientOptimizedMarshaller optMarsh = new GridClientOptimizedMarshaller(providers);
+
+        marshMap.put(GridClientOptimizedMarshaller.ID, optMarsh);
+        marshMap.put(GridClientZipOptimizedMarshaller.ID, new GridClientZipOptimizedMarshaller(optMarsh,
providers));
         marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller());
 
         lsnr.marshallers(marshMap);
@@ -291,4 +297,4 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
     @Override protected String getPortPropertyName() {
         return IgniteNodeAttributes.ATTR_REST_TCP_PORT;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c599cf/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a32f04f..9fd5e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1572,8 +1572,7 @@ public class GridNioServer<T> {
                     throw e;
                 }
                 catch (Exception e) {
-                    if (!closed)
-                        U.warn(log, "Failed to process selector key (will close): " + ses,
e);
+                    U.warn(log, "Failed to process selector key (will close): " + ses, e);
 
                     close(ses, new GridNioException(e));
                 }
@@ -1640,9 +1639,10 @@ public class GridNioServer<T> {
                 try {
                     long writeTimeout0 = writeTimeout;
 
+                    boolean opWrite = key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE)
!= 0;
+
                     // If we are writing and timeout passed.
-                    if (key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE)
!= 0 &&
-                        now - ses.lastSendTime() > writeTimeout0) {
+                    if (opWrite && now - ses.lastSendTime() > writeTimeout0) {
                         filterChain.onSessionWriteTimeout(ses);
 
                         // Update timestamp to avoid multiple notifications within one timeout
interval.
@@ -1653,7 +1653,7 @@ public class GridNioServer<T> {
 
                     long idleTimeout0 = idleTimeout;
 
-                    if (now - ses.lastReceiveTime() > idleTimeout0 && now - ses.lastSendScheduleTime()
> idleTimeout0) {
+                    if (!opWrite && now - ses.lastReceiveTime() > idleTimeout0
&& now - ses.lastSendScheduleTime() > idleTimeout0) {
                         filterChain.onSessionIdleTimeout(ses);
 
                         // Update timestamp to avoid multiple notifications within one timeout
interval.


Mime
View raw message