ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-2146 Avoid hang in 'cache.get' if topology locked.
Date Wed, 16 Dec 2015 10:31:14 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 dea42fd32 -> ae2383fcf


ignite-2146 Avoid hang in 'cache.get' if topology locked.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae2383fc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae2383fc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae2383fc

Branch: refs/heads/ignite-1537
Commit: ae2383fcfa5bcb9cbb27e14f645b7db980439fa9
Parents: dea42fd
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Dec 16 13:30:55 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Dec 16 13:30:55 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |   8 +-
 .../cache/GridCacheSharedContext.java           |  20 +
 .../dht/CacheDistributedGetFutureAdapter.java   |   2 +-
 .../dht/GridPartitionedGetFuture.java           |  15 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   8 +
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  10 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   2 +-
 .../distributed/near/GridNearGetFuture.java     |  17 +-
 .../distributed/near/GridNearLockFuture.java    |   6 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |   6 +-
 .../cache/transactions/IgniteTxManager.java     |   6 +-
 .../CacheGetInsideLockChangingTopologyTest.java | 472 +++++++++++++++++++
 .../IgniteCacheFailoverTestSuite3.java          |   2 +
 13 files changed, 544 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f0bed99..5212eee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -3343,16 +3343,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteException If transaction exist.
      */
     private void checkEmptyTransactions() throws IgniteException {
-        if (transactions().tx() != null)
-            throw new IgniteException("Cannot start/stop cache within transaction.");
-
-        if (sharedCtx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId())
!= null)
-            throw new IgniteException("Cannot start/stop cache within lock.");
+        if (sharedCtx.lockedTopologyVersion(null) != null)
+            throw new IgniteException("Cannot start/stop cache within lock or transaction.");
     }
 
     /**
      * @param val Object to check.
      * @throws IgniteCheckedException If validation failed.
+     * @return Configuration copy.
      */
     private CacheConfiguration cloneCheckSerializable(CacheConfiguration val) throws IgniteCheckedException
{
         if (val == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 608829a..5ed1df9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -572,6 +572,26 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @param ignore Transaction to ignore.
+     * @return Not null topology version if current thread holds lock preventing topology
change.
+     */
+    @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore)
{
+        long threadId = Thread.currentThread().getId();
+
+        IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore);
+
+        AffinityTopologyVersion topVer = null;
+
+        if (tx != null && tx.topologyVersionSnapshot() != null)
+            topVer = tx.topologyVersionSnapshot();
+
+        if (topVer == null)
+            topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId);
+
+        return topVer;
+    }
+
+    /**
      * Nulling references to potentially leak-prone objects.
      */
     public void cleanup() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 320c3c2..cfbc21b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -87,7 +87,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends
GridCompoun
     protected IgniteCacheExpiryPolicy expiryPlc;
 
     /** Flag indicating that get should be done on a locked topology version. */
-    protected final boolean canRemap;
+    protected boolean canRemap;
 
     /** */
     protected final boolean needVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6867e21..e8aaca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -133,10 +133,19 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
      * Initializes future.
      */
     public void init() {
-        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer
:
-            canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+        AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
 
-        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
topVer);
+        if (lockedTopVer != null) {
+            canRemap = false;
+
+            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
lockedTopVer);
+        }
+        else {
+            AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer
:
+                canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+
+            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
topVer);
+        }
 
         markInitialized();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 5d0814f..29971fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -156,6 +156,14 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object>
im
     ) {
         assert key != null;
 
+        AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
+
+        if (lockedTopVer != null) {
+            topVer = lockedTopVer;
+
+            canRemap = false;
+        }
+
         this.cctx = cctx;
         this.key = key;
         this.readThrough = readThrough;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index b384bab..eefdc73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -278,15 +278,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * Performs future mapping.
      */
     public void map() {
-        AffinityTopologyVersion topVer = null;
-
-        IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null);
-
-        if (tx != null && tx.topologyVersionSnapshot() != null)
-            topVer = tx.topologyVersionSnapshot();
-
-        if (topVer == null)
-            topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+        AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
 
         if (topVer == null)
             mapOnTopology();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index d3028ca..22b329c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -597,7 +597,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
 
         // If there is another system transaction in progress, use it's topology version
to prevent deadlock.
         if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(),
tx);
 
             if (tx0 != null)
                 topVer = tx0.topologyVersionSnapshot();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index cb866e3..a121af9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -142,11 +142,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
      * Initializes future.
      */
     public void init() {
-        AffinityTopologyVersion topVer = tx == null ?
-            (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion())
:
-            tx.topologyVersion();
+        AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
 
-        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
topVer);
+        if (lockedTopVer != null) {
+            canRemap = false;
+
+            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
lockedTopVer);
+        }
+        else {
+            AffinityTopologyVersion topVer = tx == null ?
+                (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion())
:
+                tx.topologyVersion();
+
+            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
topVer);
+        }
 
         markInitialized();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 4cb7248..a90058a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -700,11 +700,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
      */
     void map() {
         // Obtain the topology version to use.
-        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+        long threadId = Thread.currentThread().getId();
+
+        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's topology version
to prevent deadlock.
         if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
 
             if (tx0 != null)
                 topVer = tx0.topologyVersionSnapshot();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 5c7553f..b3eab34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -50,11 +50,13 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends
GridNearT
     /** {@inheritDoc} */
     @Override public final void prepare() {
         // Obtain the topology version to use.
-        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+        long threadId = Thread.currentThread().getId();
+
+        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
         // If there is another system transaction in progress, use it's topology version
to prevent deadlock.
         if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
 
             if (tx0 != null)
                 topVer = tx0.topologyVersionSnapshot();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 243c4cb..d2b803a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -605,11 +605,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param threadId Thread ID.
+     * @param ignore Transaction to ignore.
      * @return Any transaction associated with the current thread.
      */
-    public IgniteInternalTx anyActiveThreadTx(IgniteInternalTx ignore) {
-        long threadId = Thread.currentThread().getId();
-
+    public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx ignore) {
         IgniteInternalTx tx = threadMap.get(threadId);
 
         if (tx != null && tx.topologyVersionSnapshot() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
new file mode 100644
index 0000000..d31c4e7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheAlwaysEvictionPolicy;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static ThreadLocal<Boolean> client = new ThreadLocal<>();
+
+    /** */
+    private static final int SRVS = 3;
+
+    /** */
+    private static final int CLIENTS = 2;
+
+    /** */
+    private static final String TX_CACHE1 = "tx1";
+
+    /** */
+    private static final String TX_CACHE2 = "tx2";
+
+    /** */
+    private static final String ATOMIC_CACHE = "atomic";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        Boolean clientMode = client.get();
+
+        client.set(null);
+
+        if (clientMode != null && clientMode)
+            cfg.setClientMode(true);
+        else {
+            cfg.setCacheConfiguration(cacheConfiguration(TX_CACHE1, TRANSACTIONAL),
+                cacheConfiguration(TX_CACHE2, TRANSACTIONAL),
+                cacheConfiguration(ATOMIC_CACHE, ATOMIC));
+        }
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param atomicityMode Atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode)
{
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        client.set(true);
+
+        Ignite client1 = startGrid(SRVS);
+
+        assertTrue(client1.configuration().isClientMode());
+
+        client.set(true);
+
+        Ignite client2 = startGrid(SRVS + 1);
+
+        assertTrue(client2.configuration().isClientMode());
+
+        client2.createNearCache(TX_CACHE1,
+            new NearCacheConfiguration<>().setNearEvictionPolicy(new GridCacheAlwaysEvictionPolicy<>()));
+
+        client2.createNearCache(TX_CACHE2,
+            new NearCacheConfiguration<>().setNearEvictionPolicy(new GridCacheAlwaysEvictionPolicy<>()));
+
+        client2.createNearCache(ATOMIC_CACHE,
+            new NearCacheConfiguration<>().setNearEvictionPolicy(new GridCacheAlwaysEvictionPolicy<>()));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxGetInsideLockStopPrimary() throws Exception {
+        getInsideLockStopPrimary(ignite(SRVS), TX_CACHE1);
+        getInsideLockStopPrimary(ignite(SRVS + 1), TX_CACHE1);
+
+        getInsideLockStopPrimary(ignite(SRVS), TX_CACHE2);
+        getInsideLockStopPrimary(ignite(SRVS + 1), TX_CACHE2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicGetInsideLockStopPrimary() throws Exception {
+        getInsideLockStopPrimary(ignite(SRVS), ATOMIC_CACHE);
+
+        getInsideLockStopPrimary(ignite(SRVS + 1), ATOMIC_CACHE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicGetInsideTxStopPrimary() throws Exception {
+        getInsideTxStopPrimary(ignite(SRVS), ATOMIC_CACHE);
+
+        getInsideTxStopPrimary(ignite(SRVS + 1), ATOMIC_CACHE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadCommittedPessimisticStopPrimary() throws Exception {
+        getReadCommittedStopPrimary(ignite(SRVS), TX_CACHE1, PESSIMISTIC);
+        getReadCommittedStopPrimary(ignite(SRVS + 1), TX_CACHE1, PESSIMISTIC);
+
+        getReadCommittedStopPrimary(ignite(SRVS), TX_CACHE2, PESSIMISTIC);
+        getReadCommittedStopPrimary(ignite(SRVS + 1), TX_CACHE2, PESSIMISTIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadCommittedOptimisticStopPrimary() throws Exception {
+        getReadCommittedStopPrimary(ignite(SRVS), TX_CACHE1, OPTIMISTIC);
+        getReadCommittedStopPrimary(ignite(SRVS + 1), TX_CACHE1, OPTIMISTIC);
+
+        getReadCommittedStopPrimary(ignite(SRVS), TX_CACHE2, OPTIMISTIC);
+        getReadCommittedStopPrimary(ignite(SRVS + 1), TX_CACHE2, OPTIMISTIC);
+    }
+
+    /**
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @param concurrency Transaction concurrency.
+     * @throws Exception If failed.
+     */
+    private void getReadCommittedStopPrimary(Ignite ignite,
+        String cacheName,
+        TransactionConcurrency concurrency) throws Exception {
+        IgniteCache<Integer, Integer> txCache = ignite.cache(TX_CACHE1);
+
+        IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName);
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        Ignite srv = startGrid(NEW_NODE);
+
+        try {
+            Integer key = primaryKey(srv.cache(cacheName));
+
+            Integer txKey = nearKey(srv.cache(cacheName));
+
+            srv.cache(cacheName).put(key, 1);
+
+            IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    U.sleep(500);
+
+                    log.info("Stop node.");
+
+                    stopGrid(NEW_NODE);
+
+                    log.info("Node stopped.");
+
+                    return null;
+                }
+            }, "stop-thread");
+
+            try (Transaction tx = ignite.transactions().txStart(concurrency, READ_COMMITTED))
{
+                txCache.put(txKey, 1);
+
+                while (!stopFut.isDone())
+                    assertEquals(1, (Object)getCache.get(key));
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid(NEW_NODE);
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    private void getInsideLockStopPrimary(Ignite ignite, String cacheName) throws Exception
{
+        IgniteCache<Integer, Integer> lockCache = ignite.cache(TX_CACHE1);
+
+        IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName);
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        Ignite srv = startGrid(NEW_NODE);
+
+        try {
+            Integer key = primaryKey(srv.cache(cacheName));
+
+            getCache.put(key, 1);
+
+            IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    U.sleep(500);
+
+                    log.info("Stop node.");
+
+                    stopGrid(NEW_NODE);
+
+                    log.info("Node stopped.");
+
+                    return null;
+                }
+            }, "stop-thread");
+
+            Lock lock = lockCache.lock(key + 1);
+
+            lock.lock();
+
+            try {
+                while (!stopFut.isDone())
+                    assertEquals(1, (Object)getCache.get(key));
+            }
+            finally {
+                lock.unlock();
+            }
+
+            stopFut.get();
+        }
+        finally {
+            stopGrid(NEW_NODE);
+        }
+    }
+
+    /**
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    private void getInsideTxStopPrimary(Ignite ignite, String cacheName) throws Exception
{
+        IgniteCache<Integer, Integer> txCache = ignite.cache(TX_CACHE1);
+
+        IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName);
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        Ignite srv = startGrid(NEW_NODE);
+
+        try {
+            Integer key = primaryKey(srv.cache(cacheName));
+
+            getCache.put(key, 1);
+
+            IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    U.sleep(500);
+
+                    log.info("Stop node.");
+
+                    stopGrid(NEW_NODE);
+
+                    log.info("Node stopped.");
+
+                    return null;
+                }
+            }, "stop-thread");
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ))
{
+                txCache.get(key + 1);
+
+                while (!stopFut.isDone())
+                    assertEquals(1, (Object)getCache.get(key));
+
+                tx.commit();
+            }
+        }
+        finally {
+            stopGrid(NEW_NODE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded() throws Exception {
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        final AtomicInteger stopIdx = new AtomicInteger();
+
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runMultiThreadedAsync(new
Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int idx = stopIdx.getAndIncrement();
+
+                int node = NEW_NODE + idx;
+
+                while (!finished.get()) {
+                    log.info("Start node: " + node);
+
+                    startGrid(node);
+
+                    U.sleep(300);
+
+                    log.info("Stop node: " + node);
+
+                    stopGrid(node);
+                }
+
+                return null;
+            }
+        }, 2, "stop-thread");
+
+        try {
+            final long stopTime = System.currentTimeMillis() + 60_000;
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            final int KEYS = 100_000;
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int node = idx.getAndIncrement() % (SRVS + CLIENTS);
+
+                    Ignite ignite = ignite(node);
+
+                    IgniteCache<Integer, Integer> txCache1 = ignite.cache(TX_CACHE1);
+                    IgniteCache<Integer, Integer> txCache2 = ignite.cache(TX_CACHE2);
+                    IgniteCache<Integer, Integer> atomicCache = ignite.cache(ATOMIC_CACHE);
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (U.currentTimeMillis() < stopTime) {
+                        Integer lockKey = rnd.nextInt(KEYS, KEYS + 1000);
+
+                        Lock lock = txCache1.lock(lockKey);
+
+                        try {
+                            lock.lock();
+
+                            try {
+                                executeGet(txCache1);
+
+                                executeGet(txCache2);
+
+                                executeGet(atomicCache);
+                            } finally {
+                                lock.unlock();
+                            }
+
+                            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC,
READ_COMMITTED)) {
+                                txCache1.put(lockKey, lockKey);
+
+                                executeGet(txCache1);
+
+                                executeGet(txCache2);
+
+                                executeGet(atomicCache);
+
+                                tx.commit();
+                            }
+                        }
+                        catch (IgniteException | CacheException e) {
+                            log.info("Error: " + e);
+                        }
+                    }
+
+                    return null;
+                }
+
+                private void executeGet(IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < 100; i++)
+                        cache.get(rnd.nextInt(KEYS));
+
+                    Set<Integer> keys = new HashSet<>();
+
+                    for (int i = 0; i < 100; i++) {
+                        keys.add(rnd.nextInt(KEYS));
+
+                        if (keys.size() == 20) {
+                            cache.getAll(keys);
+
+                            keys.clear();
+                        }
+                    }
+
+                    cache.getAll(keys);
+                }
+            }, 10, "test-thread");
+
+            finished.set(true);
+
+            restartFut.get();
+        }
+        finally {
+            finished.set(true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae2383fc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
index 4b04c05..85b6e93 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheGetInsideLockChangingTopologyTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryTransactionalSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCachePutRetryAtomicPrimaryWriteOrderSelfTest;
@@ -36,6 +37,7 @@ public class IgniteCacheFailoverTestSuite3 extends TestSuite {
         suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class);
         suite.addTestSuite(IgniteCachePutRetryAtomicPrimaryWriteOrderSelfTest.class);
         suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class);
+        suite.addTestSuite(CacheGetInsideLockChangingTopologyTest.class);
 
         return suite;
     }


Mime
View raw message