ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [2/5] ignite git commit: IGNITE-8476 AssertionError exception occurs when trying to remove node from baseline under loading
Date Sat, 09 Jun 2018 20:28:02 GMT
IGNITE-8476 AssertionError exception occurs when trying to remove node from baseline under loading


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d519ef51
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d519ef51
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d519ef51

Branch: refs/heads/ignite-2.6
Commit: d519ef5164f6241f3a98671281103398f154a470
Parents: b1c6f3d
Author: Ivan Rakov <irakov@apache.org>
Authored: Thu May 31 16:56:14 2018 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Sat Jun 9 23:22:50 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  12 +-
 .../ignite/internal/IgniteNodeAttributes.java   |   3 +
 .../processors/cache/CacheGroupContext.java     |  13 +-
 .../processors/cache/ClusterCachesInfo.java     |  58 +-
 .../processors/cache/GridCacheProcessor.java    |   1 +
 ...lientAffinityAssignmentWithBaselineTest.java | 974 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   2 +
 7 files changed, 1056 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d519ef51/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 046c4b8..e42e5dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -200,10 +200,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.mxbean.ClusterMetricsMXBean;
 import org.apache.ignite.mxbean.IgniteMXBean;
 import org.apache.ignite.mxbean.StripedExecutorMXBean;
-import org.apache.ignite.mxbean.WorkersControlMXBean;
 import org.apache.ignite.mxbean.ThreadPoolMXBean;
-import org.apache.ignite.mxbean.TransactionsMXBean;
 import org.apache.ignite.mxbean.TransactionMetricsMxBean;
+import org.apache.ignite.mxbean.TransactionsMXBean;
+import org.apache.ignite.mxbean.WorkersControlMXBean;
 import org.apache.ignite.plugin.IgnitePlugin;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
@@ -1569,7 +1569,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         // Stick in network context into attributes.
         add(ATTR_IPS, (ips.isEmpty() ? "" : ips));
-        add(ATTR_MACS, (macs.isEmpty() ? "" : macs));
+
+        Map<String, ?> userAttrs = configuration().getUserAttributes();
+
+        if (userAttrs != null && userAttrs.get(IgniteNodeAttributes.ATTR_MACS_OVERRIDE) != null)
+            add(ATTR_MACS, (Serializable)userAttrs.get(IgniteNodeAttributes.ATTR_MACS_OVERRIDE));
+        else
+            add(ATTR_MACS, (macs.isEmpty() ? "" : macs));
 
         // Stick in some system level attributes
         add(ATTR_JIT_NAME, U.getCompilerMx() == null ? "" : U.getCompilerMx().getName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/d519ef51/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 073369f..6a4beeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -120,6 +120,9 @@ public final class IgniteNodeAttributes {
     /** Internal attribute name constant. */
     public static final String ATTR_MACS = ATTR_PREFIX + ".macs";
 
+    /** Allows to override {@link #ATTR_MACS} by adding this attribute in the user attributes. */
+    public static final String ATTR_MACS_OVERRIDE = "override." + ATTR_MACS;
+
     /** Internal attribute name constant. */
     public static final String ATTR_PHY_RAM = ATTR_PREFIX + ".phy.ram";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d519ef51/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 5f750d5..4f9b7f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -133,6 +133,9 @@ public class CacheGroupContext {
     /** */
     private final DataRegion dataRegion;
 
+    /** Persistence enabled flag. */
+    private final boolean persistenceEnabled;
+
     /** */
     private final CacheObjectContext cacheObjCtx;
 
@@ -158,8 +161,8 @@ public class CacheGroupContext {
     private volatile boolean globalWalEnabled;
 
     /**
-     * @param grpId Group ID.
      * @param ctx Context.
+     * @param grpId Group ID.
      * @param rcvdFrom Node ID cache group was received from.
      * @param cacheType Cache type.
      * @param ccfg Cache configuration.
@@ -169,6 +172,7 @@ public class CacheGroupContext {
      * @param freeList Free list.
      * @param reuseList Reuse list.
      * @param locStartVer Topology version when group was started on local node.
+     * @param persistenceEnabled Persistence enabled flag.
      * @param walEnabled Wal enabled flag.
      */
     CacheGroupContext(
@@ -183,7 +187,9 @@ public class CacheGroupContext {
         FreeList freeList,
         ReuseList reuseList,
         AffinityTopologyVersion locStartVer,
-        boolean walEnabled) {
+        boolean persistenceEnabled,
+        boolean walEnabled
+    ) {
         assert ccfg != null;
         assert dataRegion != null || !affNode;
         assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']';
@@ -200,6 +206,7 @@ public class CacheGroupContext {
         this.locStartVer = locStartVer;
         this.cacheType = cacheType;
         this.globalWalEnabled = walEnabled;
+        this.persistenceEnabled = persistenceEnabled;
         this.localWalEnabled = true;
 
         persistGlobalWalState(walEnabled);
@@ -918,7 +925,7 @@ public class CacheGroupContext {
      * @return Persistence enabled flag.
      */
     public boolean persistenceEnabled() {
-        return dataRegion != null && dataRegion.config().isPersistenceEnabled();
+        return persistenceEnabled;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d519ef51/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 975617e..9ff84e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -38,10 +38,12 @@ import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.GridCachePluginContext;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.plugin.CachePluginContext;
 import org.apache.ignite.plugin.CachePluginProvider;
 import org.apache.ignite.plugin.PluginProvider;
@@ -1752,7 +1755,7 @@ class ClusterCachesInfo {
 
         Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId);
 
-        boolean persistent = CU.isPersistentCache(startedCacheCfg, ctx.config().getDataStorageConfiguration());
+        boolean persistent = resolvePersistentFlag(exchActions, startedCacheCfg);
 
         CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
             startedCacheCfg,
@@ -1782,6 +1785,59 @@ class ClusterCachesInfo {
     }
 
     /**
+     * Resolves persistent flag for new cache group descriptor.
+     *
+     * @param exchActions Optional exchange actions to update if new group was added.
+     * @param startedCacheCfg Started cache configuration.
+     */
+    private boolean resolvePersistentFlag(@Nullable ExchangeActions exchActions, CacheConfiguration<?, ?> startedCacheCfg) {
+        if (!ctx.clientNode()) {
+            // On server, we always can determine whether cache is persistent by local storage configuration.
+            return CU.isPersistentCache(startedCacheCfg, ctx.config().getDataStorageConfiguration());
+        }
+        else if (exchActions == null) {
+            // It's either client local join event or cache is statically configured on another node.
+            // No need to resolve on client - we'll anyway receive group descriptor from server with correct flag.
+            return false;
+        }
+        else {
+            // Dynamic cache start. Initiator of the start may not have known whether cache should be persistent.
+            // On client, we should peek attributes of any affinity server node to get data storage configuration.
+            Collection<ClusterNode> aliveSrvNodes = ctx.discovery().aliveServerNodes();
+
+            assert !aliveSrvNodes.isEmpty() : "No alive server nodes";
+
+            for (ClusterNode srvNode : aliveSrvNodes) {
+                if (CU.affinityNode(srvNode, startedCacheCfg.getNodeFilter())) {
+                    Object dsCfgBytes = srvNode.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG);
+
+                    if (dsCfgBytes instanceof byte[]) {
+                        try {
+                            DataStorageConfiguration crdDsCfg = new JdkMarshaller().unmarshal(
+                                (byte[])dsCfgBytes, U.resolveClassLoader(ctx.config()));
+
+                            return CU.isPersistentCache(startedCacheCfg, crdDsCfg);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to unmarshal remote data storage configuration [remoteNode=" +
+                                srvNode + ", cacheName=" + startedCacheCfg.getName() + "]", e);
+                        }
+                    }
+                    else {
+                        U.error(log, "Remote marshalled data storage configuration is absent [remoteNode=" + srvNode +
+                            ", cacheName=" + startedCacheCfg.getName() + ", dsCfg=" + dsCfgBytes + "]");
+                    }
+                }
+            }
+
+            U.error(log, "Failed to find affinity server node with data storage configuration for starting cache " +
+                "[cacheName=" + startedCacheCfg.getName() + ", aliveSrvNodes=" + aliveSrvNodes + "]");
+
+            return false;
+        }
+    }
+
+    /**
      * @param ccfg Cache configuration to start.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d519ef51/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ccccdec..b8e1bcf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2037,6 +2037,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             freeList,
             reuseList,
             exchTopVer,
+            desc.persistenceEnabled(),
             desc.walEnabled()
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d519ef51/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java
new file mode 100644
index 0000000..15ec415
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java
@@ -0,0 +1,974 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.baseline;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
+/**
+ * Checks that client affinity assignment cache is calculated correctly regardless of current baseline topology.
+ */
+public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int DEFAULT_NODES_COUNT = 5;
+
+    /** Tx cache name. */
+    private static final String PARTITIONED_TX_CACHE_NAME = "p-tx-cache";
+
+    /** Tx cache name with shifted affinity. */
+    private static final String PARTITIONED_TX_PRIM_SYNC_CACHE_NAME = "prim-sync";
+
+    /** Tx cache name from client static configuration. */
+    private static final String PARTITIONED_TX_CLIENT_CACHE_NAME = "p-tx-client-cache";
+
+    /** Atomic cache name. */
+    private static final String PARTITIONED_ATOMIC_CACHE_NAME = "p-atomic-cache";
+
+    /** Tx cache name. */
+    private static final String REPLICATED_TX_CACHE_NAME = "r-tx-cache";
+
+    /** Atomic cache name. */
+    private static final String REPLICATED_ATOMIC_CACHE_NAME = "r-atomic-cache";
+
+    /** Client grid name. */
+    private static final String CLIENT_GRID_NAME = "client";
+
+    /** Flaky node name */
+    private static final String FLAKY_NODE_NAME = "flaky";
+
+    /** Entries. */
+    private static final int ENTRIES = 3_000;
+
+    /** Flaky node wal path. */
+    public static final String FLAKY_WAL_PATH = "flakywal";
+
+    /** Flaky node wal archive path. */
+    public static final String FLAKY_WAL_ARCHIVE_PATH = "flakywalarchive";
+
+    /** Flaky node storage path. */
+    public static final String FLAKY_STORAGE_PATH = "flakystorage";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.startsWith(CLIENT_GRID_NAME)) {
+            // Intentionally skipping data storage in client configuration.
+            cfg.setClientMode(true);
+        }
+        else {
+            cfg.setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(
+                        new DataRegionConfiguration()
+                            .setPersistenceEnabled(true)
+                            .setMaxSize(200 * 1024 * 1024)
+                    )
+            );
+        }
+
+        if (igniteInstanceName.contains(FLAKY_NODE_NAME)) {
+            File store = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
+
+            cfg.getDataStorageConfiguration().setWalPath(new File(store, FLAKY_WAL_PATH).getAbsolutePath());
+            cfg.getDataStorageConfiguration().setWalArchivePath(new File(store, FLAKY_WAL_ARCHIVE_PATH).getAbsolutePath());
+            cfg.getDataStorageConfiguration().setStoragePath(new File(store, FLAKY_STORAGE_PATH).getAbsolutePath());
+        }
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        List<CacheConfiguration> srvConfigs = new ArrayList<>();
+        srvConfigs.add(cacheConfig(PARTITIONED_TX_CACHE_NAME));
+        srvConfigs.add(cacheConfig(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME));
+        srvConfigs.add(cacheConfig(REPLICATED_ATOMIC_CACHE_NAME));
+
+        List<CacheConfiguration> clientConfigs = new ArrayList<>(srvConfigs);
+
+        // Skip some configs in client static configuration to check that clients receive correct cache descriptors.
+        srvConfigs.add(cacheConfig(PARTITIONED_ATOMIC_CACHE_NAME));
+        srvConfigs.add(cacheConfig(REPLICATED_TX_CACHE_NAME));
+
+        // Skip config in server static configuration to check that caches received on client join start correctly.
+        clientConfigs.add(cacheConfig(PARTITIONED_TX_CLIENT_CACHE_NAME));
+
+        if (igniteInstanceName.startsWith(CLIENT_GRID_NAME))
+            cfg.setCacheConfiguration(clientConfigs.toArray(new CacheConfiguration[clientConfigs.size()]));
+        else
+            cfg.setCacheConfiguration(srvConfigs.toArray(new CacheConfiguration[srvConfigs.size()]));
+
+        // Enforce different mac adresses to emulate distributed environment by default.
+        cfg.setUserAttributes(Collections.singletonMap(
+            IgniteNodeAttributes.ATTR_MACS_OVERRIDE, UUID.randomUUID().toString()));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @param cacheName Cache name.
+     */
+    private CacheConfiguration<Integer, String> cacheConfig(String cacheName) {
+        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>();
+
+        if (PARTITIONED_ATOMIC_CACHE_NAME.equals(cacheName)) {
+            cfg.setName(PARTITIONED_ATOMIC_CACHE_NAME);
+            cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+            cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+            cfg.setBackups(2);
+        }
+        else if (PARTITIONED_TX_CACHE_NAME.equals(cacheName)) {
+            cfg.setName(PARTITIONED_TX_CACHE_NAME);
+            cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+            cfg.setBackups(2);
+        }
+        else if (PARTITIONED_TX_CLIENT_CACHE_NAME.equals(cacheName)) {
+            cfg.setName(PARTITIONED_TX_CLIENT_CACHE_NAME);
+            cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+            cfg.setBackups(2);
+        }
+        else if (PARTITIONED_TX_PRIM_SYNC_CACHE_NAME.equals(cacheName)) {
+            cfg.setName(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME);
+            cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);
+            cfg.setAffinity(new RendezvousAffinityFunction(false, 41)); // To break collocation.
+            cfg.setBackups(2);
+        }
+        else if (REPLICATED_ATOMIC_CACHE_NAME.equals(cacheName)) {
+            cfg.setName(REPLICATED_ATOMIC_CACHE_NAME);
+            cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+            cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+            cfg.setCacheMode(CacheMode.REPLICATED);
+        }
+        else if (REPLICATED_TX_CACHE_NAME.equals(cacheName)) {
+            cfg.setName(REPLICATED_TX_CACHE_NAME);
+            cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+            cfg.setCacheMode(CacheMode.REPLICATED);
+        }
+        else
+            throw new IllegalArgumentException("Unexpected cache name");
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    public void testPartitionedAtomicCache() throws Exception {
+        testChangingBaselineDown(PARTITIONED_ATOMIC_CACHE_NAME, false);
+    }
+
+    /**
+     *
+     */
+    public void testPartitionedTxCache() throws Exception {
+        testChangingBaselineDown(PARTITIONED_TX_CACHE_NAME, false);
+    }
+
+    /**
+     * Test that activation after client join won't break cache.
+     */
+    public void testLateActivation() throws Exception {
+        testChangingBaselineDown(PARTITIONED_TX_CACHE_NAME, true);
+    }
+
+    /**
+     *
+     */
+    public void testReplicatedAtomicCache() throws Exception {
+        testChangingBaselineDown(REPLICATED_ATOMIC_CACHE_NAME, false);
+    }
+
+    /**
+     *
+     */
+    public void testReplicatedTxCache() throws Exception {
+        testChangingBaselineDown(REPLICATED_TX_CACHE_NAME, false);
+    }
+
+    /**
+     * Tests that changing baseline down under load won't break cache.
+     */
+    private void testChangingBaselineDown(String cacheName, boolean lateActivation) throws Exception {
+        IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT);
+
+        IgniteEx client1 = null;
+        IgniteEx client2 = null;
+
+        if (lateActivation) {
+            client1 = (IgniteEx)startGrid("client1");
+            client2 = (IgniteEx)startGrid("client2");
+        }
+        else
+            ig0.cluster().active(true);
+
+        AtomicBoolean stopLoad = new AtomicBoolean(false);
+
+        AtomicReference<Throwable> loadError = new AtomicReference<>(null);
+
+        if (lateActivation)
+            ig0.cluster().active(true);
+
+        IgniteCache<Integer, String> cache = ig0.cache(cacheName);
+
+        System.out.println("### Starting preloading");
+
+        for (int i = 0; i < ENTRIES; i++) {
+            ThreadLocalRandom r = ThreadLocalRandom.current();
+
+            byte[] randBytes = new byte[r.nextInt(10, 100)];
+
+            cache.put(r.nextInt(ENTRIES), new String(randBytes));
+        }
+
+        System.out.println("### Preloading is finished");
+
+        if (!lateActivation) {
+            client1 = (IgniteEx)startGrid("client1");
+            client2 = (IgniteEx)startGrid("client2");
+        }
+
+        ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>();
+
+        startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker);
+        startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker);
+        startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker);
+        startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker);
+        startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker);
+        startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker);
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        List<BaselineNode> fullBlt = new ArrayList<>();
+        for (int i = 0; i < DEFAULT_NODES_COUNT; i++)
+            fullBlt.add(grid(i).localNode());
+
+        stopGrid(DEFAULT_NODES_COUNT - 1, true);
+        stopGrid(DEFAULT_NODES_COUNT - 2, true);
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 1, loadError, threadProgressTracker);
+        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 2, loadError, threadProgressTracker);
+
+        stopLoad.set(true);
+    }
+
+    /**
+     * Tests that rejoin of baseline node with clear LFS under load won't break cache.
+     */
+    public void testRejoinWithCleanLfs() throws Exception {
+        IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT - 1);
+        startGrid("flaky");
+
+        ig0.cluster().active(true);
+
+        AtomicBoolean stopLoad = new AtomicBoolean(false);
+
+        AtomicReference<Throwable> loadError = new AtomicReference<>(null);
+
+        IgniteCache<Integer, String> cache1 = ig0.cache(PARTITIONED_ATOMIC_CACHE_NAME);
+        IgniteCache<Integer, String> cache2 = ig0.cache(PARTITIONED_TX_CACHE_NAME);
+        IgniteCache<Integer, String> cache3 = ig0.cache(REPLICATED_ATOMIC_CACHE_NAME);
+        IgniteCache<Integer, String> cache4 = ig0.cache(REPLICATED_TX_CACHE_NAME);
+
+        System.out.println("### Starting preloading");
+
+        for (int i = 0; i < ENTRIES; i++) {
+            ThreadLocalRandom r = ThreadLocalRandom.current();
+
+            cache1.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
+            cache2.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
+            cache3.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
+            cache4.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)]));
+        }
+
+        System.out.println("### Preloading is finished");
+
+        IgniteEx client1 = (IgniteEx)startGrid("client1");
+        IgniteEx client2 = (IgniteEx)startGrid("client2");
+
+        ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>();
+
+        startSimpleLoadThread(client1, PARTITIONED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
+        startSimpleLoadThread(client1, PARTITIONED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
+        startSimpleLoadThread(client1, REPLICATED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
+        startTxLoadThread(client2, PARTITIONED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
+        startTxLoadThread(client2, PARTITIONED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
+        startTxLoadThread(client2, REPLICATED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker);
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        stopGrid("flaky");
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        File store = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
+
+        U.delete(new File(store, FLAKY_WAL_PATH));
+        U.delete(new File(store, FLAKY_WAL_ARCHIVE_PATH));
+        U.delete(new File(store, FLAKY_STORAGE_PATH));
+
+        startGrid("flaky");
+
+        System.out.println("### Starting rebalancing after flaky node join");
+        waitForRebalancing();
+        System.out.println("### Rebalancing is finished after flaky node join");
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        stopLoad.set(true);
+    }
+
+    /**
+     * Test that changing baseline down under cross-cache txs load won't break cache.
+     */
+    public void testCrossCacheTxs() throws Exception {
+        IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT);
+
+        ig0.cluster().active(true);
+
+        AtomicBoolean stopLoad = new AtomicBoolean(false);
+
+        AtomicReference<Throwable> loadError = new AtomicReference<>(null);
+
+        String cacheName1 = PARTITIONED_TX_CACHE_NAME;
+        String cacheName2 = PARTITIONED_TX_PRIM_SYNC_CACHE_NAME;
+
+        IgniteCache<Integer, String> cache1 = ig0.cache(PARTITIONED_TX_CACHE_NAME);
+        IgniteCache<Integer, String> cache2 = ig0.cache(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME);
+
+        System.out.println("### Starting preloading");
+
+        for (int i = 0; i < ENTRIES; i++) {
+            ThreadLocalRandom r = ThreadLocalRandom.current();
+
+            byte[] randBytes1 = new byte[r.nextInt(10, 100)];
+            byte[] randBytes2 = new byte[r.nextInt(10, 100)];
+
+            cache1.put(r.nextInt(ENTRIES), new String(randBytes1));
+            cache2.put(r.nextInt(ENTRIES), new String(randBytes2));
+        }
+
+        System.out.println("### Preloading is finished");
+
+        IgniteEx client1 = (IgniteEx)startGrid("client1");
+        IgniteEx client2 = (IgniteEx)startGrid("client2");
+
+        ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>();
+
+        startCrossCacheTxLoadThread(client1, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
+        startCrossCacheTxLoadThread(client1, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
+        startCrossCacheTxLoadThread(client1, cacheName2, cacheName1, stopLoad, loadError, threadProgressTracker);
+        startCrossCacheTxLoadThread(client2, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
+        startCrossCacheTxLoadThread(client2, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker);
+        startCrossCacheTxLoadThread(client2, cacheName2, cacheName1, stopLoad, loadError, threadProgressTracker);
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        List<BaselineNode> fullBlt = new ArrayList<>();
+        for (int i = 0; i < DEFAULT_NODES_COUNT; i++)
+            fullBlt.add(grid(i).localNode());
+
+        stopGrid(DEFAULT_NODES_COUNT - 1, true);
+        stopGrid(DEFAULT_NODES_COUNT - 2, true);
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 1, loadError, threadProgressTracker);
+        tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 2, loadError, threadProgressTracker);
+
+        stopLoad.set(true);
+    }
+
+    /**
+     * Tests that join of non-baseline node while long transactions are running won't break dynamically started cache.
+     */
+    public void testDynamicCacheLongTransactionNodeStart() throws Exception {
+        IgniteEx ig0 = (IgniteEx)startGrids(4);
+
+        ig0.cluster().active(true);
+
+        IgniteEx client = (IgniteEx)startGrid("client");
+
+        CacheConfiguration<Integer, String> dynamicCacheCfg = cacheConfig(REPLICATED_TX_CACHE_NAME);
+        dynamicCacheCfg.setName("dyn");
+
+        IgniteCache<Integer, String> dynamicCache = client.getOrCreateCache(dynamicCacheCfg);
+
+        for (int i = 0; i < ENTRIES; i++)
+            dynamicCache.put(i, "abacaba" + i);
+
+        AtomicBoolean releaseTx = new AtomicBoolean(false);
+        CountDownLatch allTxsDoneLatch = new CountDownLatch(10);
+
+        for (int i = 0; i < 10; i++) {
+            final int i0 = i;
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try (Transaction tx = client.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+                        TransactionIsolation.REPEATABLE_READ)) {
+                        dynamicCache.put(i0, "txtxtxtx" + i0);
+
+                        while (!releaseTx.get())
+                            LockSupport.parkNanos(1_000_000);
+
+                        tx.commit();
+
+                        System.out.println("Tx #" + i0 + " committed");
+                    }
+                    catch (Throwable t) {
+                        System.out.println("Tx #" + i0 + " failed");
+
+                        t.printStackTrace();
+                    }
+                    finally {
+                        allTxsDoneLatch.countDown();
+                    }
+                }
+            });
+        }
+
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    startGrid(4);
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        U.sleep(1_000);
+
+        releaseTx.set(true);
+
+        allTxsDoneLatch.await();
+
+        for (int i = 0; i < 10_000; i++)
+            assertEquals("txtxtxtx" + (i % 10), dynamicCache.get(i % 10));
+    }
+
+    /**
+     * Tests that if dynamic cache has no affinity nodes at the moment of start,
+     * it will still work correctly when affinity nodes will appear.
+     */
+    public void testDynamicCacheStartNoAffinityNodes() throws Exception {
+        fail("IGNITE-8652");
+
+        IgniteEx ig0 = startGrid(0);
+
+        ig0.cluster().active(true);
+        
+        IgniteEx client = (IgniteEx)startGrid("client");
+
+        CacheConfiguration<Integer, String> dynamicCacheCfg = new CacheConfiguration<Integer, String>()
+            .setName("dyn")
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setBackups(2)
+            .setNodeFilter(new ConsistentIdNodeFilter((Serializable)ig0.localNode().consistentId()));
+
+        IgniteCache<Integer, String> dynamicCache = client.getOrCreateCache(dynamicCacheCfg);
+        
+        for (int i = 1; i < 4; i++)
+            startGrid(i);
+
+        resetBaselineTopology();
+
+        for (int i = 0; i < ENTRIES; i++)
+            dynamicCache.put(i, "abacaba" + i);
+        
+        AtomicBoolean releaseTx = new AtomicBoolean(false);
+        CountDownLatch allTxsDoneLatch = new CountDownLatch(10);
+        
+        for (int i = 0; i < 10; i++) {
+            final int i0 = i;
+            
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try (Transaction tx = client.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+                        TransactionIsolation.REPEATABLE_READ)) {
+                        dynamicCache.put(i0, "txtxtxtx" + i0);
+                        
+                        while (!releaseTx.get())
+                            LockSupport.parkNanos(1_000_000);
+                        
+                        tx.commit();
+                        
+                        System.out.println("Tx #" + i0 + " committed");
+                    }
+                    catch (Throwable t) {
+                        System.out.println("Tx #" + i0 + " failed");
+                        
+                        t.printStackTrace();
+                    }
+                    finally {
+                        allTxsDoneLatch.countDown();
+                    }
+                }
+            });
+        }
+        
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    startGrid(4);
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        U.sleep(1_000);
+
+        releaseTx.set(true);
+
+        allTxsDoneLatch.await();
+
+        for (int i = 0; i < 10_000; i++)
+            assertEquals("txtxtxtx" + (i % 10), dynamicCache.get(i % 10));
+    }
+
+    /**
+     * Tests that join of non-baseline node while long transactions are running won't break cache started on client join.
+     */
+    public void testClientJoinCacheLongTransactionNodeStart() throws Exception {
+        IgniteEx ig0 = (IgniteEx)startGrids(4);
+
+        ig0.cluster().active(true);
+
+        IgniteEx client = (IgniteEx)startGrid("client");
+
+        IgniteCache<Integer, String> clientJoinCache = client.cache(PARTITIONED_TX_CLIENT_CACHE_NAME);
+
+        for (int i = 0; i < ENTRIES; i++)
+            clientJoinCache.put(i, "abacaba" + i);
+
+        AtomicBoolean releaseTx = new AtomicBoolean(false);
+        CountDownLatch allTxsDoneLatch = new CountDownLatch(10);
+
+        for (int i = 0; i < 10; i++) {
+            final int i0 = i;
+
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    try (Transaction tx = client.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+                        TransactionIsolation.REPEATABLE_READ)) {
+                        clientJoinCache.put(i0, "txtxtxtx" + i0);
+
+                        while (!releaseTx.get())
+                            LockSupport.parkNanos(1_000_000);
+
+                        tx.commit();
+
+                        System.out.println("Tx #" + i0 + " committed");
+                    }
+                    catch (Throwable t) {
+                        System.out.println("Tx #" + i0 + " failed");
+
+                        t.printStackTrace();
+                    }
+                    finally {
+                        allTxsDoneLatch.countDown();
+                    }
+                }
+            });
+        }
+
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    startGrid(4);
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        U.sleep(1_000);
+
+        releaseTx.set(true);
+
+        allTxsDoneLatch.await();
+
+        for (int i = 0; i < 10_000; i++)
+            assertEquals("txtxtxtx" + (i % 10), clientJoinCache.get(i % 10));
+    }
+
+    /**
+     * @param ig0 Ignite.
+     * @param fullBlt Initial BLT list.
+     * @param newBaselineSize New baseline size.
+     * @param threadProgressTracker Thread progress tracker.
+     */
+    private void tryChangeBaselineDown(
+        IgniteEx ig0,
+        List<BaselineNode> fullBlt,
+        int newBaselineSize,
+        AtomicReference<Throwable> loadError,
+        ConcurrentMap<Long, Long> threadProgressTracker
+    ) throws Exception {
+        System.out.println("### Changing BLT: " + (newBaselineSize + 1) + " -> " + newBaselineSize);
+        ig0.cluster().setBaselineTopology(fullBlt.subList(0, newBaselineSize));
+
+        System.out.println("### Starting rebalancing after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize);
+        waitForRebalancing();
+        System.out.println("### Rebalancing is finished after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize);
+
+        awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker);
+
+        if (loadError.get() != null) {
+            loadError.get().printStackTrace();
+
+            fail("Unexpected error in load thread: " + loadError.get().toString());
+        }
+    }
+
+    /**
+     * @param ig Ignite instance.
+     * @param cacheName Cache name.
+     * @param stopFlag Stop flag.
+     * @param loadError Load error reference.
+     * @param threadProgressTracker Progress tracker.
+     */
+    private void startSimpleLoadThread(
+        IgniteEx ig,
+        String cacheName,
+        AtomicBoolean stopFlag,
+        AtomicReference<Throwable> loadError,
+        ConcurrentMap<Long, Long> threadProgressTracker
+    ) {
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                ThreadLocalRandom r = ThreadLocalRandom.current();
+
+                IgniteCache<Integer, String> cache = ig.cache(cacheName);
+
+                try {
+                    while (!stopFlag.get()) {
+                        try {
+                            int op = r.nextInt(3);
+
+                            switch (op) {
+                                case 0:
+                                    byte[] randBytes = new byte[r.nextInt(10, 100)];
+
+                                    cache.put(r.nextInt(ENTRIES), new String(randBytes));
+
+                                    break;
+                                case 1:
+                                    cache.remove(r.nextInt(ENTRIES));
+
+                                    break;
+                                case 2:
+                                    cache.get(r.nextInt(ENTRIES));
+
+                                    break;
+                            }
+
+                            threadProgressTracker.compute(Thread.currentThread().getId(),
+                                (tId, ops) -> ops == null ? 1 : ops + 1);
+                        }
+                        catch (CacheException e) {
+                            if (e.getCause() instanceof ClusterTopologyException)
+                                ((ClusterTopologyException)e.getCause()).retryReadyFuture().get();
+                        }
+                        catch (ClusterTopologyException e) {
+                            e.retryReadyFuture().get();
+                        }
+                    }
+                }
+                catch (Throwable t) {
+                    loadError.compareAndSet(null, t);
+
+                    stopFlag.set(true);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param ig Ignite instance.
+     * @param cacheName Cache name.
+     * @param stopFlag Stop flag.
+     * @param loadError Load error reference.
+     * @param threadProgressTracker Progress tracker.
+     */
+    private void startTxLoadThread(
+        IgniteEx ig,
+        String cacheName,
+        AtomicBoolean stopFlag,
+        AtomicReference<Throwable> loadError,
+        ConcurrentMap<Long, Long> threadProgressTracker
+    ) {
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                ThreadLocalRandom r = ThreadLocalRandom.current();
+
+                IgniteCache<Integer, String> cache = ig.cache(cacheName);
+
+                boolean pessimistic = r.nextBoolean();
+
+                boolean rollback = r.nextBoolean();
+
+                try {
+                    while (!stopFlag.get()) {
+                        try (Transaction tx = ig.transactions().txStart(
+                            pessimistic ? TransactionConcurrency.PESSIMISTIC : TransactionConcurrency.OPTIMISTIC,
+                            TransactionIsolation.REPEATABLE_READ
+                        )) {
+                            int key1 = -1;
+                            String val1 = null;
+                            while (val1 == null) {
+                                key1 = r.nextInt(ENTRIES);
+                                val1 = cache.get(key1);
+                            }
+
+                            int key2 = -1;
+                            String val2 = null;
+                            while (val2 == null) {
+                                key2 = r.nextInt(ENTRIES);
+                                val2 = cache.get(key2);
+                            }
+
+                            cache.put(key1, val2);
+                            cache.put(key2, val1);
+
+                            if (rollback)
+                                tx.rollback();
+                            else
+                                tx.commit();
+
+                            threadProgressTracker.compute(Thread.currentThread().getId(),
+                                (tId, ops) -> ops == null ? 1 : ops + 1);
+                        }
+                        catch (CacheException e) {
+                            if (e.getCause() instanceof ClusterTopologyException)
+                                ((ClusterTopologyException)e.getCause()).retryReadyFuture().get();
+                        }
+                        catch (ClusterTopologyException e) {
+                            e.retryReadyFuture().get();
+                        }
+                    }
+                }
+                catch (Throwable t) {
+                    loadError.compareAndSet(null, t);
+
+                    stopFlag.set(true);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param ig Ignite instance.
+     * @param cacheName1 Cache name 1.
+     * @param cacheName2 Cache name 2.
+     * @param stopFlag Stop flag.
+     * @param loadError Load error reference.
+     * @param threadProgressTracker Progress tracker.
+     */
+    private void startCrossCacheTxLoadThread(
+        IgniteEx ig,
+        String cacheName1,
+        String cacheName2,
+        AtomicBoolean stopFlag,
+        AtomicReference<Throwable> loadError,
+        ConcurrentMap<Long, Long> threadProgressTracker
+    ) {
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                ThreadLocalRandom r = ThreadLocalRandom.current();
+
+                IgniteCache<Integer, String> cache1 = ig.cache(cacheName1);
+                IgniteCache<Integer, String> cache2 = ig.cache(cacheName2);
+
+                boolean pessimistic = r.nextBoolean();
+
+                boolean rollback = r.nextBoolean();
+
+                try {
+                    while (!stopFlag.get()) {
+                        try (Transaction tx = ig.transactions().txStart(
+                            pessimistic ? TransactionConcurrency.PESSIMISTIC : TransactionConcurrency.OPTIMISTIC,
+                            TransactionIsolation.REPEATABLE_READ
+                        )) {
+                            int key1 = -1;
+                            String val1 = null;
+                            while (val1 == null) {
+                                key1 = r.nextInt(ENTRIES);
+                                val1 = cache1.get(key1);
+                            }
+
+                            int key2 = -1;
+                            String val2 = null;
+                            while (val2 == null) {
+                                key2 = r.nextInt(ENTRIES);
+                                val2 = cache2.get(key2);
+                            }
+
+                            cache1.put(key1, val2);
+                            cache2.put(key2, val1);
+
+                            if (rollback)
+                                tx.rollback();
+                            else
+                                tx.commit();
+
+                            threadProgressTracker.compute(Thread.currentThread().getId(),
+                                (tId, ops) -> ops == null ? 1 : ops + 1);
+                        }
+                        catch (CacheException e) {
+                            if (e.getCause() instanceof ClusterTopologyException)
+                                ((ClusterTopologyException)e.getCause()).retryReadyFuture().get();
+                        }
+                        catch (ClusterTopologyException e) {
+                            e.retryReadyFuture().get();
+                        }
+                    }
+                }
+                catch (Throwable t) {
+                    loadError.compareAndSet(null, t);
+
+                    stopFlag.set(true);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param waitMs Wait milliseconds.
+     * @param loadError Load error.
+     * @param threadProgressTracker Thread progress tracker.
+     */
+    private void awaitProgressInAllLoaders(
+        long waitMs,
+        AtomicReference<Throwable> loadError,
+        ConcurrentMap<Long, Long> threadProgressTracker
+    ) throws Exception {
+        Map<Long, Long> view1 = new HashMap<>(threadProgressTracker);
+
+        long startTs = U.currentTimeMillis();
+
+        while (U.currentTimeMillis() < startTs + waitMs) {
+            Map<Long, Long> view2 = new HashMap<>(threadProgressTracker);
+
+            if (loadError.get() != null) {
+                loadError.get().printStackTrace();
+
+                fail("Unexpected error in load thread: " + loadError.get().toString());
+            }
+
+            boolean frozenThreadExists = false;
+
+            for (Map.Entry<Long, Long> entry : view1.entrySet()) {
+                if (entry.getValue().equals(view2.get(entry.getKey())))
+                    frozenThreadExists = true;
+            }
+
+            if (!frozenThreadExists)
+                return;
+
+            U.sleep(100);
+        }
+
+        fail("No progress in load thread");
+    }
+
+    /**
+     * Accepts all nodes except one with specified consistent ID.
+     */
+    private static class ConsistentIdNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** Consistent ID. */
+        private final Serializable consId0;
+
+        /**
+         * @param consId0 Consistent ID.
+         */
+        public ConsistentIdNodeFilter(Serializable consId0) {
+            this.consId0 = consId0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return !node.consistentId().equals(consId0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d519ef51/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 1185ebd..c85965f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPartitio
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
 import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.baseline.ClientAffinityAssignmentWithBaselineTest;
 import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAllBaselineNodesOnlineFullApiSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOfflineBaselineNodeFullApiSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOnlineNodeOutOfBaselineFullApiSelfTest;
@@ -69,6 +70,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         suite.addTestSuite(IgniteAllBaselineNodesOnlineFullApiSelfTest.class);
         suite.addTestSuite(IgniteOfflineBaselineNodeFullApiSelfTest.class);
         suite.addTestSuite(IgniteOnlineNodeOutOfBaselineFullApiSelfTest.class);
+        suite.addTestSuite(ClientAffinityAssignmentWithBaselineTest.class);
 
         return suite;
     }


Mime
View raw message