ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: IGNITE-8739 Implemented WA for tcp communication SPI hanging on descriptor reservation - Fixes #4148.
Date Fri, 08 Jun 2018 15:08:26 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 4f6e02cdd -> 70216517a


IGNITE-8739 Implemented WA for tcp communication SPI hanging on descriptor reservation - Fixes #4148.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70216517
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70216517
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70216517

Branch: refs/heads/master
Commit: 70216517af357a8d77f1811f986f133c4694bdb4
Parents: 4f6e02c
Author: Anton Kalashnikov <kaa.dev@yandex.ru>
Authored: Fri Jun 8 18:06:42 2018 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Fri Jun 8 18:06:42 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   6 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  52 ++++++-
 .../internal/util/nio/GridNioSessionImpl.java   |   5 +
 .../util/nio/GridSelectorNioSessionImpl.java    |  23 +++
 .../communication/tcp/TcpCommunicationSpi.java  |  20 ++-
 ...onnectionConcurrentReserveAndRemoveTest.java | 146 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 7 files changed, 248 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/70216517/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 4ed4717..9ff216b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -892,6 +892,12 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = "IGNITE_DISABLE_WAL_DURING_REBALANCING";
 
     /**
+     * Sets timeout for TCP client recovery descriptor reservation.
+     */
+    public static final String IGNITE_NIO_RECOVERY_DESCRIPTOR_RESERVATION_TIMEOUT =
+            "IGNITE_NIO_RECOVERY_DESCRIPTOR_RESERVATION_TIMEOUT";
+
+    /**
      * When set to {@code true}, Ignite will skip partitions sizes check on partition validation after rebalance has finished.
      * Partitions sizes may differs on nodes when Expiry Policy is in use and it is ok due to lazy entry eviction mechanics.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/70216517/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index af7b757..bd1291a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -22,16 +22,24 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NIO_RECOVERY_DESCRIPTOR_RESERVATION_TIMEOUT;
+
 /**
  * Recovery information for single node.
  */
 public class GridNioRecoveryDescriptor {
+    /** Timeout for outgoing recovery descriptor reservation. */
+    private static final long DESC_RESERVATION_TIMEOUT =
+        Math.max(1_000, IgniteSystemProperties.getLong(IGNITE_NIO_RECOVERY_DESCRIPTOR_RESERVATION_TIMEOUT, 5_000));
+
     /** Number of acknowledged messages. */
     private long acked;
 
@@ -80,6 +88,10 @@ public class GridNioRecoveryDescriptor {
     /** */
     private final boolean pairedConnections;
 
+    /** Session for the descriptor. */
+    @GridToStringExclude
+    private GridNioSession ses;
+
     /**
      * @param pairedConnections {@code True} if in/out connections pair is used for communication with node.
      * @param queueLimit Maximum size of unacknowledged messages queue.
@@ -271,8 +283,19 @@ public class GridNioRecoveryDescriptor {
      */
     public boolean reserve() throws InterruptedException {
         synchronized (this) {
-            while (!connected && reserved)
-                wait();
+            long t0 = System.nanoTime();
+
+            while (!connected && reserved) {
+                wait(DESC_RESERVATION_TIMEOUT);
+
+                if ((System.nanoTime() - t0) / 1_000_000 >= DESC_RESERVATION_TIMEOUT - 100) {
+                    // Dumping a descriptor.
+                    log.error("Failed to wait for recovery descriptor reservation " +
+                        "[desc=" + this + ", ses=" + ses + ']');
+
+                    return false;
+                }
+            }
 
             if (!connected) {
                 reserved = true;
@@ -354,6 +377,8 @@ public class GridNioRecoveryDescriptor {
         SessionWriteRequest[] futs = null;
 
         synchronized (this) {
+            ses = null;
+
             connected = false;
 
             if (handshakeReq != null) {
@@ -440,16 +465,35 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
+     * @return Current session.
+     */
+    public synchronized GridNioSession session() {
+        return ses;
+    }
+
+    /**
+     * @param ses Session.
+     */
+    public synchronized void session(GridNioSession ses) {
+        this.ses = ses;
+    }
+
+    /**
      * @param reqs Requests to notify about error.
      */
     private void notifyOnNodeLeft(SessionWriteRequest[] reqs) {
         IOException e = new IOException("Failed to send message, node has left: " + node.id());
+        IgniteException cloErr = null;
 
         for (SessionWriteRequest req : reqs) {
             req.onError(e);
 
-            if (req.ackClosure() != null)
-                req.ackClosure().apply(new IgniteException(e));
+            if (req.ackClosure() != null) {
+                if (cloErr == null)
+                    cloErr = new IgniteException(e);
+
+                req.ackClosure().apply(cloErr);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/70216517/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 98a22d6..c6410c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -77,6 +77,9 @@ public class GridNioSessionImpl implements GridNioSession {
     /** Accepted flag. */
     private final boolean accepted;
 
+    /** For debug purposes. */
+    private volatile boolean markedForClose;
+
     /**
      * @param filterChain Chain.
      * @param locAddr Local address.
@@ -156,6 +159,8 @@ public class GridNioSessionImpl implements GridNioSession {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public GridNioFuture<Boolean> close() {
+        markedForClose = true;
+
         try {
             return filterChain.onSessionClose(this);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70216517/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index d30b122..d9c3cae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -377,6 +378,8 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKe
         assert recoveryDesc != null;
 
         outRecovery = recoveryDesc;
+
+        outRecovery.session(this);
     }
 
     /** {@inheritDoc} */
@@ -437,6 +440,26 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl implements GridNioKe
     }
 
     /** {@inheritDoc} */
+    @Override public GridNioFuture<Boolean> close() {
+        GridNioFuture<Boolean> fut = super.close();
+
+        if (!fut.isDone()) {
+            fut.listen(fut0 -> {
+                try {
+                    fut0.get();
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to close session [ses=" + GridSelectorNioSessionImpl.this + ']', e);
+                }
+            });
+        }
+        else if (fut.error() != null)
+            log.error("Failed to close session [ses=" + GridSelectorNioSessionImpl.this + ']', fut.error());
+
+        return fut;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/70216517/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index f9fd6fd..7583d96 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2714,8 +2714,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
                 while (retry);
             }
-            catch (IgniteCheckedException e) {
-                throw new IgniteSpiException("Failed to send message to remote node: " + node, e);
+            catch (Throwable t) {
+                log.error("Failed to send message to remote node [node=" + node + ", msg=" + msg + ']', t);
+
+                if (t instanceof Error)
+                    throw (Error)t;
+
+                if (t instanceof RuntimeException)
+                    throw (RuntimeException)t;
+
+                throw new IgniteSpiException("Failed to send message to remote node: " + node, t);
             }
             finally {
                 if (client != null && removeNodeClient(node.id(), client))
@@ -3249,6 +3257,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
 
+                        // Ensure the session is closed.
+                        GridNioSession ses = recoveryDesc.session();
+
+                        if (ses != null) {
+                            while(ses.closeTime() == 0)
+                                ses.close();
+                        }
+
                         return null;
                     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/70216517/modules/core/src/test/java/org/apache/ignite/internal/IgniteConnectionConcurrentReserveAndRemoveTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteConnectionConcurrentReserveAndRemoveTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteConnectionConcurrentReserveAndRemoveTest.java
new file mode 100644
index 0000000..fb19449
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteConnectionConcurrentReserveAndRemoveTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ */
+public class IgniteConnectionConcurrentReserveAndRemoveTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
+
+        DataStorageConfiguration memCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+            new DataRegionConfiguration().setMaxSize(50 * 1024 * 1024));
+
+        c.setDataStorageConfiguration(memCfg);
+
+        c.setClientMode(igniteInstanceName.startsWith("client"));
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
+        spi.setIdleConnectionTimeout(Integer.MAX_VALUE);
+
+        c.setCommunicationSpi(spi);
+
+        return c;
+    }
+
+    /** */
+    private static final class TestClosure implements IgniteCallable<Integer> {
+        /** Serial version uid. */
+        private static final long serialVersionUid = 0L;
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            return 1;
+        }
+    }
+
+
+    public void test() throws Exception {
+        IgniteEx svr = startGrid(0);
+
+        Ignite c1 = startGrid("client1");
+
+        assertTrue(c1.configuration().isClientMode());
+
+        Ignite c2 = startGrid("client2");
+
+        assertTrue(c2.configuration().isClientMode());
+
+        TestRecordingCommunicationSpi spi2 = (TestRecordingCommunicationSpi)c1.configuration().getCommunicationSpi();
+
+        spi2.blockMessages(HandshakeMessage2.class, c1.name());
+
+        AtomicInteger cnt = new AtomicInteger();
+
+        cnt.getAndAdd(c1.compute(c1.cluster().forNodeId(c2.cluster().localNode().id())).call(new TestClosure()));
+
+        TcpCommunicationSpi spi1 = (TcpCommunicationSpi)c1.configuration().getCommunicationSpi();
+
+        ConcurrentMap<UUID, GridCommunicationClient[]> clientsMap = U.field(spi1, "clients");
+
+        GridCommunicationClient[] arr = clientsMap.get(c2.cluster().localNode().id());
+
+        GridTcpNioCommunicationClient client = null;
+
+        for (GridCommunicationClient c : arr) {
+            client = (GridTcpNioCommunicationClient)c;
+
+            if(client != null) {
+                assertTrue(client.session().outRecoveryDescriptor().reserved());
+
+                assertFalse(client.session().outRecoveryDescriptor().connected());
+            }
+        }
+
+        assertNotNull(client);
+
+        //spi1.failSend = true;
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                doSleep(1000);
+
+                //spi1.failSend = false;
+
+                cnt.getAndAdd(c1.compute(c1.cluster().forNodeId(c2.cluster().localNode().id())).call(new TestClosure()));
+            }
+        }, 1, "hang-thread");
+
+        try {
+            cnt.getAndAdd(c1.compute(c1.cluster().forNodeId(c2.cluster().localNode().id())).call(new TestClosure()));
+
+            //fail();
+        }
+        catch (IgniteException e) {
+            // Expected.
+        }
+
+        fut.get();
+
+        assertEquals(3, cnt.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/70216517/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 8e3411a..2a66833 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.GridSameVmStartupSelfTest;
 import org.apache.ignite.internal.GridSpiExceptionSelfTest;
 import org.apache.ignite.internal.GridVersionSelfTest;
 import org.apache.ignite.internal.IgniteConcurrentEntryProcessorAccessStopTest;
+import org.apache.ignite.internal.IgniteConnectionConcurrentReserveAndRemoveTest;
 import org.apache.ignite.internal.IgniteUpdateNotifierPerClusterSettingSelfTest;
 import org.apache.ignite.internal.managers.GridManagerStopSelfTest;
 import org.apache.ignite.internal.managers.communication.GridCommunicationSendMessageSelfTest;
@@ -127,6 +128,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridLocalEventListenerSelfTest.class);
         suite.addTestSuite(IgniteTopologyPrintFormatSelfTest.class);
         suite.addTestSuite(ComputeJobCancelWithServiceSelfTest.class);
+        suite.addTestSuite(IgniteConnectionConcurrentReserveAndRemoveTest.class);
 
         // Managed Services.
         suite.addTestSuite(GridServiceProcessorSingleNodeSelfTest.class);


Mime
View raw message