ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [18/38] ignite git commit: ignite-2854 Deadlock detection for pessimistic transactions
Date Wed, 18 May 2016 10:58:24 GMT
ignite-2854 Deadlock detection for pessimistic transactions


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f7e6504
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f7e6504
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f7e6504

Branch: refs/heads/ignite-3165
Commit: 8f7e6504f97ad8f4966dad53005a17ef2c50069d
Parents: f805917
Author: agura <agura@gridgain.com>
Authored: Thu May 5 18:16:31 2016 +0300
Committer: agura <agura@gridgain.com>
Committed: Thu May 5 18:16:31 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     | 197 ++++--
 .../apache/ignite/IgniteSystemProperties.java   |  11 +
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../communication/GridIoMessageFactory.java     |  28 +-
 .../processors/cache/GridCacheMapEntry.java     |  16 +
 .../processors/cache/GridCacheMvcc.java         |   7 +
 .../processors/cache/GridCacheMvccManager.java  |  20 +-
 .../cache/GridCacheSharedContext.java           |   2 +-
 .../distributed/dht/GridDhtLockFuture.java      |  22 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  10 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  71 ++-
 .../distributed/near/GridNearLockFuture.java    |   3 +-
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |   1 +
 .../cache/local/GridLocalLockFuture.java        |  55 +-
 .../cache/transactions/IgniteTxAdapter.java     |  17 +-
 .../transactions/IgniteTxLocalAdapter.java      |  44 +-
 .../cache/transactions/IgniteTxManager.java     | 413 ++++++++++++-
 .../cache/transactions/TxDeadlock.java          | 159 +++++
 .../cache/transactions/TxDeadlockDetection.java | 599 +++++++++++++++++++
 .../processors/cache/transactions/TxLock.java   | 225 +++++++
 .../cache/transactions/TxLockList.java          | 134 +++++
 .../cache/transactions/TxLocksRequest.java      | 205 +++++++
 .../cache/transactions/TxLocksResponse.java     | 318 ++++++++++
 .../ignite/internal/util/IgniteUtils.java       |   4 +
 .../apache/ignite/transactions/Transaction.java |  19 +-
 .../TransactionDeadlockException.java           |  42 ++
 .../TransactionTimeoutException.java            |   5 +-
 .../transactions/DepthFirstSearchTest.java      | 252 ++++++++
 .../transactions/TxDeadlockDetectionTest.java   | 495 +++++++++++++++
 ...simisticDeadlockDetectionCrossCacheTest.java | 165 +++++
 .../TxPessimisticDeadlockDetectionTest.java     | 487 +++++++++++++++
 ...naryObjectsTxDeadlockDetectionTestSuite.java |  37 ++
 .../TxDeadlockDetectionTestSuite.java           |  44 ++
 34 files changed, 4012 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index a791e38..3af2c44 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -58,6 +58,9 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
+import org.apache.ignite.transactions.TransactionHeuristicException;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -233,6 +236,9 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      *      previous value).
      * @throws NullPointerException If either key or value are {@code null}.
      * @throws CacheException If put operation failed.
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
      */
     @IgniteAsyncSupported
     public V getAndPutIfAbsent(K key, V val) throws CacheException;
@@ -390,12 +396,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      * if any, defined by the {@link EntryProcessor} implementation.  No mappings
      * will be returned for {@link EntryProcessor}s that return a
      * <code>null</code> value for a key.
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
      */
     @IgniteAsyncSupported
     public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public V get(K key);
 
@@ -406,19 +420,27 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      * because the entry is missing from the cache, the Cache's {@link CacheLoader}
      * is called in an attempt to load the entry.
      *
-     * @param key the key whose associated value is to be returned
-     * @return the element, or null, if it does not exist.
-     * @throws IllegalStateException if the cache is {@link #isClosed()}
-     * @throws NullPointerException  if the key is null
-     * @throws CacheException        if there is a problem fetching the value
-     * @throws ClassCastException    if the implementation is configured to perform
+     * @param key The key whose associated value is to be returned.
+     * @return The element, or null, if it does not exist.
+     * @throws IllegalStateException If the cache is {@link #isClosed()}.
+     * @throws NullPointerException If the key is {@code null}.
+     * @throws CacheException If there is a problem fetching the value.
+     * @throws ClassCastException If the implementation is configured to perform
      * runtime-type-checking, and the key or value types are incompatible with those that have been
-     * configured for the {@link Cache}
+     * configured for the {@link Cache}.
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
      */
     @IgniteAsyncSupported
     public CacheEntry<K, V> getEntry(K key);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public Map<K, V> getAll(Set<? extends K> keys);
 
@@ -434,12 +456,15 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      * @param keys The keys whose associated values are to be returned.
      * @return A collection of entries that were found for the given keys. Entries not found
      *         in the cache are not in the returned collection.
-     * @throws NullPointerException  if keys is null or if keys contains a null
-     * @throws IllegalStateException if the cache is {@link #isClosed()}
-     * @throws CacheException        if there is a problem fetching the values
-     * @throws ClassCastException    if the implementation is configured to perform
+     * @throws NullPointerException If keys is {@code null} or if keys contains a {@code null}.
+     * @throws IllegalStateException If the cache is {@link #isClosed()}.
+     * @throws CacheException If there is a problem fetching the values.
+     * @throws ClassCastException If the implementation is configured to perform
      * runtime-type-checking, and the key or value types are incompatible with those that have been
-     * configured for the {@link Cache}
+     * configured for the {@link Cache}.
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
      */
     @IgniteAsyncSupported
     public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys);
@@ -454,7 +479,12 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
     @IgniteAsyncSupported
     public Map<K, V> getAllOutTx(Set<? extends K> keys);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public boolean containsKey(K key);
 
@@ -463,51 +493,109 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      *
      * @param keys Key whose presence in this cache is to be tested.
      * @return {@code True} if this cache contains a mapping for the specified keys.
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
      */
     @IgniteAsyncSupported
     public boolean containsKeys(Set<? extends K> keys);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public void put(K key, V val);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public V getAndPut(K key, V val);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public void putAll(Map<? extends K, ? extends V> map);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public boolean putIfAbsent(K key, V val);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public boolean remove(K key);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public boolean remove(K key, V oldVal);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public V getAndRemove(K key);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public boolean replace(K key, V oldVal, V newVal);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public boolean replace(K key, V val);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public V getAndReplace(K key, V val);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public void removeAll(Set<? extends K> keys);
 
@@ -591,7 +679,12 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      */
     public void localClearAll(Set<? extends K> keys);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments);
 
@@ -605,28 +698,34 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always
      * the same.
      *
-     * @param key            the key to the entry
-     * @param entryProcessor the {@link CacheEntryProcessor} to invoke
-     * @param arguments      additional arguments to pass to the
-     *                       {@link CacheEntryProcessor}
-     * @return the result of the processing, if any, defined by the
-     *         {@link CacheEntryProcessor} implementation
-     * @throws NullPointerException    if key or {@link CacheEntryProcessor} is null
-     * @throws IllegalStateException   if the cache is {@link #isClosed()}
-     * @throws ClassCastException    if the implementation is configured to perform
+     * @param key The key to the entry.
+     * @param entryProcessor The {@link CacheEntryProcessor} to invoke.
+     * @param arguments Additional arguments to pass to the {@link CacheEntryProcessor}.
+     * @return The result of the processing, if any, defined by the {@link CacheEntryProcessor} implementation.
+     * @throws NullPointerException If key or {@link CacheEntryProcessor} is {@code null}.
+     * @throws IllegalStateException If the cache is {@link #isClosed()}.
+     * @throws ClassCastException If the implementation is configured to perform
      *                               runtime-type-checking, and the key or value
      *                               types are incompatible with those that have been
-     *                               configured for the {@link Cache}
-     * @throws EntryProcessorException if an exception is thrown by the {@link
+     *                               configured for the {@link Cache}.
+     * @throws EntryProcessorException If an exception is thrown by the {@link
      *                                 CacheEntryProcessor}, a Caching Implementation
      *                                 must wrap any {@link Exception} thrown
      *                                 wrapped in an {@link EntryProcessorException}.
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
      * @see CacheEntryProcessor
      */
     @IgniteAsyncSupported
     public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments);
 
-    /** {@inheritDoc} */
+    /**
+     * {@inheritDoc}
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
+     */
     @IgniteAsyncSupported
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
         EntryProcessor<K, V, T> entryProcessor, Object... args);
@@ -655,20 +754,22 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
      * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always
      * the same.
      *
-     * @param keys           the set of keys for entries to process
-     * @param entryProcessor the {@link CacheEntryProcessor} to invoke
-     * @param args      additional arguments to pass to the
-     *                       {@link CacheEntryProcessor}
-     * @return the map of {@link EntryProcessorResult}s of the processing per key,
+     * @param keys The set of keys for entries to process.
+     * @param entryProcessor The {@link CacheEntryProcessor} to invoke.
+     * @param args Additional arguments to pass to the {@link CacheEntryProcessor}.
+     * @return The map of {@link EntryProcessorResult}s of the processing per key,
      * if any, defined by the {@link CacheEntryProcessor} implementation.  No mappings
      * will be returned for {@link CacheEntryProcessor}s that return a
      * <code>null</code> value for a key.
-     * @throws NullPointerException    if keys or {@link CacheEntryProcessor} are null
-     * @throws IllegalStateException   if the cache is {@link #isClosed()}
-     * @throws ClassCastException    if the implementation is configured to perform
+     * @throws NullPointerException If keys or {@link CacheEntryProcessor} are {@code null}.
+     * @throws IllegalStateException If the cache is {@link #isClosed()}.
+     * @throws ClassCastException If the implementation is configured to perform
      *                               runtime-type-checking, and the key or value
      *                               types are incompatible with those that have been
-     *                               configured for the {@link Cache}
+     *                               configured for the {@link Cache}.
+     * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred.
+     * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back.
+     * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state.
      * @see CacheEntryProcessor
      */
     @IgniteAsyncSupported

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 095d199..2b643b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -188,6 +188,17 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_TX_SALVAGE_TIMEOUT = "IGNITE_TX_SALVAGE_TIMEOUT";
 
     /**
+     * Specifies maximum number of iterations for deadlock detection procedure.
+     * If value of this property is less than or equal to zero then deadlock detection will be disabled.
+     */
+    public static final String IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS = "IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS";
+
+    /**
+     * Specifies timeout for deadlock detection procedure.
+     */
+    public static final String IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT = "IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT";
+
+    /**
      * System property to override multicast group taken from configuration.
      * Used for testing purposes.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index fbf2b18..248f75b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -91,7 +91,10 @@ public enum GridTopic {
     TOPIC_HADOOP,
 
     /** */
-    TOPIC_QUERY;
+    TOPIC_QUERY,
+
+    /** */
+    TOPIC_TX;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3c7f378..5f60215 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -26,13 +26,13 @@ import org.apache.ignite.internal.GridJobSiblingsRequest;
 import org.apache.ignite.internal.GridJobSiblingsResponse;
 import org.apache.ignite.internal.GridTaskCancelRequest;
 import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
-import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
-import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsValue;
@@ -99,6 +99,10 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxEntryValueHolder;
+import org.apache.ignite.internal.processors.cache.transactions.TxLock;
+import org.apache.ignite.internal.processors.cache.transactions.TxLockList;
+import org.apache.ignite.internal.processors.cache.transactions.TxLocksRequest;
+import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -156,6 +160,26 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -26:
+                msg = new TxLockList();
+
+                break;
+
+            case -25:
+                msg = new TxLock();
+
+                break;
+
+            case -24:
+                msg = new TxLocksRequest();
+
+                break;
+
+            case -23:
+                msg = new TxLocksResponse();
+
+                break;
+
             case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
                 msg = new TcpCommunicationSpi.NodeIdMessage();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 45be26c..0f7482a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -4324,6 +4326,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /**
+     * @return All MVCC local candidates.
+     */
+    @Nullable public synchronized List<GridCacheMvccCandidate> mvccAllLocal() {
+        GridCacheMvcc mvcc = extras != null ? extras.mvcc() : null;
+
+        if (mvcc == null)
+            return null;
+
+        List<GridCacheMvccCandidate> locs = mvcc.allLocal();
+
+        return (locs == null || locs.isEmpty()) ? null : new ArrayList<>(locs);
+    }
+
+    /**
      * @param mvcc MVCC.
      */
     protected void mvccExtras(@Nullable GridCacheMvcc mvcc) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index adcbf92..507a2c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -1357,6 +1357,13 @@ public final class GridCacheMvcc {
     }
 
     /**
+     * @return Local MVCC candidates.
+     */
+    @Nullable public List<GridCacheMvccCandidate> allLocal() {
+        return locs;
+    }
+
+    /**
      * @param ver Version to check for ownership.
      * @return {@code True} if lock is owned by the specified version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index fb05ca5..43609eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -664,6 +663,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Gets futures for given lock ID.
+     *
+     * @param ver Lock ID.
+     * @return Copy of the futures found for the given lock ID, or {@code null} if none are found.
+     */
+    @SuppressWarnings({"unchecked"})
+    @Nullable public Collection<GridCacheMvccFuture<?>> mvccFutures(GridCacheVersion ver) {
+        Collection<GridCacheMvccFuture<?>> futs = this.mvccFuts.get(ver);
+
+        if (futs != null) {
+            synchronized (futs) {
+                return new ArrayList<>(futs);
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * @param futId Future ID.
      * @return Found future.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 2221d3b..ef271f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -467,7 +467,7 @@ public class GridCacheSharedContext<K, V> {
      * @param nodeId Node ID.
      * @return Node or {@code null}.
      */
-    public ClusterNode node(UUID nodeId) {
+    @Nullable public ClusterNode node(UUID nodeId) {
         return kernalCtx.discovery().node(nodeId);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 80f35c1..5659436 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -132,7 +132,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     private LockTimeoutObject timeoutObj;
 
     /** Lock timeout. */
-    private long timeout;
+    private final long timeout;
 
     /** Filter. */
     private CacheEntryPredicate[] filter;
@@ -199,6 +199,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         assert nearNodeId != null;
         assert nearLockVer != null;
         assert topVer.topologyVersion() > 0;
+        assert (tx != null && timeout >= 0) || tx == null;
 
         this.cctx = cctx;
         this.nearNodeId = nearNodeId;
@@ -482,7 +483,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     private void onFailed(boolean dist) {
         undoLocks(dist);
 
-        onComplete(false, false);
+        onComplete(false, false, true);
     }
 
     /**
@@ -628,7 +629,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             err = t;
         }
 
-        onComplete(false, false);
+        onComplete(false, false, true);
     }
 
     /**
@@ -691,7 +692,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     /** {@inheritDoc} */
     @Override public boolean cancel() {
         if (onCancelled())
-            onComplete(false, false);
+            onComplete(false, false, true);
 
         return isCancelled();
     }
@@ -721,7 +722,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                 this.err = err;
         }
 
-        return onComplete(success, err instanceof NodeStoppingException);
+        return onComplete(success, err instanceof NodeStoppingException, true);
     }
 
     /**
@@ -729,13 +730,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      *
      * @param success {@code True} if lock was acquired.
      * @param stopping {@code True} if node is stopping.
+     * @param unlock {@code True} if locks should be released.
      * @return {@code True} if complete by this operation.
      */
-    private boolean onComplete(boolean success, boolean stopping) {
+    private boolean onComplete(boolean success, boolean stopping, boolean unlock) {
         if (log.isDebugEnabled())
             log.debug("Received onComplete(..) callback [success=" + success + ", fut=" + this + ']');
 
-        if (!success && !stopping)
+        if (!success && !stopping && unlock)
             undoLocks(true);
 
         boolean set = false;
@@ -784,7 +786,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
      */
     public void map() {
         if (F.isEmpty(entries)) {
-            onComplete(true, false);
+            onComplete(true, false, true);
 
             return;
         }
@@ -1088,7 +1090,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
 
             timedOut = true;
 
-            onComplete(false, false);
+            boolean releaseLocks = !(inTx() && cctx.tm().deadlockDetectionEnabled());
+
+            onComplete(false, false, releaseLocks);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 534a560..cdbb6bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -690,10 +690,16 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         if (passedKeys.isEmpty())
             return new GridFinishedFuture<>(ret);
 
-        GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
+        GridDhtTransactionalCacheAdapter<?, ?> dhtCache =
+            cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx();
+
+        long timeout = remainingTime();
+
+        if (timeout == -1)
+            return new GridFinishedFuture<>(timeoutException());
 
         IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys,
-            lockTimeout(),
+            timeout,
             this,
             isInvalidate(),
             read,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index a5f5286..dc55eb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -21,8 +21,10 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
@@ -50,8 +52,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -65,7 +69,9 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -451,6 +457,28 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
     }
 
     /**
+     * @return Keys of the first mini future that is not done yet (no response received), or {@code null} if none.
+     */
+    public Set<KeyCacheObject> requestedKeys() {
+        Set<KeyCacheObject> requestedKeys = null;
+
+        for (IgniteInternalFuture<Boolean> miniFut : futures()) {
+            if (isMini(miniFut) && !miniFut.isDone()) {
+                if (requestedKeys == null)
+                    requestedKeys = new HashSet<>();
+
+                MiniFuture mini = (MiniFuture)miniFut;
+
+                requestedKeys.addAll(mini.keys);
+
+                return requestedKeys;
+            }
+        }
+
+        return requestedKeys;
+    }
+
+    /**
      * Finds pending mini future by the given mini ID.
      *
      * @param miniId Mini ID to find.
@@ -502,6 +530,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         if (log.isDebugEnabled())
             log.debug("Received onDone(..) callback [success=" + success + ", err=" + err + ", fut=" + this + ']');
 
+        // Local GridDhtLockFuture
+        if (inTx() && this.err instanceof IgniteTxTimeoutCheckedException && cctx.tm().deadlockDetectionEnabled())
+            return false;
+
         if (isDone())
             return false;
 
@@ -1288,7 +1320,36 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             if (log.isDebugEnabled())
                 log.debug("Timed out waiting for lock response: " + this);
 
-            onComplete(false, true);
+            if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
+                Set<IgniteTxKey> keys = new HashSet<>();
+
+                for (IgniteTxEntry txEntry : tx.allEntries()) {
+                    if (!txEntry.locked())
+                        keys.add(txEntry.txKey());
+                }
+
+                IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
+
+                fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
+                    @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
+                        try {
+                            TxDeadlock deadlock = fut.get();
+
+                            if (deadlock != null)
+                                err = new TransactionDeadlockException(deadlock.toString(cctx.shared()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            err = e;
+
+                            U.warn(log, "Failed to detect deadlock.", e);
+                        }
+
+                        onComplete(false, true);
+                    }
+                });
+            }
+            else
+                onComplete(false, true);
         }
 
         /** {@inheritDoc} */
@@ -1310,11 +1371,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
 
         /** Node ID. */
         @GridToStringExclude
-        private ClusterNode node;
+        private final ClusterNode node;
 
         /** Keys. */
         @GridToStringInclude
-        private Collection<KeyCacheObject> keys;
+        private final Collection<KeyCacheObject> keys;
 
         /** Mappings to proceed. */
         @GridToStringExclude
@@ -1394,6 +1455,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             }
 
             if (res.error() != null) {
+                if (inTx() && res.error() instanceof IgniteTxTimeoutCheckedException &&
+                    cctx.tm().deadlockDetectionEnabled())
+                    return;
+
                 if (log.isDebugEnabled())
                     log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
                         ", res=" + res + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 55c5ab6..d5c0133 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -118,7 +118,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
     private LockTimeoutObject timeoutObj;
 
     /** Lock timeout. */
-    private long timeout;
+    private final long timeout;
 
     /** Filter. */
     private final CacheEntryPredicate[] filter;
@@ -180,6 +180,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         super(cctx.kernalContext(), CU.boolReducer());
 
         assert keys != null;
+        assert (tx != null && timeout >= 0) || tx == null;
 
         this.cctx = cctx;
         this.keys = keys;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 515d284..3ff165b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1171,8 +1171,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         if (log.isDebugEnabled())
             log.debug("Before acquiring transaction lock on keys: " + keys);
 
+        long timeout = remainingTime();
+
+        if (timeout == -1)
+            return new GridFinishedFuture<>(timeoutException());
+
         IgniteInternalFuture<Boolean> fut = cacheCtx.colocated().lockAllAsyncInternal(keys,
-            lockTimeout(),
+            timeout,
             this,
             isInvalidate(),
             read,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 52cad91..6992aa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -73,6 +73,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
     protected GridCacheSharedContext<?, ?> cctx;
 
     /** Future ID. */
+    @GridToStringInclude
     protected IgniteUuid futId;
 
     /** Transaction. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 9f53c18..7868c79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -19,11 +19,14 @@ package org.apache.ignite.internal.processors.cache.local;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -31,14 +34,18 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -86,7 +93,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
     private LockTimeoutObject timeoutObj;
 
     /** Lock timeout. */
-    private long timeout;
+    private final long timeout;
 
     /** Filter. */
     private CacheEntryPredicate[] filter;
@@ -114,6 +121,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
         CacheEntryPredicate[] filter) {
         assert keys != null;
         assert cache != null;
+        assert (tx != null && timeout >= 0) || tx == null;
 
         this.cctx = cctx;
         this.cache = cache;
@@ -435,12 +443,53 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings({"ThrowableInstanceNeverThrown"})
+        @SuppressWarnings({"ThrowableInstanceNeverThrown", "ForLoopReplaceableByForEach"})
         @Override public void onTimeout() {
             if (log.isDebugEnabled())
                 log.debug("Timed out waiting for lock response: " + this);
 
-            onComplete(false);
+            if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
+                Set<IgniteTxKey> keys = new HashSet<>();
+
+                List<GridLocalCacheEntry> entries = entries();
+
+                for (int i = 0; i < entries.size(); i++) {
+                    GridLocalCacheEntry e = entries.get(i);
+
+                    List<GridCacheMvccCandidate> mvcc = e.mvccAllLocal();
+
+                    if (mvcc == null)
+                        continue;
+
+                    GridCacheMvccCandidate cand = mvcc.get(0);
+
+                    if (cand.owner() && cand.tx() && !cand.version().equals(tx.xidVersion()))
+                        keys.add(e.txKey());
+                }
+
+                IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
+
+                fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
+                    @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
+                        try {
+                            TxDeadlock deadlock = fut.get();
+
+                            if (deadlock != null)
+                                err.compareAndSet(null,
+                                    new TransactionDeadlockException(deadlock.toString(cctx.shared())));
+                        }
+                        catch (IgniteCheckedException e) {
+                            err.compareAndSet(null, e);
+
+                            U.warn(log, "Failed to detect deadlock.", e);
+                        }
+
+                        onComplete(false);
+                    }
+                });
+            }
+            else
+                onComplete(false);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 34f1fa4..3286689 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -697,27 +697,24 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     /**
      * Gets remaining allowed transaction time.
      *
-     * @return Remaining transaction time.
+     * @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if the timeout has expired.
      */
     @Override public long remainingTime() {
         if (timeout() <= 0)
-            return -1;
+            return 0;
 
         long timeLeft = timeout() - (U.currentTimeMillis() - startTime());
 
-        if (timeLeft < 0)
-            return 0;
+        return timeLeft <= 0 ? -1 : timeLeft;
 
-        return timeLeft;
     }
 
     /**
-     * @return Lock timeout.
+     * @return Transaction timeout exception.
      */
-    protected long lockTimeout() {
-        long timeout = remainingTime();
-
-        return timeout < 0 ? 0 : timeout == 0 ? -1 : timeout;
+    protected final IgniteCheckedException timeoutException() {
+        return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " +
+            "for transaction [timeout=" + timeout() + ", tx=" + this + ']');
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dc1ec43..962f2d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.transactions.TransactionDeadlockException;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1833,8 +1834,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
 
+                long timeout = remainingTime();
+
+                if (timeout == -1)
+                    return new GridFinishedFuture<>(timeoutException());
+
                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
-                    lockTimeout(),
+                    timeout,
                     this,
                     true,
                     true,
@@ -3095,8 +3101,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 if (log.isDebugEnabled())
                     log.debug("Before acquiring transaction lock for put on key: " + enlisted);
 
+                long timeout = remainingTime();
+
+                if (timeout == -1)
+                    return new GridFinishedFuture<>(timeoutException());
+
                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
-                    lockTimeout(),
+                    timeout,
                     this,
                     false,
                     retval,
@@ -3270,8 +3281,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 if (log.isDebugEnabled())
                     log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
 
+                long timeout = remainingTime();
+
+                if (timeout == -1)
+                    return new GridFinishedFuture<>(timeoutException());
+
                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
-                    lockTimeout(),
+                    timeout,
                     this,
                     false,
                     retval,
@@ -3552,8 +3568,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             if (log.isDebugEnabled())
                 log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
 
+            long timeout = remainingTime();
+
+            if (timeout == -1)
+                return new GridFinishedFuture<>(timeoutException());
+
             IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
-                lockTimeout(),
+                timeout,
                 this,
                 false,
                 retval,
@@ -3707,7 +3728,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             throw new IgniteCheckedException("Cache transaction marked as rollback-only: " + this);
         }
 
-        if (remainingTime() == 0 && setRollbackOnly())
+        if (remainingTime() == -1 && setRollbackOnly())
             throw new IgniteTxTimeoutCheckedException("Cache transaction timed out " +
                 "(was rolled back automatically): " + this);
     }
@@ -4070,7 +4091,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         /** {@inheritDoc} */
         @Override public final IgniteInternalFuture<T> apply(Boolean locked, @Nullable final Exception e) {
-            if (e != null) {
+            TransactionDeadlockException deadlockErr = X.cause(e, TransactionDeadlockException.class);
+
+            if (e != null && deadlockErr == null) {
                 setRollbackOnly();
 
                 if (commit && commitAfterLock())
@@ -4083,12 +4106,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 throw new GridClosureException(e);
             }
 
-            if (!locked) {
+            if (deadlockErr != null || !locked) {
                 setRollbackOnly();
 
-                final GridClosureException ex = new GridClosureException(new IgniteTxTimeoutCheckedException("Failed to " +
-                    "acquire lock within provided timeout for transaction [timeout=" + timeout() +
-                    ", tx=" + this + ']'));
+                final GridClosureException ex = new GridClosureException(
+                    new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " +
+                        "for transaction [timeout=" + timeout() + ", tx=" + this + ']', deadlockErr)
+                );
 
                 if (commit && commitAfterLock())
                     return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteInternalTx>, T>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 5dcd53d..6e8f9fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -32,17 +33,25 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
@@ -51,9 +60,11 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
@@ -68,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -79,9 +91,12 @@ import org.jsr166.ConcurrentLinkedHashMap;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.RECOVERY_FINISH;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
@@ -111,6 +126,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Tx salvage timeout (default 3s). */
     private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
 
+    /** Version in which deadlock detection was introduced. */
+    public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
+
+    /** Deadlock detection maximum iterations. */
+    static final int DEADLOCK_MAX_ITERS =
+        IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
+
     /** Committing transactions. */
     private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>();
 
@@ -129,6 +151,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Per-ID map for near transactions. */
     private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap();
 
+    /** Deadlock detection futures. */
+    private final ConcurrentMap<Long, TxDeadlockFuture> deadlockDetectFuts = new ConcurrentHashMap8<>();
+
     /** TX handler. */
     private IgniteTxHandler txHnd;
 
@@ -162,6 +187,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, GridCacheVersion> mappedVers =
         new ConcurrentHashMap8<>(5120);
 
+    /** Transaction deadlock detection. */
+    private TxDeadlockDetection txDeadlockDetection;
+
     /** {@inheritDoc} */
     @Override protected void onKernalStart0(boolean reconnect) {
         if (reconnect)
@@ -175,14 +203,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                     DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
 
-                    cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(discoEvt.eventNode().id()));
+                    UUID nodeId = discoEvt.eventNode().id();
+
+                    cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
 
                     if (txFinishSync != null)
-                        txFinishSync.onNodeLeft(discoEvt.eventNode().id());
+                        txFinishSync.onNodeLeft(nodeId);
+
+                    for (TxDeadlockFuture fut : deadlockDetectFuts.values())
+                        fut.onNodeLeft(nodeId);
                 }
             },
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
+        this.txDeadlockDetection = new TxDeadlockDetection(cctx);
+
+        cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
+
         for (IgniteInternalTx tx : idMap.values()) {
             if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) {
                 if (log.isDebugEnabled())
@@ -194,6 +231,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        cctx.gridIO().removeMessageListener(TOPIC_TX);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         txFinishSync = new GridCacheTxFinishSync<>(cctx);
 
@@ -206,6 +248,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
             rollbackTx(e.getValue());
+
+        IgniteClientDisconnectedException err =
+            new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.");
+
+        for (TxDeadlockFuture fut : deadlockDetectFuts.values())
+            fut.onDone(err);
     }
 
     /**
@@ -378,7 +426,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             implicit,
             implicitSingle,
             sysCacheCtx != null,
-            sysCacheCtx != null ? sysCacheCtx.ioPolicy() : GridIoPolicy.SYSTEM_POOL,
+            sysCacheCtx != null ? sysCacheCtx.ioPolicy() : SYSTEM_POOL,
             concurrency,
             isolation,
             timeout,
@@ -758,7 +806,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             throw new IgniteCheckedException("Transaction is marked for rollback: " + tx);
         }
 
-        if (tx.remainingTime() == 0) {
+        if (tx.remainingTime() == -1) {
             tx.setRollbackOnly();
 
             throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this);
@@ -1434,12 +1482,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         throws IgniteCheckedException {
         assert tx.optimistic() || !tx.local();
 
-        long remainingTime = tx.timeout() - (U.currentTimeMillis() - tx.startTime());
+        long remainingTime = tx.remainingTime();
 
         // For serializable transactions, failure to acquire lock means
         // that there is a serializable conflict. For all other isolation levels,
         // we wait for the lock.
-        long timeout = tx.timeout() == 0 ? 0 : (remainingTime < 0 ? 0 : remainingTime);
+        long timeout = remainingTime < 0 ? 0 : remainingTime;
 
         GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion() : null;
 
@@ -1843,6 +1891,217 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return {@code True} if deadlock detection is enabled.
+     */
+    public boolean deadlockDetectionEnabled() {
+        return DEADLOCK_MAX_ITERS > 0;
+    }
+
+    /**
+     * Performs deadlock detection for given keys.
+     *
+     * @param tx Target tx.
+     * @param keys Keys.
+     * @return Detection result.
+     */
+    public IgniteInternalFuture<TxDeadlock> detectDeadlock(
+        IgniteInternalTx tx,
+        Set<IgniteTxKey> keys
+    ) {
+        return txDeadlockDetection.detectDeadlock(tx, keys);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param fut Future.
+     * @param txKeys Tx keys.
+     */
+    void txLocksInfo(UUID nodeId, TxDeadlockFuture fut, Set<IgniteTxKey> txKeys) {
+        ClusterNode node = cctx.node(nodeId);
+
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to finish deadlock detection, node left: " + nodeId);
+
+            fut.onDone();
+
+            return;
+        }
+
+        if (supportsDeadlockDetection(node)) {
+            TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
+
+            try {
+                if (!cctx.localNodeId().equals(nodeId))
+                    req.prepareMarshal(cctx);
+
+                cctx.gridIO().send(node, TOPIC_TX, req, SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                if (e instanceof ClusterTopologyCheckedException) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to finish deadlock detection, node left: " + nodeId);
+                }
+                else
+                    U.warn(log, "Failed to finish deadlock detection: " + e, e);
+
+                fut.onDone();
+            }
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node);
+
+            fut.onDone();
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if node supports deadlock detection protocol.
+     */
+    private boolean supportsDeadlockDetection(ClusterNode node) {
+        return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0;
+    }
+
+    /**
+     * @param tx Tx.
+     * @param txKeys Tx keys.
+     * @return {@code True} if at least one of the given keys is involved in the transaction.
+     */
+    private boolean hasKeys(IgniteInternalTx tx, Collection<IgniteTxKey> txKeys) {
+        for (IgniteTxKey key : txKeys) {
+            if (tx.txState().entry(key) != null)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param txKeys Tx keys.
+     * @return Response with transaction locks info for entries of active transactions involving the given keys.
+     */
+    private TxLocksResponse txLocksInfo(Collection<IgniteTxKey> txKeys) {
+        TxLocksResponse res = new TxLocksResponse();
+
+        Collection<IgniteInternalTx> txs = activeTransactions();
+
+        for (IgniteInternalTx tx : txs) {
+            boolean nearTxLoc = tx instanceof GridNearTxLocal;
+
+            if (!(nearTxLoc || tx instanceof GridDhtTxLocal) || !hasKeys(tx, txKeys))
+                continue;
+
+            Collection<IgniteTxEntry> txEntries = tx.allEntries();
+
+            Set<KeyCacheObject> requestedKeys = null;
+
+            // Try to get info about requested keys for detached entries in case of GridNearTxLocal transaction
+            // in order to reduce amount of requests to remote nodes.
+            if (nearTxLoc) {
+                GridDhtColocatedLockFuture fut = colocatedLockFuture(tx);
+
+                if (fut != null)
+                    requestedKeys = fut.requestedKeys();
+            }
+
+            for (IgniteTxEntry txEntry : txEntries) {
+                IgniteTxKey txKey = txEntry.txKey();
+
+                if (res.txLocks(txKey) == null) {
+                    GridCacheMapEntry e = (GridCacheMapEntry)txEntry.cached();
+
+                    List<GridCacheMvccCandidate> locs = e.mvccAllLocal();
+
+                    if (locs != null) {
+                        boolean owner = false;
+
+                        for (GridCacheMvccCandidate loc : locs) {
+                            if (!owner && loc.owner() && loc.tx())
+                                owner = true;
+
+                            if (!owner) // Skip all candidates in case when no tx that owns lock.
+                                break;
+
+                            if (loc.tx()) {
+                                UUID nearNodeId = loc.otherNodeId();
+
+                                GridCacheVersion txId = loc.otherVersion();
+
+                                TxLock txLock = new TxLock(
+                                    txId == null ? loc.version() : txId,
+                                    nearNodeId == null ? loc.nodeId() : nearNodeId,
+                                    loc.threadId(),
+                                    loc.owner() ? TxLock.OWNERSHIP_OWNER : TxLock.OWNERSHIP_CANDIDATE);
+
+                                res.addTxLock(txKey, txLock);
+                            }
+                        }
+                    }
+                    // Special case for optimal sequence of nodes processing.
+                    else if (nearTxLoc && requestedKeys != null && requestedKeys.contains(txKey.key())) {
+                        TxLock txLock = new TxLock(
+                            tx.nearXidVersion(),
+                            tx.nodeId(),
+                            tx.threadId(),
+                            TxLock.OWNERSHIP_REQUESTED);
+
+                        res.addTxLock(txKey, txLock);
+                    }
+                    else
+                        res.addKey(txKey);
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * @param tx Tx. Must be an instance of {@link GridNearTxLocal}.
+     * @return Colocated future.
+     */
+    private GridDhtColocatedLockFuture colocatedLockFuture(IgniteInternalTx tx) {
+        assert tx instanceof GridNearTxLocal : tx;
+
+        Collection<GridCacheMvccFuture<?>> futs = cctx.mvcc().mvccFutures(tx.nearXidVersion());
+
+        if (futs != null) {
+            for (GridCacheMvccFuture<?> fut : futs) {
+                if (fut instanceof GridDhtColocatedLockFuture)
+                    return (GridDhtColocatedLockFuture)fut;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param fut Future.
+     */
+    public void addFuture(TxDeadlockFuture fut) {
+        TxDeadlockFuture old = deadlockDetectFuts.put(fut.futureId(), fut);
+
+        assert old == null : old;
+    }
+
+    /**
+     * @param futId Future ID.
+     * @return Found future.
+     */
+    @Nullable public TxDeadlockFuture future(long futId) {
+        return deadlockDetectFuts.get(futId);
+    }
+
+    /**
+     * @param futId Future ID.
+     */
+    public void removeFuture(long futId) {
+        deadlockDetectFuts.remove(futId);
+    }
+
+    /**
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
@@ -2058,4 +2317,144 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             }
         }
     }
+
+    /**
+     * Message listener for the transaction deadlock detection process.
+     */
+    private class DeadlockDetectionListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            GridCacheMessage cacheMsg = (GridCacheMessage)msg;
+
+            unmarshall(nodeId, cacheMsg);
+
+            if (cacheMsg.classError() != null) {
+                try {
+                    processFailedMessage(nodeId, cacheMsg);
+                }
+                catch(Throwable e){
+                    U.error(log, "Failed to process message [senderId=" + nodeId +
+                        ", messageType=" + cacheMsg.getClass() + ']', e);
+
+                    if (e instanceof Error)
+                        throw (Error)e;
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Message received [locNodeId=" + cctx.localNodeId() +
+                        ", rmtNodeId=" + nodeId + ", msg=" + msg + ']');
+
+                if (msg instanceof TxLocksRequest) {
+                    TxLocksRequest req = (TxLocksRequest)msg;
+
+                    TxLocksResponse res = txLocksInfo(req.txKeys());
+
+                    res.futureId(req.futureId());
+
+                    try {
+                        if (!cctx.localNodeId().equals(nodeId))
+                            res.prepareMarshal(cctx);
+
+                        cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e);
+                    }
+                }
+                else if (msg instanceof TxLocksResponse) {
+                    TxLocksResponse res = (TxLocksResponse)msg;
+
+                    long futId = res.futureId();
+
+                    TxDeadlockFuture fut = future(futId);
+
+                    if (fut != null)
+                        fut.onResult(nodeId, res);
+                    else
+                        U.warn(log, "Unexpected response received " + res);
+                }
+                else
+                    throw new IllegalArgumentException("Unknown message [msg=" + msg + ']');
+            }
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param msg Message.
+         */
+        private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException {
+            switch (msg.directType()) {
+                case -24: {
+                    TxLocksRequest req = (TxLocksRequest)msg;
+
+                    TxLocksResponse res = new TxLocksResponse();
+
+                    res.futureId(req.futureId());
+
+                    try {
+                        cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +
+                            ", res=" + res + ']', e);
+                    }
+                }
+
+                break;
+
+                case -23: {
+                    TxLocksResponse res = (TxLocksResponse)msg;
+
+                    TxDeadlockFuture fut = future(res.futureId());
+
+                    if (fut == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to find future for response [sender=" + nodeId + ", res=" + res + ']');
+
+                        return;
+                    }
+
+                    fut.onResult(nodeId, res);
+                }
+
+                break;
+
+                default:
+                    throw new IgniteCheckedException("Failed to process message. Unsupported direct type [msg=" +
+                        msg + ']', msg.classError());
+            }
+
+        }
+
+        /**
+         * @param nodeId Sender node ID.
+         * @param cacheMsg Message.
+         */
+        private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) {
+            if (cctx.localNodeId().equals(nodeId))
+                return;
+
+            try {
+                cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
+            }
+            catch (IgniteCheckedException e) {
+                cacheMsg.onClassError(e);
+            }
+            catch (BinaryObjectException e) {
+                cacheMsg.onClassError(new IgniteCheckedException(e));
+            }
+            catch (Error e) {
+                if (cacheMsg.ignoreClassErrors() &&
+                    X.hasCause(e, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) {
+                    cacheMsg.onClassError(
+                        new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e)
+                    );
+                }
+                else
+                    throw e;
+            }
+        }
+    }
 }


Mime
View raw message