ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [20/38] ignite git commit: ignite-2968 Deadlock detection for optimistic tx and near caches
Date Wed, 31 Aug 2016 06:33:10 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 63c9919..f9357f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -62,6 +61,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemo
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -126,7 +127,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
 
     /** Deadlock detection maximum iterations. */
-    static final int DEADLOCK_MAX_ITERS =
+    static int DEADLOCK_MAX_ITERS =
         IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
 
     /** Committing transactions. */
@@ -389,7 +390,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      *      {@code false} otherwise.
      */
     public boolean isCompleted(IgniteInternalTx tx) {
-        return completedVersHashMap.containsKey(tx.xidVersion());
+        boolean completed = completedVersHashMap.containsKey(tx.xidVersion());
+
+        // Need check that for tx with timeout rollback message was not received before lock.
+        if (!completed && tx.local() && tx.dht() && tx.timeout() > 0)
+            return completedVersHashMap.containsKey(tx.nearXidVersion());
+
+        return completed;
     }
 
     /**
@@ -495,13 +502,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             return null;
         }
 
-        if (tx.timeout() > 0) {
-            cctx.time().addTimeoutObject(tx);
-
-            if (log.isDebugEnabled())
-                log.debug("Registered transaction with timeout processor: " + tx);
-        }
-
         if (log.isDebugEnabled())
             log.debug("Transaction created: " + tx);
 
@@ -786,7 +786,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      */
     public void prepareTx(IgniteInternalTx tx) throws IgniteCheckedException {
         if (tx.state() == MARKED_ROLLBACK) {
-            if (tx.timedOut())
+            if (tx.remainingTime() == -1)
                 throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
 
             throw new IgniteCheckedException("Transaction is marked for rollback: " + tx);
@@ -1081,13 +1081,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (log.isDebugEnabled())
             log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']');
 
-        if (tx.timeout() > 0) {
-            cctx.time().removeTimeoutObject(tx);
-
-            if (log.isDebugEnabled())
-                log.debug("Unregistered transaction with timeout processor: " + tx);
-        }
-
         /*
          * Note that write phase is handled by transaction adapter itself,
          * so we don't do it here.
@@ -2006,17 +1999,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             if (!(nearTxLoc || tx instanceof GridDhtTxLocal) || !hasKeys(tx, txKeys))
                 continue;
 
-            Collection<IgniteTxEntry> txEntries = tx.allEntries();
+            IgniteTxState state = tx.txState();
+
+            assert state instanceof IgniteTxStateImpl || state instanceof IgniteTxImplicitSingleStateImpl;
+
+            Collection<IgniteTxEntry> txEntries =
+                state instanceof IgniteTxStateImpl ? ((IgniteTxStateImpl)state).allEntriesCopy() : state.allEntries();
 
-            Set<KeyCacheObject> requestedKeys = null;
+            Set<IgniteTxKey> requestedKeys = null;
 
             // Try to get info about requested keys for detached entries in case of GridNearTxLocal transaction
             // in order to reduce amount of requests to remote nodes.
             if (nearTxLoc) {
-                GridDhtColocatedLockFuture fut = colocatedLockFuture(tx);
+                if (tx.pessimistic()) {
+                    GridDhtColocatedLockFuture fut =
+                        (GridDhtColocatedLockFuture)mvccFuture(tx, GridDhtColocatedLockFuture.class);
+
+                    if (fut != null)
+                        requestedKeys = fut.requestedKeys();
+
+                    GridNearLockFuture nearFut = (GridNearLockFuture)mvccFuture(tx, GridNearLockFuture.class);
 
-                if (fut != null)
-                    requestedKeys = fut.requestedKeys();
+                    if (nearFut != null) {
+                        Set<IgniteTxKey> nearRequestedKeys = nearFut.requestedKeys();
+
+                        if (nearRequestedKeys != null) {
+                            if (requestedKeys == null)
+                                requestedKeys = nearRequestedKeys;
+                            else
+                                requestedKeys = nearRequestedKeys;
+                        }
+                    }
+                }
+                else {
+                    GridNearOptimisticTxPrepareFuture fut =
+                        (GridNearOptimisticTxPrepareFuture)mvccFuture(tx, GridNearOptimisticTxPrepareFuture.class);
+
+                    if (fut != null)
+                        requestedKeys = fut.requestedKeys();
+                }
             }
 
             for (IgniteTxEntry txEntry : txEntries) {
@@ -2073,17 +2094,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
     /**
      * @param tx Tx. Must be instance of {@link GridNearTxLocal}.
-     * @return Colocated future.
+     * @param cls Future class.
+     * @return Cache future.
      */
-    private GridDhtColocatedLockFuture colocatedLockFuture(IgniteInternalTx tx) {
+    private IgniteInternalFuture mvccFuture(IgniteInternalTx tx, Class<? extends IgniteInternalFuture> cls) {
         assert tx instanceof GridNearTxLocal : tx;
 
         Collection<GridCacheMvccFuture<?>> futs = cctx.mvcc().mvccFutures(tx.nearXidVersion());
 
         if (futs != null) {
             for (GridCacheMvccFuture<?> fut : futs) {
-                if (fut instanceof GridDhtColocatedLockFuture)
-                    return (GridDhtColocatedLockFuture)fut;
+                if (fut.getClass().equals(cls))
+                    return fut;
             }
         }
 
@@ -2115,6 +2137,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Collection of active transaction deadlock detection futures.
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<IgniteInternalFuture<?>> deadlockDetectionFutures() {
+        Collection<? extends IgniteInternalFuture<?>> values = deadlockDetectFuts.values();
+
+        return (Collection<IgniteInternalFuture<?>>)values;
+    }
+
+    /**
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index c116d0d..f23cca9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedHashMap;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -362,6 +364,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values();
     }
 
+    /**
+     * @return All entries. Returned collection is copy of internal collection.
+     */
+    public synchronized Collection<IgniteTxEntry> allEntriesCopy() {
+        return txMap == null ? Collections.<IgniteTxEntry>emptySet() : new HashSet<>(txMap.values());
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteTxEntry entry(IgniteTxKey key) {
         return txMap == null ? null : txMap.get(key);
@@ -408,7 +417,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void addEntry(IgniteTxEntry entry) {
+    @Override public synchronized void addEntry(IgniteTxEntry entry) {
         txMap.put(entry.txKey(), entry);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
index 36843dd..70d938e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
@@ -36,8 +36,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT;
@@ -106,22 +109,30 @@ public class TxDeadlockDetection {
         stack.push(txId);
 
         while (!stack.isEmpty()) {
-            GridCacheVersion v = stack.pop();
+            GridCacheVersion v = stack.peek();
+
+            if (visited.contains(v)) {
+                stack.pop();
+                inPath.remove(v);
 
-            if (visited.contains(v))
                 continue;
+            }
 
             visited.add(v);
 
             Set<GridCacheVersion> children = wfg.get(v);
 
-            if (children == null || children.isEmpty())
+            if (children == null || children.isEmpty()) {
+                stack.pop();
+                inPath.remove(v);
+
                 continue;
+            }
 
             inPath.add(v);
 
             for (GridCacheVersion w : children) {
-                if (inPath.contains(w)) {
+                if (inPath.contains(w) && visited.contains(w)) {
                     List<GridCacheVersion> cycle = new ArrayList<>();
 
                     for (GridCacheVersion x = v; !x.equals(w); x = edgeTo.get(x))
@@ -158,15 +169,18 @@ public class TxDeadlockDetection {
         private final Set<IgniteTxKey> keys;
 
         /** Processed keys. */
+        @GridToStringInclude
         private final Set<IgniteTxKey> processedKeys = new HashSet<>();
 
         /** Processed nodes. */
         private final Set<UUID> processedNodes = new HashSet<>();
 
         /** Pending keys. */
+        @GridToStringInclude
         private Map<UUID, Set<IgniteTxKey>> pendingKeys = new HashMap<>();
 
         /** Nodes queue. */
+        @GridToStringInclude
         private final UniqueDeque<UUID> nodesQueue = new UniqueDeque<>();
 
         /** Preferred nodes. */
@@ -194,6 +208,7 @@ public class TxDeadlockDetection {
         private int itersCnt;
 
         /** Timeout object. */
+        @GridToStringExclude
         private DeadlockTimeoutObject timeoutObj;
 
         /** Timed out flag. */
@@ -252,8 +267,8 @@ public class TxDeadlockDetection {
 
             if (topVer == null) // Tx manager already stopped
                 onDone();
-
-            map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap());
+            else
+                map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap());
         }
 
         /**
@@ -441,14 +456,17 @@ public class TxDeadlockDetection {
          * @param txLocks Tx locks.
          */
         private void updateWaitForGraph(Map<IgniteTxKey, TxLockList> txLocks) {
+            if (txLocks == null || txLocks.isEmpty())
+                return;
+
             for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) {
 
                 GridCacheVersion txOwner = null;
 
                 for (TxLock lock : e.getValue().txLocks()) {
-                    if (lock.owner()) {
-                        assert txOwner == null;
-
+                    if (lock.owner() && txOwner == null) {
+                        // Actually we can get lock list with more than one owner. In this case ignore all owners
+                        // except first because likely the first owner was cause of deadlock.
                         txOwner = lock.txId();
 
                         if (keys.contains(e.getKey()) && !txId.equals(lock.txId())) {
@@ -463,7 +481,7 @@ public class TxDeadlockDetection {
                         continue;
                     }
 
-                    if (lock.candiate()) {
+                    if (lock.candiate() || lock.owner()) {
                         GridCacheVersion txId0 = lock.txId();
 
                         Set<GridCacheVersion> waitForTxs = wfg.get(txId0);
@@ -485,9 +503,9 @@ public class TxDeadlockDetection {
 
             if (res != null && set) {
                 if (res.classError() != null) {
-                    IgniteLogger log = cctx.logger(TxDeadlockDetection.class);
+                    IgniteLogger log = cctx.kernalContext().log(this.getClass());
 
-                    log.warning("Failed to finish deadlock detection due to an error: " + nodeId);
+                    U.warn(log, "Failed to finish deadlock detection due to an error: " + nodeId);
 
                     onDone();
                 }
@@ -528,6 +546,11 @@ public class TxDeadlockDetection {
             return false;
         }
 
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TxDeadlockFuture.class, this);
+        }
+
         /**
          * Lock request timeout object.
          */
@@ -543,6 +566,10 @@ public class TxDeadlockDetection {
             @Override public void onTimeout() {
                 timedOut = true;
 
+                IgniteLogger log = cctx.kernalContext().log(this.getClass());
+
+                U.warn(log, "Deadlock detection was timed out [timeout=" + DEADLOCK_TIMEOUT + ", fut=" + this + ']');
+
                 onDone();
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 94b5620..2b524e8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -18,15 +18,29 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -34,11 +48,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 
-import javax.cache.CacheException;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
@@ -62,6 +71,10 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
+        TcpCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
         CacheConfiguration ccfg = new CacheConfiguration();
 
         ccfg.setAtomicityMode(atomicityMode());
@@ -87,7 +100,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
-        startGrids(1);
+        startGrids(2);
     }
 
     /** {@inheritDoc} */
@@ -98,7 +111,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
     /**
      * Success if user tx was timed out.
      *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testUserTxTimeout() throws Exception {
         final Ignite ignite = grid(0);
@@ -112,7 +125,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
     /**
      * Success if system caches weren't timed out.
      *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSystemCacheTx() throws Exception {
         final Ignite ignite = grid(0);
@@ -143,27 +156,23 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
      * Success if implicit tx fails.
      *
      * @param cache Cache name.
-     * @throws Exception
+     * @throws Exception If failed.
      */
     protected void checkImplicitTxTimeout(final IgniteCache<Object, Object> cache) throws Exception {
-        try {
-            cache.invoke("key", new EntryProcessor<Object, Object, Object>() {
-                @Override public Object process(final MutableEntry<Object, Object> entry, final Object... args)
-                    throws EntryProcessorException {
-                    try {
-                        sleepForTxFailure();
-                    } catch (InterruptedException e) {
-                        throw new EntryProcessorException(e);
-                    }
+        TestCommunicationSpi.delay = true;
 
-                    return null;
-                }
-            });
+        Integer key = primaryKey(ignite(1).cache(CACHE_NAME));
+
+        try {
+            cache.put(key, 0);
 
             fail("Timeout exception must be thrown");
         }
         catch (CacheException e) {
-            // OK
+            // No-op.
+        }
+        finally {
+            TestCommunicationSpi.delay = false;
         }
 
         cache.clear();
@@ -174,7 +183,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
      *
      * @param cache Cache name.
      * @param ignite Ignite instance.
-     * @throws Exception
+     * @throws Exception If failed.
      */
     protected void checkExplicitTxTimeout(final IgniteCache<Object, Object> cache, final Ignite ignite)
         throws Exception {
@@ -198,7 +207,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
      * Success if explicit tx doesn't fail.
      *
      * @param cache Cache instance.
-     * @throws Exception
+     * @throws Exception If failed.
      */
     protected void checkStartTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception {
         try (final IgniteInternalTx tx = CU.txStartInternal(cache.context(), cache, PESSIMISTIC, READ_COMMITTED)) {
@@ -220,7 +229,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
      * Success if implicit tx fails.
      *
      * @param cache Cache instance.
-     * @throws Exception
+     * @throws Exception If failed.
      */
     protected void checkImplicitTxSuccess(final IgniteInternalCache<Object, Object> cache) throws Exception {
         cache.invoke("key", new EntryProcessor<Object, Object, Object>() {
@@ -241,9 +250,39 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
     /**
      * Sleep multiple {@link #TX_TIMEOUT} times.
      *
-     * @throws InterruptedException
+     * @throws InterruptedException If interrupted.
      */
     private void sleepForTxFailure() throws InterruptedException {
         Thread.sleep(TX_TIMEOUT * 3);
     }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Delay. */
+        private static volatile boolean delay;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Message msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridNearTxPrepareRequest && delay) {
+                    try {
+                        U.sleep(TX_TIMEOUT * 2);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
index c417821..8475175 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxTimeoutAbstractTest.java
@@ -20,10 +20,10 @@ package org.apache.ignite.internal.processors.cache.distributed;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -154,8 +154,8 @@ public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest {
 
             assert false : "Timeout never happened for transaction: " + tx;
         }
-        catch (CacheException e) {
-            if (!(e.getCause() instanceof TransactionTimeoutException))
+        catch (Exception e) {
+            if (!(X.hasCause(e, TransactionTimeoutException.class)))
                 throw e;
 
             info("Received expected timeout exception [msg=" + e.getMessage() + ", tx=" + tx + ']');
@@ -164,4 +164,4 @@ public class IgniteTxTimeoutAbstractTest extends GridCommonAbstractTest {
             tx.close();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java
new file mode 100644
index 0000000..89fe8e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeoutFullApiTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class CachePartitionedMultiNodeLongTxTimeoutFullApiTest extends GridCachePartitionedMultiNodeFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
new file mode 100644
index 0000000..3e3b84e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+/**
+ *
+ */
+public class CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest extends
+    GridCachePartitionedMultiNodeFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
index cfa93ac..e27207d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheLocalTxTimeoutSelfTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.local;
 
-import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -154,7 +153,7 @@ public class GridCacheLocalTxTimeoutSelfTest extends GridCommonAbstractTest {
 
             tx.commit();
         }
-        catch (CacheException e) {
+        catch (Exception e) {
             assertTrue(X.hasCause(e, TransactionTimeoutException.class));
 
             info("Received expected optimistic exception: " + e.getMessage());
@@ -166,4 +165,4 @@ public class GridCacheLocalTxTimeoutSelfTest extends GridCommonAbstractTest {
 
         assert wasEx;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
index 20467c2..b0a407c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java
@@ -24,10 +24,12 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import junit.framework.TestCase;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.findCycle;
 
@@ -95,6 +97,14 @@ public class DepthFirstSearchTest extends TestCase {
 
         wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{
             put(T1, new HashSet<GridCacheVersion>(){{add(T2);}});
+            put(T2, new HashSet<GridCacheVersion>(){{add(T3);}});
+            put(T4, new HashSet<GridCacheVersion>(){{add(T1); add(T2); add(T3);}});
+        }};
+
+        assertAllNull(wfg);
+
+        wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{
+            put(T1, new HashSet<GridCacheVersion>(){{add(T2);}});
             put(T3, new HashSet<GridCacheVersion>(){{add(T4);}});
             put(T4, new HashSet<GridCacheVersion>(){{add(T1);}});
         }};
@@ -228,6 +238,94 @@ public class DepthFirstSearchTest extends TestCase {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testFindCycle4() throws Exception {
+        Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{
+            put(T1, Collections.singleton(T2));
+            put(T2, asLinkedHashSet(T3, T4));
+            put(T3, Collections.singleton(T4));
+            put(T4, Collections.singleton(T5));
+            put(T6, Collections.singleton(T3));
+        }};
+
+        assertNull(findCycle(wfg, T1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomNoExceptions() throws Exception {
+        int maxNodesCnt = 100;
+        int minNodesCnt = 10;
+        int maxWaitForNodesCnt = 20;
+
+        int cyclesFound = 0;
+        int cyclesNotFound = 0;
+
+        Random seedRnd = new Random();
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < 50000; i++) {
+            long seed = seedRnd.nextLong();
+
+            rnd.setSeed(seed);
+
+            System.out.println(">>> Iteration " + i + " with seed " + seed);
+
+            int nodesCnt = rnd.nextInt(maxNodesCnt - minNodesCnt) + minNodesCnt;
+
+            Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<>();
+
+            for (int j = 0; j < nodesCnt; j++) {
+                if (rnd.nextInt(100) > 30) {
+                    int waitForNodesCnt = rnd.nextInt(maxWaitForNodesCnt);
+
+                    Set<GridCacheVersion> waitForNodes = null;
+
+                    if (waitForNodesCnt > 0) {
+                        waitForNodes = new LinkedHashSet<>();
+
+                        for (int k = 0; k < waitForNodesCnt;) {
+                            int n = rnd.nextInt(nodesCnt);
+
+                            if (n != j) {
+                                waitForNodes.add(new GridCacheVersion(n, 0, 0, 0));
+                                k++;
+                            }
+                        }
+                    }
+
+                    wfg.put(new GridCacheVersion(j, 0, 0, 0), waitForNodes);
+                }
+            }
+
+            for (int j = 0; j < nodesCnt; j++) {
+                try {
+                    List<GridCacheVersion> cycle = findCycle(wfg, new GridCacheVersion(j, 0, 0, 0));
+
+                    if (cycle == null)
+                        cyclesNotFound++;
+                    else
+                        cyclesFound++;
+                }
+                catch (Throwable e) {
+                    U.error(null, "Error during finding cycle in graph: ", e);
+
+                    U.warn(null, "Seed: " + seed);
+
+                    U.warn(null, "Wait-for-graph: " + wfg);
+
+                    fail();
+                }
+            }
+        }
+
+        System.out.println(">>> Test finished. Cycles found: " + cyclesFound + ", cycles not found: " + cyclesNotFound);
+    }
+
+    /**
      * @param wfg Wait-for-graph.
      */
     private static void assertAllNull(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion... ignore) {
@@ -249,4 +347,4 @@ public class DepthFirstSearchTest extends TestCase {
 
         return set;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
new file mode 100644
index 0000000..c9d18eb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 3;
+
+    /** Cache. */
+    private static final String CACHE = "cache";
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(CACHE);
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setNearConfiguration(null);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        GridTestUtils.setFieldValue(null, TxDeadlockDetection.class, "DEADLOCK_TIMEOUT", (int)(getTestTimeout() * 2));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        GridTestUtils.setFieldValue(null, TxDeadlockDetection.class, "DEADLOCK_TIMEOUT",
+            getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, 60000));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoHangsPessimistic() throws Exception {
+        assertTrue(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+        doTest(PESSIMISTIC);
+
+        try {
+            GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", 0);
+
+            assertFalse(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+            doTest(PESSIMISTIC);
+        }
+        finally {
+            GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS",
+                IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoHangsOptimistic() throws Exception {
+        assertTrue(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+        doTest(OPTIMISTIC);
+
+        try {
+            GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS", 0);
+
+            assertFalse(grid(0).context().cache().context().tm().deadlockDetectionEnabled());
+
+            doTest(OPTIMISTIC);
+        }
+        finally {
+            GridTestUtils.setFieldValue(null, IgniteTxManager.class, "DEADLOCK_MAX_ITERS",
+                IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000));
+        }
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void doTest(final TransactionConcurrency concurrency) throws IgniteCheckedException {
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<Long> restartFut = null;
+
+        try {
+            restartFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    while (!stop.get()) {
+                        try {
+                            U.sleep(500);
+
+                            startGrid(NODES_CNT);
+
+                            awaitPartitionMapExchange();
+
+                            U.sleep(500);
+
+                            stopGrid(NODES_CNT);
+                        }
+                        catch (Exception e) {
+                            // No-op.
+                        }
+                    }
+                }
+            }, 1, "restart-thread");
+
+            long stopTime = System.currentTimeMillis() + 2 * 60_000L;
+
+            for (int i = 0; System.currentTimeMillis() < stopTime; i++) {
+                boolean detectionEnabled = grid(0).context().cache().context().tm().deadlockDetectionEnabled();
+
+                log.info(">>> Iteration " + i + " (detection is " + (detectionEnabled ? "enabled" : "disabled") + ')');
+
+                final AtomicInteger threadCnt = new AtomicInteger();
+
+                IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                    @Override public void run() {
+                        int threadNum = threadCnt.getAndIncrement();
+
+                        Ignite ignite = ignite(threadNum % NODES_CNT);
+
+                        IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
+
+                        try (Transaction tx = ignite.transactions().txStart(concurrency, REPEATABLE_READ, 500, 0)) {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            for (int i = 0; i < 50; i++) {
+                                int key = rnd.nextInt(50);
+
+                                if (log.isDebugEnabled()) {
+                                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                                        ", tx=" + tx + ", key=" + key + ']');
+                                }
+
+                                cache.put(key, 0);
+                            }
+
+                            tx.commit();
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }, NODES_CNT * 3, "tx-thread");
+
+                fut.get();
+            }
+        }
+        finally {
+            stop.set(true);
+
+            if (restartFut != null)
+                restartFut.get();
+
+            checkDetectionFutures();
+        }
+    }
+
+    /**
+     *
+     */
+    private void checkDetectionFutures() {
+        for (int i = 0; i < NODES_CNT ; i++) {
+            Ignite ignite = ignite(i);
+
+            IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+            Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+            assertTrue(futs.isEmpty());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
index 3d0beac..87bc70f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionTest.java
@@ -21,8 +21,9 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Collection;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -47,7 +48,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
-import org.jsr166.ThreadLocalRandom8;
 
 import static org.apache.ignite.internal.util.typedef.X.hasCause;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -152,7 +152,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
                         IgniteCache<Integer, Integer> cache = ignite.cache(CACHE);
 
                         try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 700, 0)) {
-                            ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
                             for (int i = 0; i < 50; i++) {
                                 int key = rnd.nextInt(50);
@@ -217,7 +217,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
 
                     cache.put(key, 0);
 
-                    barrier.await(timeout + 100, TimeUnit.MILLISECONDS);
+                    barrier.await(timeout + 1000, TimeUnit.MILLISECONDS);
 
                     tx.commit();
                 }
@@ -281,7 +281,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
                             log.info(">>> Performs sleep. [node=" + ((IgniteKernal)ignite).localNode() +
                                 ", tx=" + tx + ']');
 
-                            U.sleep(timeout * 2);
+                            U.sleep(timeout * 3);
                         }
                         else {
                             int key2 = threadNum + 1;
@@ -406,8 +406,7 @@ public class TxDeadlockDetectionTest extends GridCommonAbstractTest {
 
             IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
 
-            ConcurrentMap<Long, TxDeadlockDetection.TxDeadlockFuture> futs =
-                GridTestUtils.getFieldValue(txMgr, IgniteTxManager.class, "deadlockDetectFuts");
+            Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
 
             assertTrue(futs.isEmpty());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
new file mode 100644
index 0000000..7b40da2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionCrossCacheTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class TxOptimisticDeadlockDetectionCrossCacheTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (isDebug()) {
+            TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+            discoSpi.failureDetectionTimeoutEnabled(false);
+
+            cfg.setDiscoverySpi(discoSpi);
+        }
+
+        TcpCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        CacheConfiguration ccfg0 = defaultCacheConfiguration();
+
+        ccfg0.setName("cache0");
+        ccfg0.setCacheMode(CacheMode.PARTITIONED);
+        ccfg0.setBackups(1);
+        ccfg0.setNearConfiguration(null);
+
+        CacheConfiguration ccfg1 = defaultCacheConfiguration();
+
+        ccfg1.setName("cache1");
+        ccfg1.setCacheMode(CacheMode.PARTITIONED);
+        ccfg1.setBackups(1);
+        ccfg1.setNearConfiguration(null);
+
+        cfg.setCacheConfiguration(ccfg0, ccfg1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlock() throws Exception {
+        // Sometimes boh transactions perform commit, so we repeat attempt.
+        while (!doTestDeadlock()) {}
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private boolean doTestDeadlock() throws Exception {
+        TestCommunicationSpi.init(2);
+
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        final AtomicInteger threadCnt = new AtomicInteger();
+
+        final AtomicBoolean deadlock = new AtomicBoolean();
+
+        final AtomicInteger commitCnt = new AtomicInteger();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                int threadNum = threadCnt.getAndIncrement();
+
+                Ignite ignite = ignite(0);
+
+                IgniteCache<Integer, Integer> cache1 = ignite.cache("cache" + (threadNum == 0 ? 0 : 1));
+
+                IgniteCache<Integer, Integer> cache2 = ignite.cache("cache" + (threadNum == 0 ? 1 : 0));
+
+                try (Transaction tx =
+                         ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, 500, 0)
+                ) {
+                    int key1 = primaryKey(cache1);
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                        ", tx=" + tx + ", key=" + key1 + ", cache=" + cache1.getName() + ']');
+
+                    cache1.put(key1, 0);
+
+                    barrier.await();
+
+                    int key2 = primaryKey(cache2);
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                        ", tx=" + tx + ", key=" + key2 + ", cache=" + cache2.getName() + ']');
+
+                    cache2.put(key2, 1);
+
+                    tx.commit();
+
+                    commitCnt.incrementAndGet();
+                }
+                catch (Throwable e) {
+                    // At least one stack trace should contain TransactionDeadlockException.
+                    if (hasCause(e, TransactionTimeoutException.class) &&
+                        hasCause(e, TransactionDeadlockException.class)
+                        ) {
+                        if (deadlock.compareAndSet(false, true))
+                            U.error(log, "At least one stack trace should contain " +
+                                TransactionDeadlockException.class.getSimpleName(), e);
+                    }
+                }
+            }
+        }, 2, "tx-thread");
+
+        fut.get();
+
+        if (commitCnt.get() == 2)
+            return false;
+
+        assertTrue(deadlock.get());
+
+        for (int i = 0; i < NODES_CNT ; i++) {
+            Ignite ignite = ignite(i);
+
+            IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+            Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+            assertTrue(futs.isEmpty());
+        }
+
+        return true;
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Tx count. */
+        private static volatile int TX_CNT;
+
+        /** Tx ids. */
+        private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>();
+
+        /**
+         * @param txCnt Tx count.
+         */
+        private static void init(int txCnt) {
+            TX_CNT = txCnt;
+            TX_IDS.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Message msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridNearTxPrepareRequest) {
+                    final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0;
+
+                    GridCacheVersion txId = req.version();
+
+                    if (TX_IDS.contains(txId)) {
+                        while (TX_IDS.size() < TX_CNT) {
+                            try {
+                                U.sleep(50);
+                            }
+                            catch (IgniteInterruptedCheckedException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    }
+                }
+                else if (msg0 instanceof GridNearTxPrepareResponse) {
+                    GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0;
+
+                    GridCacheVersion txId = res.version();
+
+                    TX_IDS.add(txId);
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
new file mode 100644
index 0000000..aa240aa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.util.typedef.X.cause;
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Nodes count (actually two times more nodes will started: server + client). */
+    private static final int NODES_CNT = 4;
+
+    /** No op transformer. */
+    private static final NoOpTransformer NO_OP_TRANSFORMER = new NoOpTransformer();
+
+    /** Wrapping transformer. */
+    private static final WrappingTransformer WRAPPING_TRANSFORMER = new WrappingTransformer();
+
+    /** Client mode flag. */
+    private static boolean client;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (isDebug()) {
+            TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+            discoSpi.failureDetectionTimeoutEnabled(false);
+
+            cfg.setDiscoverySpi(discoSpi);
+        }
+
+        TcpCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = false;
+
+        startGrids(NODES_CNT);
+
+        client = true;
+
+        for (int i = 0; i < NODES_CNT; i++)
+            startGrid(i + NODES_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlocksPartitioned() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, false), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlocksPartitionedNear() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(PARTITIONED, syncMode, true), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeadlocksReplicated() throws Exception {
+        for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
+            doTestDeadlocks(createCache(REPLICATED, syncMode, false), NO_OP_TRANSFORMER);
+            doTestDeadlocks(createCache(REPLICATED, syncMode, false), WRAPPING_TRANSFORMER);
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param syncMode Write sync mode.
+     * @param near Near.
+     * @return Created cache.
+     */
+    @SuppressWarnings("unchecked")
+    private IgniteCache createCache(CacheMode cacheMode, CacheWriteSynchronizationMode syncMode, boolean near) {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setBackups(1);
+        ccfg.setNearConfiguration(near ? new NearCacheConfiguration() : null);
+        ccfg.setWriteSynchronizationMode(syncMode);
+
+        IgniteCache cache = ignite(0).createCache(ccfg);
+
+        if (near) {
+            for (int i = 0; i < NODES_CNT; i++) {
+                Ignite client = ignite(i + NODES_CNT);
+
+                assertTrue(client.configuration().isClientMode());
+
+                client.createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+            }
+        }
+
+        return cache;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param transformer Transformer closure.
+     * @throws Exception If failed.
+     */
+    private void doTestDeadlocks(IgniteCache cache, IgniteClosure<Integer, Object> transformer) throws Exception {
+        try {
+            awaitPartitionMapExchange();
+
+            doTestDeadlock(3, false, true, true, transformer);
+            doTestDeadlock(3, false, false, false, transformer);
+            doTestDeadlock(3, false, false, true, transformer);
+
+            doTestDeadlock(4, false, true, true, transformer);
+            doTestDeadlock(4, false, false, false, transformer);
+            doTestDeadlock(4, false, false, true, transformer);
+        }
+        catch (Throwable e) {
+            U.error(log, "Unexpected exception: ", e);
+
+            fail();
+        }
+        finally {
+            if (cache != null)
+                cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestDeadlock(
+        final int txCnt,
+        final boolean loc,
+        boolean lockPrimaryFirst,
+        final boolean clientTx,
+        final IgniteClosure<Integer, Object> transformer
+    ) throws Exception {
+        log.info(">>> Test deadlock [txCnt=" + txCnt + ", loc=" + loc + ", lockPrimaryFirst=" + lockPrimaryFirst +
+            ", clientTx=" + clientTx + ", transformer=" + transformer.getClass().getName() + ']');
+
+        TestCommunicationSpi.init(txCnt);
+
+        final AtomicInteger threadCnt = new AtomicInteger();
+
+        final CyclicBarrier barrier = new CyclicBarrier(txCnt);
+
+        final AtomicReference<TransactionDeadlockException> deadlockErr = new AtomicReference<>();
+
+        final List<List<Integer>> keySets = generateKeys(txCnt, loc, !lockPrimaryFirst);
+
+        final Set<Integer> involvedKeys = new GridConcurrentHashSet<>();
+        final Set<Integer> involvedLockedKeys = new GridConcurrentHashSet<>();
+        final Set<IgniteInternalTx> involvedTxs = new GridConcurrentHashSet<>();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                int threadNum = threadCnt.incrementAndGet();
+
+                Ignite ignite = loc ? ignite(0) : ignite(clientTx ? threadNum - 1 + txCnt : threadNum - 1);
+
+                IgniteCache<Object, Integer> cache = ignite.cache(CACHE_NAME);
+
+                List<Integer> keys = keySets.get(threadNum - 1);
+
+                int txTimeout = 500 + txCnt * 100;
+
+                try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, txTimeout, 0)) {
+                    IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx();
+
+                    involvedTxs.add(tx0);
+
+                    Integer key = keys.get(0);
+
+                    involvedKeys.add(key);
+
+                    Object k;
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                        ", tx=" + tx + ", key=" + transformer.apply(key) + ']');
+
+                    cache.put(transformer.apply(key), 0);
+
+                    involvedLockedKeys.add(key);
+
+                    barrier.await();
+
+                    key = keys.get(1);
+
+                    ClusterNode primaryNode =
+                        ((IgniteCacheProxy)cache).context().affinity().primary(key, NONE);
+
+                    List<Integer> primaryKeys =
+                        primaryKeys(grid(primaryNode).cache(CACHE_NAME), 5, key + (100 * threadNum));
+
+                    Map<Object, Integer> entries = new HashMap<>();
+
+                    involvedKeys.add(key);
+
+                    entries.put(transformer.apply(key), 0);
+
+                    for (Integer i : primaryKeys) {
+                        involvedKeys.add(i);
+
+                        entries.put(transformer.apply(i), 1);
+
+                        k = transformer.apply(i + 13);
+
+                        involvedKeys.add(i + 13);
+
+                        entries.put(k, 2);
+                    }
+
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
+                        ", tx=" + tx + ", entries=" + entries + ']');
+
+                    cache.putAll(entries);
+
+                    tx.commit();
+                }
+                catch (Throwable e) {
+                    U.error(log, "Expected exception: ", e);
+
+                    // At least one stack trace should contain TransactionDeadlockException.
+                    if (hasCause(e, TransactionTimeoutException.class) &&
+                        hasCause(e, TransactionDeadlockException.class)
+                        ) {
+                        if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class)))
+                            U.error(log, "At least one stack trace should contain " +
+                                TransactionDeadlockException.class.getSimpleName(), e);
+                    }
+                }
+            }
+        }, loc ? 2 : txCnt, "tx-thread");
+
+        try {
+            fut.get();
+        }
+        catch (IgniteCheckedException e) {
+            U.error(null, "Unexpected exception", e);
+
+            fail();
+        }
+
+        U.sleep(1000);
+
+        TransactionDeadlockException deadlockE = deadlockErr.get();
+
+        assertNotNull(deadlockE);
+
+        boolean fail = false;
+
+        // Check transactions, futures and entry locks state.
+        for (int i = 0; i < NODES_CNT * 2; i++) {
+            Ignite ignite = ignite(i);
+
+            int cacheId = ((IgniteCacheProxy)ignite.cache(CACHE_NAME)).context().cacheId();
+
+            GridCacheSharedContext<Object, Object> cctx = ((IgniteKernal)ignite).context().cache().context();
+
+            IgniteTxManager txMgr = cctx.tm();
+
+            Collection<IgniteInternalTx> activeTxs = txMgr.activeTransactions();
+
+            for (IgniteInternalTx tx : activeTxs) {
+                Collection<IgniteTxEntry> entries = tx.allEntries();
+
+                for (IgniteTxEntry entry : entries) {
+                    if (entry.cacheId() == cacheId) {
+                        fail = true;
+
+                        U.error(log, "Transaction still exists: " + "\n" + tx.xidVersion() +
+                            "\n" + tx.nearXidVersion() + "\n nodeId=" + cctx.localNodeId() + "\n tx=" + tx);
+                    }
+                }
+            }
+
+            Collection<IgniteInternalFuture<?>> futs = txMgr.deadlockDetectionFutures();
+
+            assertTrue(futs.isEmpty());
+
+            GridCacheAdapter<Object, Integer> intCache = internalCache(i, CACHE_NAME);
+
+            GridCacheConcurrentMap map = intCache.map();
+
+            for (Integer key : involvedKeys) {
+                Object key0 = transformer.apply(key);
+
+                KeyCacheObject keyCacheObj = intCache.context().toCacheKeyObject(key0);
+
+                GridCacheMapEntry entry = map.getEntry(keyCacheObj);
+
+                if (entry != null)
+                    assertNull("Entry still has locks " + entry, entry.mvccAllLocal());
+            }
+        }
+
+        if (fail)
+            fail("Some transactions still exist");
+
+        // Check deadlock report
+        String msg = deadlockE.getMessage();
+
+        for (IgniteInternalTx tx : involvedTxs)
+            assertTrue(msg.contains(
+                "[txId=" + tx.xidVersion() + ", nodeId=" + tx.nodeId() + ", threadId=" + tx.threadId() + ']'));
+
+        for (Integer key : involvedKeys) {
+            if (involvedLockedKeys.contains(key))
+                assertTrue(msg.contains("[key=" + transformer.apply(key) + ", cache=" + CACHE_NAME + ']'));
+            else
+                assertFalse(msg.contains("[key=" + transformer.apply(key)));
+        }
+    }
+
+    /**
+     * @param nodesCnt Nodes count.
+     * @param loc Local cache.
+     */
+    private List<List<Integer>> generateKeys(int nodesCnt, boolean loc, boolean reverse) throws IgniteCheckedException {
+        List<List<Integer>> keySets = new ArrayList<>();
+
+        if (loc) {
+            List<Integer> keys = primaryKeys(ignite(0).cache(CACHE_NAME), 2);
+
+            keySets.add(new ArrayList<>(keys));
+
+            Collections.reverse(keys);
+
+            keySets.add(keys);
+        }
+        else {
+            for (int i = 0; i < nodesCnt; i++) {
+                List<Integer> keys = new ArrayList<>(2);
+
+                int n1 = i + 1;
+                int n2 = n1 + 1;
+
+                int i1 = n1 < nodesCnt ? n1 : n1 - nodesCnt;
+                int i2 = n2 < nodesCnt ? n2 : n2 - nodesCnt;
+
+                keys.add(primaryKey(ignite(i1).cache(CACHE_NAME)));
+                keys.add(primaryKey(ignite(i2).cache(CACHE_NAME)));
+
+                if (reverse)
+                    Collections.reverse(keys);
+
+                keySets.add(keys);
+            }
+        }
+
+        return keySets;
+    }
+
+    /**
+     *
+     */
+    private static class NoOpTransformer implements IgniteClosure<Integer, Object> {
+        /** {@inheritDoc} */
+        @Override public Object apply(Integer val) {
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class WrappingTransformer implements IgniteClosure<Integer, Object> {
+        /** {@inheritDoc} */
+        @Override public Object apply(Integer val) {
+            return new KeyObject(val);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class KeyObject implements Serializable {
+        /** Id. */
+        private int id;
+
+        /** Name. */
+        private String name;
+
+        /**
+         * @param id Id.
+         */
+        public KeyObject(int id) {
+            this.id = id;
+            this.name = "KeyObject" + id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "KeyObject{" +
+                "id=" + id +
+                ", name='" + name + '\'' +
+                '}';
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            KeyObject obj = (KeyObject)o;
+
+            return id == obj.id && name.equals(obj.name);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Tx count. */
+        private static volatile int TX_CNT;
+
+        /** Tx ids. */
+        private static final Set<GridCacheVersion> TX_IDS = new GridConcurrentHashSet<>();
+
+        /**
+         * @param txCnt Tx count.
+         */
+        private static void init(int txCnt) {
+            TX_CNT = txCnt;
+            TX_IDS.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Message msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridNearTxPrepareRequest) {
+                    final GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg0;
+
+                    GridCacheVersion txId = req.version();
+
+                    if (TX_IDS.contains(txId)) {
+                        while (TX_IDS.size() < TX_CNT) {
+                            try {
+                                U.sleep(50);
+                            }
+                            catch (IgniteInterruptedCheckedException e) {
+                                e.printStackTrace();
+                            }
+                        }
+                    }
+                }
+                else if (msg0 instanceof GridNearTxPrepareResponse) {
+                    GridNearTxPrepareResponse res = (GridNearTxPrepareResponse)msg0;
+
+                    GridCacheVersion txId = res.version();
+
+                    TX_IDS.add(txId);
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}


Mime
View raw message