ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [ignite] branch master updated: IGNITE-12739 Optimistic serializable transactions may fail infinitely when read-through is enabled. (#7575)
Date Mon, 13 Apr 2020 07:24:28 GMT
This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f64fe2c  IGNITE-12739 Optimistic serializable transactions may fail infinitely when
read-through is enabled. (#7575)
f64fe2c is described below

commit f64fe2c5ba1d52038920ee56f42738d0e8a5eb0b
Author: Vladsz83 <vladsz83@gmail.com>
AuthorDate: Mon Apr 13 10:24:09 2020 +0300

    IGNITE-12739 Optimistic serializable transactions may fail infinitely when read-through
is enabled. (#7575)
---
 .../processors/cache/GridCacheContext.java         |   8 +-
 .../distributed/dht/GridPartitionedGetFuture.java  |   2 +-
 .../dht/GridPartitionedSingleGetFuture.java        |   3 +-
 .../cache/distributed/near/GridNearGetFuture.java  |   3 +-
 .../near/GridNearTransactionalCache.java           |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java    |   5 +-
 .../transactions/TxOptimisticReadThroughTest.java  | 216 +++++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite6.java      |   4 +
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   3 +
 9 files changed, 239 insertions(+), 9 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 924cd11..faf48c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2245,16 +2245,18 @@ public class GridCacheContext<K, V> implements Externalizable
{
      * @param affNodes All affinity nodes.
      * @param canRemap Flag indicating that 'get' should be done on a locked topology version.
      * @param partId Partition ID.
+     * @param forcePrimary Force primary flag.
      * @return Affinity node to get key from or {@code null} if there is no suitable alive
node.
      */
     @Nullable public ClusterNode selectAffinityNodeBalanced(
         List<ClusterNode> affNodes,
         Set<ClusterNode> invalidNodes,
         int partId,
-        boolean canRemap
+        boolean canRemap,
+        boolean forcePrimary
     ) {
         if (!readLoadBalancingEnabled) {
-            if (!canRemap) {
+            if (!canRemap && !forcePrimary) {
                 // Find next available node if we can not wait next topology version.
                 for (ClusterNode node : affNodes) {
                     if (ctx.discovery().alive(node) && !invalidNodes.contains(node))
@@ -2270,7 +2272,7 @@ public class GridCacheContext<K, V> implements Externalizable
{
             }
         }
 
-        if (!readFromBackup){
+        if (!readFromBackup || forcePrimary){
             ClusterNode first = affNodes.get(0);
 
             return !invalidNodes.contains(first) ? first : null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index d1706e1..da8a738 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -355,7 +355,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             if (tryLocalGet(key, part, topVer, affNodes, locVals))
                 return false;
 
-            node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet, part, canRemap);
+            node = cctx.selectAffinityNodeBalanced(affNodes, invalidNodeSet, part, canRemap,
forcePrimary);
         }
 
         // Failed if none remote node found.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 08abf5e..b9e9364 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -377,7 +377,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
         if (tryLocalGet(key, part, topVer, affNodes))
             return null;
 
-        ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, getInvalidNodes(),
part, canRemap);
+        ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, getInvalidNodes(),
part, canRemap,
+            forcePrimary);
 
         // Failed if none balanced node found.
         if (affNode == null) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index d4513b7..5b22ab1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -403,7 +403,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
                     Set<ClusterNode> invalidNodesSet = getInvalidNodes(part, topVer);
 
-                    ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet,
part, canRemap);
+                    ClusterNode affNode = cctx.selectAffinityNodeBalanced(affNodes, invalidNodesSet,
part, canRemap,
+                        forcePrimary);
 
                     if (affNode == null) {
                         onDone(serverNotFoundError(part, topVer));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a9eec8e..c50edc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -177,6 +177,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K,
V>
      * @param tx Transaction.
      * @param keys Keys to load.
      * @param readThrough Read through flag.
+     * @param forcePrimary Force primary flag.
      * @param deserializeBinary Deserialize binary flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
@@ -187,6 +188,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K,
V>
         AffinityTopologyVersion topVer,
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
+        boolean forcePrimary,
         boolean deserializeBinary,
         boolean recovery,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
@@ -197,7 +199,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K,
V>
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
             readThrough,
-            /*force primary*/needVer || !ctx.config().isReadFromBackup(),
+            forcePrimary,
             tx,
             CU.subjectId(tx, ctx.shared()),
             tx.resolveTaskName(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 41cd597..e404b65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3101,6 +3101,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements
GridTimeou
                 topVer,
                 keys,
                 readThrough,
+                needVer || !cacheCtx.config().isReadFromBackup() || (optimistic() &&
serializable() && readThrough),
                 /*deserializeBinary*/false,
                 recovery,
                 expiryPlc0,
@@ -3165,7 +3166,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements
GridTimeou
                 return cacheCtx.colocated().loadAsync(
                     key,
                     readThrough,
-                    /*force primary*/needVer || !cacheCtx.config().isReadFromBackup(),
+                    needVer || !cacheCtx.config().isReadFromBackup() || (optimistic() &&
serializable() && readThrough),
                     topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),
@@ -3198,7 +3199,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements
GridTimeou
                 return cacheCtx.colocated().loadAsync(
                     keys,
                     readThrough,
-                    /*force primary*/needVer || !cacheCtx.config().isReadFromBackup(),
+                    needVer || !cacheCtx.config().isReadFromBackup() || (optimistic() &&
serializable() && readThrough),
                     topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
new file mode 100644
index 0000000..f97a5ab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticReadThroughTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Tests optimistic serializable transactions with a read-through cache.
+ */
+public class TxOptimisticReadThroughTest extends GridCommonAbstractTest {
+    /** Test nodes count. */
+    protected static final int NODE_CNT = 2;
+
+    /** Shared read/write-through store. */
+    private static final Map<Object, Object> storeMap = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // Use different addresses so that not only the first node is picked when looking up a value within a transaction.
+        cfg.setUserAttributes(Collections.singletonMap(
+            IgniteNodeAttributes.ATTR_MACS_OVERRIDE, UUID.randomUUID().toString()));
+
+        return cfg;
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for actual value version
when read-through is
+     *  enabled for replicated transactional cache.
+     */
+    @Test
+    public void testReplicated() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(REPLICATED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, false);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for actual value version
when read-through is
+     *  enabled for partitioned transactional cache.
+     */
+    @Test
+    public void testPartitioned() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, false);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for actual value version
when read-through is
+     *  enabled for near partitioned transactional cache.
+     */
+    @Test
+    public void testNearPartitioned() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, true);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for actual value version
when read-through is
+     *  enabled for near replicated transactional cache.
+     */
+    @Test
+    public void testNearReplicated() throws Exception {
+        CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration();
+
+        cacheCfg.setCacheMode(REPLICATED);
+
+        checkOptimisticSerializableTransaction(cacheCfg, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids(true);
+    }
+
+    /**
+     *  Checks optimistic serializable transaction asks primary node for actual value version
when read-through is
+     *  enabled for a transactional cache.
+     *
+     * @param cacheCfg Transactional cache configuration for the testing cache.
+     * @param near {@code True} to check transaction with near cache from a client node.
{@code False} for server node.
+     */
+    private void checkOptimisticSerializableTransaction(CacheConfiguration<Object, Object>
cacheCfg, boolean near)
+        throws Exception {
+        startGrids(NODE_CNT);
+
+        final IgniteCache<Object, Object> cache0 = grid(0).getOrCreateCache(cacheCfg);
+
+        final IgniteCache<Object, Object> txCache;
+        final IgniteEx txGrid;
+
+        if (near) {
+            txGrid = startClientGrid();
+
+            txCache = txGrid.createNearCache(cacheCfg.getName(), new NearCacheConfiguration<>());
+        }
+        else {
+            txGrid = grid(1);
+
+            txCache = grid(1).cache(cacheCfg.getName());
+        }
+
+        List<Integer> primaryKeys = primaryKeys(cache0, 3);
+
+        primaryKeys.forEach(k -> cache0.put(k, k));
+
+        primaryKeys.forEach(k -> cache0.localClear(k));
+
+        primaryKeys.forEach(k -> assertEquals(k, cache0.get(k)));
+
+        try (Transaction tx = txGrid.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+            // Force requesting value from primary node by one key.
+            txCache.get(primaryKeys.get(0));
+
+            // Force requesting value from primary node by keys batch.
+            txCache.getAll(new HashSet<>(primaryKeys.subList(1, primaryKeys.size())));
+
+            primaryKeys.forEach(k -> txCache.put(k, k + 1));
+
+            tx.commit();
+        }
+
+        for (int i = 0; i < NODE_CNT; ++i) {
+            IgniteCache<Object, Object> cache = grid(i).cache(cacheCfg.getName());
+
+            primaryKeys.forEach(k -> assertEquals(k + 1, cache.get(k)));
+        }
+    }
+
+    /** @return Default configuration of a transactional cache. */
+    private static CacheConfiguration<Object, Object> cacheConfiguration(){
+        return new CacheConfiguration<>("tx")
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setCacheStoreFactory(new TestStoreFactory())
+            .setAtomicityMode(TRANSACTIONAL)
+            .setBackups(1)
+            .setReadThrough(true)
+            .setWriteThrough(true);
+    }
+
+    /** Shared read/write-through store factory. */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>>
{
+        /** {@inheritDoc} */
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter<Object, Object>() {
+                /** {@inheritDoc} */
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return storeMap.get(key);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException
{
+                    storeMap.put(entry.getKey(), entry.getValue());
+                }
+
+                /** {@inheritDoc} */
+                @Override public void delete(Object key) throws CacheWriterException {
+                    storeMap.remove(key);
+                }
+            };
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index bd46b72..949ff41 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.lat
 import org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticReadThroughTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutOnePhaseCommitTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest;
 import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -98,6 +99,9 @@ public class IgniteCacheMvccTestSuite6 {
         ignoredTests.add(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class);
// See PartitionedMvccTxPessimisticCacheGetsDistributionTest.
         ignoredTests.add(ReplicatedTransactionalPessimisticCacheGetsDistributionTest.class);
//See ReplicatedMvccTxPessimisticCacheGetsDistributionTest
 
+        // Read-through is not allowed with MVCC and transactional cache.
+        ignoredTests.add(TxOptimisticReadThroughTest.class);
+
         List<Class<?>> suite = new ArrayList<>((IgniteCacheTestSuite6.suite(ignoredTests)));
 
         // Add mvcc versions for skipped tests.
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 450d766..4898008 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStartT
 import org.apache.ignite.internal.processors.cache.transactions.TxOnCachesStopTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticReadThroughTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncNearCacheTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncTest;
 import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
@@ -128,6 +129,8 @@ public class IgniteCacheTestSuite6 {
 
         GridTestUtils.addTestIfNeeded(suite, TxOptimisticOnPartitionExchangeTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, TxOptimisticReadThroughTest.class, ignoredTests);
+
         GridTestUtils.addTestIfNeeded(suite, IgniteExchangeLatchManagerCoordinatorFailTest.class,
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteExchangeLatchManagerDiscoHistoryTest.class,
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ExchangeLatchManagerTest.class, ignoredTests);


Mime
View raw message