ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ascherba...@apache.org
Subject [ignite] branch master updated: IGNITE-13501 Fixed AssertionError in CacheExchangeMergeTest - Fixes #8296.
Date Mon, 05 Oct 2020 07:57:01 GMT
This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c22d9  IGNITE-13501 Fixed AssertionError in CacheExchangeMergeTest - Fixes #8296.
51c22d9 is described below

commit 51c22d9a363360f39ee1b956feba677eb4419238
Author: Vladislav Pyatkov <vldpyatkov@gmail.com>
AuthorDate: Mon Oct 5 10:52:44 2020 +0300

    IGNITE-13501 Fixed AssertionError in CacheExchangeMergeTest - Fixes #8296.
    
    Signed-off-by: Alexey Scherbakov <alexey.scherbakoff@gmail.com>
---
 .../cache/distributed/CacheExchangeMergeTest.java  |  39 ++--
 .../distributed/OnePhaseCommitAndNodeLeftTest.java | 247 +++++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite6.java      |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   2 +
 4 files changed, 277 insertions(+), 13 deletions(-)

diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index dd71438..f170a6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -625,7 +625,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         for (Ignite node : nodes) {
             List<GridDhtPartitionsExchangeFuture> exchFuts =
-                    ((IgniteEx)node).context().cache().context().exchange().exchangeFutures();
+                ((IgniteEx)node).context().cache().context().exchange().exchangeFutures();
 
             assertTrue("Unexpected size: " + exchFuts.size(), !exchFuts.isEmpty() &&
exchFuts.size() <= histSize);
         }
@@ -855,7 +855,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     private void mergeServersFail1(boolean waitRebalance, boolean delayRebalance, int mergeTopVer)
throws Exception {
         testSpi = true;
 
-        final Ignite srv0 = startGrids(5);
+        final IgniteEx srv0 = startGrids(5);
 
         if (waitRebalance)
             awaitPartitionMapExchange();
@@ -882,7 +882,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         if (mergeTopVer == 7) {
             waitForReadyTopology(grid(0).cachex(cacheNames[0]).context().topology(),
-                    new AffinityTopologyVersion(7, 0));
+                new AffinityTopologyVersion(7, 0));
         }
 
         stopGrid(getTestIgniteInstanceName(2), true, false);
@@ -905,7 +905,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
         checkCaches0();
 
         assertTrue("Unexpected number of merged disco events: " + mergedEvts.size(),
-                mergedEvts.size() == mergeTopVer - 6);
+            mergedEvts.size() == mergeTopVer - 6);
 
         for (DiscoveryEvent discoEvt : mergedEvts) {
             ClusterNode evtNode = discoEvt.eventNode();
@@ -1458,15 +1458,21 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest
{
                 @Override public void run() {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
+                    CacheConfiguration cCfg = cache.getConfiguration(CacheConfiguration.class);
+
+                    boolean isTxCacheWithouBackups = cCfg.getCacheMode() == PARTITIONED &&
+                        cCfg.getAtomicityMode() == TRANSACTIONAL &&
+                        cCfg.getBackups() == 0;
+
                     assertNotNull("No cache [node=" + node.name() +
-                            ", client=" + node.configuration().isClientMode() +
-                            ", order=" + node.cluster().localNode().order() +
-                            ", cache=" + cacheName + ']', cache);
+                        ", client=" + node.configuration().isClientMode() +
+                        ", order=" + node.cluster().localNode().order() +
+                        ", cache=" + cacheName + ']', cache);
 
                     String err = "Invalid value [node=" + node.name() +
-                            ", client=" + node.configuration().isClientMode() +
-                            ", order=" + node.cluster().localNode().order() +
-                            ", cache=" + cacheName + ']';
+                        ", client=" + node.configuration().isClientMode() +
+                        ", order=" + node.cluster().localNode().order() +
+                        ", cache=" + cacheName + ']';
 
                     for (int i = 0; i < 5; i++) {
                         Integer key = rnd.nextInt(20_000);
@@ -1475,7 +1481,10 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest
{
 
                         Object val = cache.get(key);
 
-                        assertEquals(err, i, val);
+                        if (isTxCacheWithouBackups)
+                            assertTrue(err, val == null || val.equals(i));
+                        else
+                            assertEquals(err, i, val);
                     }
 
                     for (int i = 0; i < 5; i++) {
@@ -1491,8 +1500,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest
{
 
                         Map<Object, Object> res = cache.getAll(map.keySet());
 
-                        for (Map.Entry<Integer, Integer> e : map.entrySet())
-                            assertEquals(err, e.getValue(), res.get(e.getKey()));
+                        for (Map.Entry<Integer, Integer> e : map.entrySet()) {
+                            if (isTxCacheWithouBackups)
+                                assertTrue(err, res.get(e.getKey()) == null || e.getValue().equals(res.get(e.getKey())));
+                            else
+                                assertEquals(err, e.getValue(), res.get(e.getKey()));
+                        }
                     }
 
                     if (atomicityMode(cache) == TRANSACTIONAL) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OnePhaseCommitAndNodeLeftTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OnePhaseCommitAndNodeLeftTest.java
new file mode 100644
index 0000000..6d7ec5b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/OnePhaseCommitAndNodeLeftTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.typedef.CI3;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+
+/**
+ * Node left a topology when a one phase transaction committing.
+ */
+public class OnePhaseCommitAndNodeLeftTest extends GridCommonAbstractTest {
+    /** Message appears when all owner partition was lost so a cluster do an operation cannot
execute over them. */
+    public static final String LOST_ALL_QWNERS_MSG = "all partition owners have left the
grid, partition data has been lost";
+
+    /** Chache backup count. */
+    private int backups;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setBackups(backups));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Tests an implicit transaction on a cache without backups.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testImplicitlyTxZeroBackups() throws Exception {
+        backups = 0;
+
+        startTransactionAndFailPrimary((ignite, key, val) -> ignite.cache(DEFAULT_CACHE_NAME).put(key,
val));
+    }
+
+    /**
+     * Tests an implicit transaction on a cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testImplicitlyTxOneBackups() throws Exception {
+        backups = 1;
+
+        startTransactionAndFailPrimary((ignite, key, val) -> ignite.cache(DEFAULT_CACHE_NAME).put(key,
val));
+    }
+
+    /**
+     * Tests an explicit transaction on a cache without backups.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTxZeroBackups() throws Exception {
+        backups = 0;
+
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                startTransactionAndFailPrimary((ignite, key, val) -> {
+                    try (Transaction tx = ignite.transactions().txStart(concurrency, isolation))
{
+                        ignite.cache(DEFAULT_CACHE_NAME).put(key, val);
+
+                        tx.commit();
+                    }
+                });
+
+                stopAllGrids();
+            }
+        }
+    }
+
+    /**
+     * Tests an explicit transaction on a cache with one backup.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTxOneBackups() throws Exception {
+        backups = 1;
+
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                startTransactionAndFailPrimary((ignite, key, val) -> {
+                    info("Tx pu: [concurrency=" + concurrency + ", isolation=" + isolation
+ ']');
+
+                    try (Transaction tx = ignite.transactions().txStart(concurrency, isolation))
{
+                        ignite.cache(DEFAULT_CACHE_NAME).put(key, val);
+
+                        tx.commit();
+                    }
+                });
+
+                stopAllGrids();
+            }
+        }
+    }
+
+    /**
+     * Stars cluster and stops exactly primary node for a cache operation specified in parameters.
+     *
+     * @param cacheClosure Closure for a cache operation.
+     * @throws Exception If failed.
+     */
+    private void startTransactionAndFailPrimary(CI3<Ignite, Integer, String> cacheClosure)
throws Exception {
+        Ignite ignite0 = startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache cache = ignite0.cache(DEFAULT_CACHE_NAME);
+
+        Integer key = primaryKey(ignite(1).cache(DEFAULT_CACHE_NAME));
+
+        ClusterNode node1 = ignite0.affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key);
+
+        assertFalse("Found key is local: " + key + " on node " + node1, node1.isLocal());
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ignite0);
+
+        spi.blockMessages((node, msg) -> {
+            if (msg instanceof GridNearTxPrepareRequest) {
+                GridNearTxPrepareRequest putReq = (GridNearTxPrepareRequest)msg;
+
+                if (!F.isEmpty(putReq.writes())) {
+                    int cacheId = putReq.writes().iterator().next().cacheId();
+
+                    if (cacheId == CU.cacheId(DEFAULT_CACHE_NAME)) {
+                        assertTrue(putReq.onePhaseCommit());
+
+                        String nodeName = node.attribute(ATTR_IGNITE_INSTANCE_NAME);
+
+                        return true;
+                    }
+                }
+            }
+
+            return false;
+        });
+
+        String testVal = "Tets value";
+
+        IgniteInternalFuture putFut = GridTestUtils.runAsync(() -> {
+            try {
+                cacheClosure.apply(ignite0, key, testVal);
+
+                //We are not sure, operation completed correctly or not when backups are
zero.
+                //Exception could be thrown or not.
+                // assertTrue(((CacheConfiguration)cache.getConfiguration(CacheConfiguration.class)).getBackups()
!= 0);
+            }
+            catch (Exception e) {
+                checkException(cache, e);
+            }
+        });
+
+        spi.waitForBlocked();
+
+        assertFalse(putFut.isDone());
+
+        ignite(1).close();
+
+        spi.stopBlock();
+
+        try {
+            putFut.get();
+
+            try {
+                assertEquals(testVal, cache.get(key));
+
+                assertTrue(((CacheConfiguration)cache.getConfiguration(CacheConfiguration.class)).getBackups()
!= 0);
+            }
+            catch (Exception e) {
+                checkException(cache, e);
+            }
+        }
+        catch (Exception e) {
+            if (X.hasCause(e, TransactionRollbackException.class))
+                info("Transaction was rolled back [err=" + e.getMessage() + "]");
+        }
+    }
+
+    /**
+     * Checks an exception that happened in the cache specified.
+     *
+     * @param cache Ignite cache.
+     * @param e Checked exception.
+     */
+    private void checkException(IgniteCache cache, Exception e) {
+        log.error("Ex", e);
+
+        Exception ex = X.cause(e, CacheInvalidStateException.class);
+
+        if (ex == null)
+            throw new IgniteException(e);
+
+        assertTrue(ex.getMessage().contains(LOST_ALL_QWNERS_MSG));
+
+        assertEquals(0, ((CacheConfiguration)cache.getConfiguration(CacheConfiguration.class)).getBackups());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index d793ecd..4294a3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStal
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMultiClientsStartTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
+import org.apache.ignite.internal.processors.cache.distributed.OnePhaseCommitAndNodeLeftTest;
 import org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManagerTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
@@ -92,6 +93,7 @@ public class IgniteCacheMvccTestSuite6 {
         ignoredTests.add(CacheParallelStartTest.class);
         ignoredTests.add(IgniteCacheMultiClientsStartTest.class);
         ignoredTests.add(CacheIgniteOutOfMemoryExceptionTest.class);
+        ignoredTests.add(OnePhaseCommitAndNodeLeftTest.class);
 
         // Mixed local/dht tx test.
         ignoredTests.add(TxLocalDhtMixedCacheModesTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 68dca73..0d6eac0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMultiC
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
+import org.apache.ignite.internal.processors.cache.distributed.OnePhaseCommitAndNodeLeftTest;
 import org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManagerTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
@@ -92,6 +93,7 @@ public class IgniteCacheTestSuite6 {
         GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class,
ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OnePhaseCommitAndNodeLeftTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, PendingExchangeTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ClientFastReplyCoordinatorFailureTest.class,
ignoredTests);


Mime
View raw message