ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [5/7] ignite git commit: ignite-6181
Date Wed, 20 Sep 2017 12:04:14 GMT
ignite-6181


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56dbdc2f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56dbdc2f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56dbdc2f

Branch: refs/heads/ignite-6181-2
Commit: 56dbdc2fc25f5a53570e59a87767a97fb7108e49
Parents: 2299589
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 20 10:58:24 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 20 14:56:00 2017 +0300

----------------------------------------------------------------------
 .../IgniteDiagnosticPrepareContext.java         |   4 +-
 .../processors/cache/GridCacheAdapter.java      |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |   9 +-
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   2 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   6 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   4 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  56 +-
 .../near/GridNearFastFinishFuture.java          |  79 +++
 .../distributed/near/GridNearLockFuture.java    |  56 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   3 +-
 .../near/GridNearTxFinishFuture.java            |  15 +-
 .../cache/distributed/near/GridNearTxLocal.java | 276 +++++---
 .../distributed/near/NearTxFinishFuture.java    |  31 +
 .../cache/transactions/IgniteInternalTx.java    |   2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  13 +-
 .../transactions/IgniteTxLocalAdapter.java      |   8 +-
 .../cache/transactions/IgniteTxLocalEx.java     |   4 +-
 .../cache/transactions/IgniteTxManager.java     |  79 ++-
 .../timeout/GridTimeoutProcessor.java           |  21 +-
 .../cache/IgniteTxConfigCacheSelfTest.java      |  14 +
 .../IgniteCacheThreadLocalTxTest.java           | 223 +++++++
 .../IgniteOptimisticTxSuspendResumeTest.java    |   6 +-
 ...ionedMultiNodeLongTxTimeout2FullApiTest.java |  34 +
 .../TxRollbackOnTimeoutNearCacheTest.java       |  28 +
 ...ollbackOnTimeoutNoDeadlockDetectionTest.java |  47 ++
 .../transactions/TxRollbackOnTimeoutTest.java   | 658 +++++++++++++++++++
 .../IgniteCacheFullApiSelfTestSuite.java        |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |   9 +
 .../hadoop/impl/HadoopTxConfigCacheTest.java    |   4 +-
 30 files changed, 1502 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
index 378dc74..ed8d35e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
@@ -75,7 +75,7 @@ public class IgniteDiagnosticPrepareContext {
      * @param keys Entry keys.
      * @param msg Initial message.
      */
-    public void  txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys, String msg) {
+    public void txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys, String msg) {
         closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys));
     }
 
@@ -280,4 +280,4 @@ public class IgniteDiagnosticPrepareContext {
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 92a8245..05c8cc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1857,7 +1857,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 return new GridFinishedFuture<>(e);
             }
 
-            tx = ctx.tm().threadLocalTx(ctx.systemTx() ? ctx : null);
+            tx = ctx.tm().threadLocalTx(ctx);
         }
 
         if (tx == null || tx.implicit()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 958f156..7b60b9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2463,7 +2463,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /** {@inheritDoc} */
     @Nullable @Override public CacheObject peek(@Nullable IgniteCacheExpiryPolicy plc)
         throws GridCacheEntryRemovedException, IgniteCheckedException {
-        IgniteInternalTx tx = cctx.tm().localTxx();
+        IgniteInternalTx tx = cctx.tm().localTx();
 
         AffinityTopologyVersion topVer = tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion();
 
@@ -3110,10 +3110,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @return Current transaction.
      */
     private IgniteTxLocalAdapter currentTx() {
-        if (cctx.isDht())
-            return cctx.dht().near().context().tm().localTx();
-        else
-            return cctx.tm().localTx();
+        return cctx.tm().localTx();
     }
 
     /** {@inheritDoc} */
@@ -3491,7 +3488,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
         }
 
-        IgniteInternalTx tx = cctx.tm().localTxx();
+        IgniteInternalTx tx = cctx.tm().localTx();
 
         if (tx != null) {
             IgniteTxEntry e = tx.entry(txKey());

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index ea6461d..e5bcc46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -855,7 +855,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
             // Note that we don't evict near entries here -
             // they will be deleted by their corresponding transactions.
             if (state(ROLLING_BACK) || state() == UNKNOWN) {
-                cctx.tm().rollbackTx(this);
+                cctx.tm().rollbackTx(this, false);
 
                 state(ROLLED_BACK);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5311ddc..6380710 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -227,7 +227,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 try {
                     boolean nodeStop = err != null && X.hasCause(err, NodeStoppingException.class);
 
-                    this.tx.tmFinish(err == null, nodeStop);
+                    this.tx.tmFinish(err == null, nodeStop, false);
                 }
                 catch (IgniteCheckedException finishErr) {
                     U.error(log, "Failed to finish tx: " + tx, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index ab5631e..28cc018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -436,7 +436,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             if (prepFut != null)
                 prepFut.get(); // Check for errors.
 
-            boolean finished = localFinish(commit);
+            boolean finished = localFinish(commit, false);
 
             if (!finished)
                 err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit +
@@ -542,7 +542,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException {
         assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate()
             || onePhaseCommit() || state() == PREPARED :
             "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
@@ -550,7 +550,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
         assert nearMiniId != 0;
 
-        return super.localFinish(commit);
+        return super.localFinish(commit, clearThreadMap);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 86eac42..e4a7141 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -734,7 +734,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]");
 
@@ -773,7 +773,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
             if (commit && !isRollbackOnly())
                 userCommit();
             else
-                userRollback();
+                userRollback(clearThreadMap);
         }
         catch (IgniteCheckedException e) {
             err = e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 58c6319..b3f4c0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -226,12 +227,6 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
             log = U.logger(cctx.kernalContext(), logRef, GridDhtColocatedLockFuture.class);
         }
 
-        if (timeout > 0) {
-            timeoutObj = new LockTimeoutObject();
-
-            cctx.time().addTimeoutObject(timeoutObj);
-        }
-
         valMap = new ConcurrentHashMap8<>();
     }
 
@@ -399,7 +394,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
      * @param success Success flag.
      */
     public void complete(boolean success) {
-        onComplete(success, true);
+        onComplete(success, true, true);
     }
 
     /**
@@ -533,7 +528,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
     /** {@inheritDoc} */
     @Override public boolean cancel() {
         if (onCancelled())
-            onComplete(false, true);
+            onComplete(false, true, true);
 
         return isCancelled();
     }
@@ -556,7 +551,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
         if (err != null)
             success = false;
 
-        return onComplete(success, true);
+        return onComplete(success, true, true);
     }
 
     /**
@@ -564,9 +559,10 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
      *
      * @param success {@code True} if lock was acquired.
      * @param distribute {@code True} if need to distribute lock removal in case of failure.
+     * @param restoreTimeout {@code True} if the tx timeout callback needs to be restored.
      * @return {@code True} if complete by this operation.
      */
-    private boolean onComplete(boolean success, boolean distribute) {
+    private boolean onComplete(boolean success, boolean distribute, boolean restoreTimeout) {
         if (log.isDebugEnabled())
             log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute +
                 ", fut=" + this + ']');
@@ -577,6 +573,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
         if (tx != null)
             cctx.tm().txContext(tx);
 
+        if (restoreTimeout && tx != null && tx.trackTimeout()) {
+            // Need to restore timeout before onDone is called, but onComplete can be called concurrently,
+            // thus need to ignore duplicated timeout objects.
+            tx.addTimeoutHandler(true);
+        }
+
         if (super.onDone(success, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
@@ -675,6 +677,30 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
      * part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back.
      */
     void map() {
+        if (tx != null && tx.trackTimeout()) {
+            if (!tx.removeTimeoutHandler()) {
+                tx.finishFuture().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " +
+                            "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() +
+                            ", tx=" + tx + ']');
+
+                        onError(err);
+
+                        onComplete(false, false, false);
+                    }
+                });
+
+                return;
+            }
+        }
+
+        if (timeout > 0) {
+            timeoutObj = new LockTimeoutObject();
+
+            cctx.time().addTimeoutObject(timeoutObj);
+        }
+
         // Obtain the topology version to use.
         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
@@ -930,7 +956,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
                             if (log.isDebugEnabled())
                                 log.debug("Entry being locked did not pass filter (will not lock): " + entry);
 
-                            onComplete(false, false);
+                            onComplete(false, false, true);
 
                             return;
                         }
@@ -1307,7 +1333,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
             if (log.isDebugEnabled())
                 log.debug("Entry being locked did not pass filter (will not lock): " + entry);
 
-            onComplete(false, false);
+            onComplete(false, false, true);
 
             return false;
         }
@@ -1419,12 +1445,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
                             U.warn(log, "Failed to detect deadlock.", e);
                         }
 
-                        onComplete(false, true);
+                        onComplete(false, true, true);
                     }
                 });
             }
             else
-                onComplete(false, true);
+                onComplete(false, true, true);
         }
 
         /** {@inheritDoc} */
@@ -1673,4 +1699,4 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
             return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java
new file mode 100644
index 0000000..158d85d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearFastFinishFuture.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.COMMITTING;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
+import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+
+/**
+ * Finish future that steps a near tx through its states and calls {@code tm().fastFinishTx(..)} before completing.
+ */
+public class GridNearFastFinishFuture extends GridFutureAdapter<IgniteInternalTx> implements NearTxFinishFuture {
+    /** Transaction to finish. */
+    private final GridNearTxLocal tx;
+
+    /** {@code True} to commit, {@code false} to roll back. */
+    private final boolean commit;
+
+    /**
+     * @param tx Transaction to finish.
+     * @param commit Commit flag ({@code true} commits, {@code false} rolls back).
+     */
+    GridNearFastFinishFuture(GridNearTxLocal tx, boolean commit) {
+        this.tx = tx;
+        this.commit = commit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean commit() {
+        return commit;
+    }
+
+    /**
+     * Steps tx state to COMMITTED or ROLLED_BACK around {@code fastFinishTx(..)}, then completes this future with tx.
+     */
+    public void finish() {
+        if (commit) {
+            tx.state(PREPARING);
+            tx.state(PREPARED);
+            tx.state(COMMITTING);
+
+            tx.context().tm().fastFinishTx(tx, true);
+
+            tx.state(COMMITTED);
+        }
+        else {
+            tx.state(PREPARING);
+            tx.state(PREPARED);
+            tx.state(ROLLING_BACK);
+
+            tx.context().tm().fastFinishTx(tx, false);
+
+            tx.state(ROLLED_BACK);
+        }
+
+        onDone(tx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index c947715..c999351 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
@@ -230,12 +231,6 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridNearLockFuture.class);
 
-        if (timeout > 0) {
-            timeoutObj = new LockTimeoutObject();
-
-            cctx.time().addTimeoutObject(timeoutObj);
-        }
-
         valMap = new ConcurrentHashMap8<>();
     }
 
@@ -434,7 +429,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
      * @param success Success flag.
      */
     public void complete(boolean success) {
-        onComplete(success, true);
+        onComplete(success, true, true);
     }
 
     /**
@@ -655,7 +650,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
                     log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]");
             }
 
-            onComplete(true, true);
+            onComplete(true, true, true);
 
             return true;
         }
@@ -666,7 +661,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
     /** {@inheritDoc} */
     @Override public boolean cancel() {
         if (onCancelled())
-            onComplete(false, true);
+            onComplete(false, true, true);
 
         return isCancelled();
     }
@@ -690,7 +685,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
         if (err != null)
             success = false;
 
-        return onComplete(success, true);
+        return onComplete(success, true, true);
     }
 
     /**
@@ -698,9 +693,10 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
      *
      * @param success {@code True} if lock was acquired.
      * @param distribute {@code True} if need to distribute lock removal in case of failure.
+     * @param restoreTimeout {@code True} if the tx timeout callback needs to be restored.
      * @return {@code True} if complete by this operation.
      */
-    private boolean onComplete(boolean success, boolean distribute) {
+    private boolean onComplete(boolean success, boolean distribute, boolean restoreTimeout) {
         if (log.isDebugEnabled())
             log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute +
                 ", fut=" + this + ']');
@@ -711,6 +707,12 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
         if (tx != null)
             cctx.tm().txContext(tx);
 
+        if (restoreTimeout && tx != null && tx.trackTimeout()) {
+            // Need to restore timeout before onDone is called, but onComplete can be called concurrently,
+            // thus need to ignore duplicated timeout objects.
+            tx.addTimeoutHandler(true);
+        }
+
         if (super.onDone(success, err)) {
             if (log.isDebugEnabled())
                 log.debug("Completing future: " + this);
@@ -770,6 +772,30 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
      * part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back.
      */
     void map() {
+        if (tx != null && tx.trackTimeout()) {
+            if (!tx.removeTimeoutHandler()) {
+                tx.finishFuture().listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                        IgniteTxTimeoutCheckedException err = new IgniteTxTimeoutCheckedException("Failed to " +
+                            "acquire lock, transaction was rolled back on timeout [timeout=" + tx.timeout() +
+                            ", tx=" + tx + ']');
+
+                        onError(err);
+
+                        onComplete(false, false, false);
+                    }
+                });
+
+                return;
+            }
+        }
+
+        if (timeout > 0) {
+            timeoutObj = new LockTimeoutObject();
+
+            cctx.time().addTimeoutObject(timeoutObj);
+        }
+
         // Obtain the topology version to use.
         long threadId = Thread.currentThread().getId();
 
@@ -981,9 +1007,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
                                     if (log.isDebugEnabled())
                                         log.debug("Entry being locked did not pass filter (will not lock): " + entry);
 
-                                    onComplete(
-                                        false,
-                                        false);
+                                    onComplete(false, false, true);
 
                                     return;
                                 }
@@ -1476,12 +1500,12 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
                             U.warn(log, "Failed to detect deadlock.", e);
                         }
 
-                        onComplete(false, true);
+                        onComplete(false, true, true);
                     }
                 });
             }
             else
-                onComplete(false, true);
+                onComplete(false, true, true);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 77c68bd..6d7a862 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -913,8 +913,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 return;
 
             if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
-                if (parent.cctx.tm().deadlockDetectionEnabled() &&
-                    (parent.tx.remainingTime() == -1 || res.error() instanceof IgniteTxTimeoutCheckedException)) {
+                if (parent.tx.remainingTime() == -1 || res.error() instanceof IgniteTxTimeoutCheckedException) {
                     parent.onTimeout();
 
                     return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c45eb7b..b4bc977 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -67,7 +67,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
  *
  */
 public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
-    implements GridCacheFuture<IgniteInternalTx> {
+    implements GridCacheFuture<IgniteInternalTx>, NearTxFinishFuture {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -136,6 +136,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
     }
 
     /** {@inheritDoc} */
+    @Override public boolean commit() {
+        return commit;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
     }
@@ -310,7 +315,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                         err = new TransactionRollbackException("Failed to commit transaction.", err);
 
                     try {
-                        tx.localFinish(err == null);
+                        tx.localFinish(err == null, true);
                     }
                     catch (IgniteCheckedException e) {
                         if (err != null)
@@ -327,7 +332,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                         finishOnePhase(commit);
 
                     try {
-                        tx.tmFinish(commit, nodeStop);
+                        tx.tmFinish(commit, nodeStop, true);
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to finish tx: " + tx, e);
@@ -388,7 +393,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
      * @param commit Commit flag.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    void finish(boolean commit) {
+    public void finish(boolean commit, boolean clearThreadMap) {
         if (tx.onNeedCheckBackup()) {
             assert tx.onePhaseCommit();
 
@@ -402,7 +407,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
         }
 
         try {
-            if (tx.localFinish(commit) || (!commit && tx.state() == UNKNOWN)) {
+            if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) {
                 if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) {
                     if (mappings.single()) {
                         GridDistributedTxMapping mapping = mappings.singleMapping();

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 058a3ff..a08d693 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
@@ -89,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -107,6 +109,7 @@ import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxE
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
+import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
@@ -117,7 +120,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
  * Replicated user transaction.
  */
 @SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoCloseable  {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -126,12 +129,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
         AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, IgniteInternalFuture.class, "prepFut");
 
     /** Prepare future updater. */
-    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> COMMIT_FUT_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "commitFut");
-
-    /** Rollback future updater. */
-    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> ROLLBACK_FUT_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "rollbackFut");
+    private static final AtomicReferenceFieldUpdater<GridNearTxLocal, NearTxFinishFuture> FINISH_FUT_UPD =
+        AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, NearTxFinishFuture.class, "finishFut");
 
     /** DHT mappings. */
     private IgniteTxMappings mappings;
@@ -144,12 +143,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     /** Commit future. */
     @SuppressWarnings("UnusedDeclaration")
     @GridToStringExclude
-    private volatile GridNearTxFinishFuture commitFut;
-
-    /** Rollback future. */
-    @SuppressWarnings("UnusedDeclaration")
-    @GridToStringExclude
-    private volatile GridNearTxFinishFuture rollbackFut;
+    private volatile NearTxFinishFuture finishFut;
 
     /** True if transaction contains near cache entries mapped to local node. */
     private boolean nearLocallyMapped;
@@ -170,6 +164,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     protected boolean transform;
 
     /** */
+    private boolean trackTimeout;
+
+    /** */
     @GridToStringExclude
     private TransactionProxyImpl proxy;
 
@@ -229,6 +226,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
         mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl();
 
         initResult();
+
+        if (timeout() > 0 && !implicit())
+            trackTimeout = cctx.time().addTimeoutObject(this, false);
     }
 
     /** {@inheritDoc} */
@@ -3044,7 +3044,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Finishing near local tx [tx=" + this + ", commit=" + commit + "]");
 
@@ -3080,7 +3080,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             if (commit && !isRollbackOnly())
                 userCommit();
             else
-                userRollback();
+                userRollback(clearThreadMap);
         }
         catch (IgniteCheckedException e) {
             err = e;
@@ -3146,6 +3146,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             if (!PREP_FUT_UPD.compareAndSet(this, null, fut))
                 return prepFut;
 
+            if (trackTimeout)
+                removeTimeoutHandler();
+
             if (timeout == -1) {
                 fut.onDone(this, timeoutException());
 
@@ -3178,21 +3181,51 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
         if (awaitLastFuture)
             txState().awaitLastFuture(cctx);
 
-        prepareAsync().get();
+        prepareNearTxLocal().get();
     }
 
     /**
-     * @return Prepare future.
+     * @throws IgniteCheckedException If failed.
      */
-    private IgniteInternalFuture<?> prepareAsync() {
-        return prepareNearTxLocal();
+    public void commit() throws IgniteCheckedException {
+        commitNearTxLocalAsync().get();
     }
 
     /**
-     * @throws IgniteCheckedException If failed.
+     * @param fut Already started finish future.
+     * @param commit Commit flag.
+     * @return Finish future.
      */
-    public void commit() throws IgniteCheckedException {
-        commitNearTxLocalAsync().get();
+    private IgniteInternalFuture<IgniteInternalTx> chainFinishFuture(NearTxFinishFuture fut, final boolean commit) {
+        assert fut != null;
+
+        if (fut.commit() != commit) {
+            final GridNearTxLocal tx = this;
+
+            if (!commit) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to rollback transaction, commit already started: " + tx);
+
+                return fut;
+            }
+
+            final GridFutureAdapter<IgniteInternalTx> fut0 = new GridFutureAdapter<>();
+
+            fut.listen(new IgniteInClosure<IgniteInternalFuture<IgniteInternalTx>>() {
+                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                    if (timedOut())
+                        fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " +
+                            "transaction is concurrently rolled back on timeout: " + tx));
+                    else
+                        fut0.onDone(new IgniteCheckedException("Failed to commit transaction, " +
+                            "transaction is concurrently rolled back: " + tx));
+                }
+            });
+
+            return fut0;
+        }
+
+        return fut;
     }
 
     /**
@@ -3202,42 +3235,43 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
         if (log.isDebugEnabled())
             log.debug("Committing near local tx: " + this);
 
+        NearTxFinishFuture fut = finishFut;
+
+        if (fut != null)
+            return chainFinishFuture(fut, true);
+
         if (fastFinish()) {
-            state(PREPARING);
-            state(PREPARED);
-            state(COMMITTING);
+            GridNearFastFinishFuture fut0;
 
-            cctx.tm().fastFinishTx(this, true);
+            if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearFastFinishFuture(this, true)))
+                return chainFinishFuture(finishFut, true);
 
-            state(COMMITTED);
+            fut0.finish();
 
-            return new GridFinishedFuture<>((IgniteInternalTx)this);
+            return fut0;
         }
 
-        final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
+        final GridNearTxFinishFuture fut0;
 
-        GridNearTxFinishFuture fut = commitFut;
+        if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, true)))
+            return chainFinishFuture(finishFut, true);
 
-        if (fut != null ||
-            !COMMIT_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, true)))
-            return commitFut;
+        cctx.mvcc().addFuture(fut0, fut0.futureId());
 
-        cctx.mvcc().addFuture(fut, fut.futureId());
+        final IgniteInternalFuture<?> prepareFut = prepareNearTxLocal();
 
         prepareFut.listen(new CI1<IgniteInternalFuture<?>>() {
             @Override public void apply(IgniteInternalFuture<?> f) {
-                GridNearTxFinishFuture fut0 = commitFut;
-
                 try {
                     // Make sure that here are no exceptions.
                     prepareFut.get();
 
-                    fut0.finish(true);
+                    fut0.finish(true, true);
                 }
                 catch (Error | RuntimeException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
-                    fut0.finish(false);
+                    fut0.finish(false, true);
 
                     throw e;
                 }
@@ -3245,12 +3279,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
                     if (!(e instanceof NodeStoppingException))
-                        fut0.finish(false);
+                        fut0.finish(false, true);
                 }
             }
         });
 
-        return fut;
+        return fut0;
     }
 
     /** {@inheritDoc} */
@@ -3269,30 +3303,42 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
      * @return Rollback future.
      */
     public IgniteInternalFuture<IgniteInternalTx> rollbackNearTxLocalAsync() {
+        return rollbackNearTxLocalAsync(false);
+    }
+
+    /**
+     * @param onTimeout {@code True} if rolled back asynchronously on timeout.
+     * @return Rollback future.
+     */
+    private IgniteInternalFuture<IgniteInternalTx> rollbackNearTxLocalAsync(final boolean onTimeout) {
         if (log.isDebugEnabled())
             log.debug("Rolling back near tx: " + this);
 
+        if (!onTimeout && trackTimeout)
+            removeTimeoutHandler();
+
+        NearTxFinishFuture fut = finishFut;
+
+        if (fut != null)
+            return chainFinishFuture(finishFut, false);
+
         if (fastFinish()) {
-            state(PREPARING);
-            state(PREPARED);
-            state(ROLLING_BACK);
+            GridNearFastFinishFuture fut0;
 
-            cctx.tm().fastFinishTx(this, false);
+            if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearFastFinishFuture(this, false)))
+                return chainFinishFuture(finishFut, false);
 
-            state(ROLLED_BACK);
+            fut0.finish();
 
-            return new GridFinishedFuture<>((IgniteInternalTx)this);
+            return fut0;
         }
 
-        GridNearTxFinishFuture fut = rollbackFut;
-
-        if (fut != null)
-            return fut;
+        final GridNearTxFinishFuture fut0;
 
-        if (!ROLLBACK_FUT_UPD.compareAndSet(this, null, fut = new GridNearTxFinishFuture<>(cctx, this, false)))
-            return rollbackFut;
+        if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false)))
+            return chainFinishFuture(finishFut, false);
 
-        cctx.mvcc().addFuture(fut, fut.futureId());
+        cctx.mvcc().addFuture(fut0, fut0.futureId());
 
         IgniteInternalFuture<?> prepFut = this.prepFut;
 
@@ -3307,7 +3353,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                     log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
             }
 
-            fut.finish(false);
+            fut0.finish(false, !onTimeout);
         }
         else {
             prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@@ -3321,14 +3367,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                             log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']');
                     }
 
-                    GridNearTxFinishFuture fut0 = rollbackFut;
-
-                    fut0.finish(false);
+                    fut0.finish(false, !onTimeout);
                 }
             });
         }
 
-        return fut;
+        return fut0;
     }
 
     /** {@inheritDoc} */
@@ -3693,38 +3737,52 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     @Override public void close() throws IgniteCheckedException {
         TransactionState state = state();
 
-        if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
-            rollback();
+        try {
+            if (state == COMMITTED || state == ROLLED_BACK)
+                return;
 
-        synchronized (this) {
-            try {
-                while (!done())
-                    wait();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+            if (trackTimeout)
+                removeTimeoutHandler();
 
-                if (!done())
-                    throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " +
-                        this, e);
+            if (state != ROLLING_BACK && state != COMMITTING)
+                rollback();
+
+            synchronized (this) {
+                try {
+                    while (!done())
+                        wait();
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    if (!done())
+                        throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " +
+                            this, e);
+                }
             }
         }
+        finally {
+            boolean cleanup = state == ROLLED_BACK && timedOut();
 
-        if (accessMap != null) {
-            assert optimistic();
+            if (cleanup)
+                cctx.tm().clearThreadMap(this);
 
-            for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
-                if (e.getValue().entries() != null) {
-                    GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId());
+            if (accessMap != null) {
+                assert optimistic();
 
-                    if (cctx0.isNear())
-                        cctx0.near().dht().sendTtlUpdateRequest(e.getValue());
-                    else
-                        cctx0.dht().sendTtlUpdateRequest(e.getValue());
+                for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : accessMap.entrySet()) {
+                    if (e.getValue().entries() != null) {
+                        GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId());
+
+                        if (cctx0.isNear())
+                            cctx0.near().dht().sendTtlUpdateRequest(e.getValue());
+                        else
+                            cctx0.dht().sendTtlUpdateRequest(e.getValue());
+                    }
                 }
-            }
 
-            accessMap = null;
+                accessMap = null;
+            }
         }
     }
 
@@ -4000,13 +4058,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
 
     /**
      * @param threadId new owner of transaction.
-     * @throws IgniteCheckedException if method executed not in the middle of resume or suspend.
      */
     public void threadId(long threadId) {
         this.threadId = threadId;
     }
 
     /**
+     * @return {@code True} if a callback that cancels the tx on timeout needs to be registered.
+     */
+    public boolean trackTimeout() {
+        return trackTimeout;
+    }
+
+    /**
+     * Removes timeout handler.
+     *
+     * @return {@code True} if handler was removed.
+     */
+    public boolean removeTimeoutHandler() {
+        assert trackTimeout;
+
+        return cctx.time().removeTimeoutObject(this);
+    }
+
+    /**
+     * @param ignoreDuplicated {@code True} if do need check for duplicated timeout object.
+     */
+    public void addTimeoutHandler(boolean ignoreDuplicated) {
+        assert trackTimeout;
+
+        cctx.time().addTimeoutObject(this, ignoreDuplicated);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid timeoutId() {
+        return xid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return startTime() + timeout();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) {
+            if (log.isDebugEnabled())
+                log.debug("Will rollback tx on timeout: " + this);
+
+            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    rollbackNearTxLocalAsync(true);
+                }
+            });
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Skip rollback tx on timeout: " + this);
+        }
+    }
+
+    /**
      * Post-lock closure.
      *
      * @param <T> Return type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
new file mode 100644
index 0000000..132c754
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/NearTxFinishFuture.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+
+/**
+ * Future for a near transaction finish operation that exposes its commit flag.
+ */
+public interface NearTxFinishFuture extends IgniteInternalFuture<IgniteInternalTx> {
+    /**
+     * @return Commit flag.
+     */
+    boolean commit();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 7598003..9e06d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -634,4 +634,4 @@ public interface IgniteInternalTx {
      * @param e Commit error.
      */
     public void commitError(Throwable e);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index c447436..b5178b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -370,6 +370,13 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         consistentIdMapper = new ConsistentIdMapper(cctx.discovery());
     }
 
+    /**
+     * @return Shared cache context.
+     */
+    public GridCacheSharedContext<?, ?> context() {
+        return cctx;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean localResult() {
         assert originatingNodeId() != null;
@@ -987,7 +994,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
      * @return {@code True} if state changed.
      */
     @SuppressWarnings({"TooBroadScope"})
-    protected boolean state(TransactionState state, boolean timedOut) {
+    protected final boolean state(TransactionState state, boolean timedOut) {
         boolean valid = false;
 
         TransactionState prev;
@@ -1068,7 +1075,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
 
             if (valid) {
                 this.state = state;
-                this.timedOut = timedOut;
+
+                if (timedOut)
+                    this.timedOut = true;
 
                 if (log.isDebugEnabled())
                     log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7ab921c..9fa0ff7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -901,7 +901,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      * @param nodeStop If {@code true} tx is cancelled on node stop.
      * @throws IgniteCheckedException If failed.
      */
-    public void tmFinish(boolean commit, boolean nodeStop) throws IgniteCheckedException {
+    public void tmFinish(boolean commit, boolean nodeStop, boolean clearThreadMap) throws IgniteCheckedException {
         assert onePhaseCommit();
 
         if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
@@ -910,7 +910,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 if (commit)
                     cctx.tm().commitTx(this);
                 else
-                    cctx.tm().rollbackTx(this);
+                    cctx.tm().rollbackTx(this, clearThreadMap);
             }
 
             state(commit ? COMMITTED : ROLLED_BACK);
@@ -957,7 +957,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /** {@inheritDoc} */
-    @Override public void userRollback() throws IgniteCheckedException {
+    @Override public void userRollback(boolean clearThreadMap) throws IgniteCheckedException {
         TransactionState state = state();
 
         if (state != ROLLING_BACK && state != ROLLED_BACK) {
@@ -975,7 +975,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         }
 
         if (DONE_FLAG_UPD.compareAndSet(this, 0, 1)) {
-            cctx.tm().rollbackTx(this);
+            cctx.tm().rollbackTx(this, clearThreadMap);
 
             if (!internal()) {
                 Collection<CacheStoreManager> stores = txState.stores(cctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 307c348..e0aa7a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -43,7 +43,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
     /**
      * @throws IgniteCheckedException If rollback failed.
      */
-    public void userRollback() throws IgniteCheckedException;
+    public void userRollback(boolean clearThreadMap) throws IgniteCheckedException;
 
     /**
      * Finishes transaction (either commit or rollback).
@@ -52,5 +52,5 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @return {@code True} if state has been changed.
      * @throws IgniteCheckedException If finish failed.
      */
-    public boolean localFinish(boolean commit) throws IgniteCheckedException;
+    public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 9a8280f..208bd61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -291,7 +291,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     public void rollbackTransactionsForCache(int cacheId) {
         rollbackTransactionsForCache(cacheId, nearIdMap);
 
-        rollbackTransactionsForCache(cacheId, threadMap);
+        rollbackTransactionsForCache(cacheId, idMap);
     }
 
     /**
@@ -304,7 +304,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
             for (IgniteTxEntry entry : tx.allEntries()) {
                 if (entry.cacheId() == cacheId) {
-                    rollbackTx(tx);
+                    rollbackTx(tx, false);
 
                     break;
                 }
@@ -316,8 +316,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     @Override public void onDisconnected(IgniteFuture reconnectFut) {
         txFinishSync.onDisconnected(reconnectFut);
 
-        for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
-            rollbackTx(e.getValue());
+        for (IgniteInternalTx tx : idMap.values())
+            rollbackTx(tx, true);
+        for (IgniteInternalTx tx : nearIdMap.values())
+            rollbackTx(tx, true);
 
         IgniteClientDisconnectedException err =
             new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.");
@@ -378,6 +380,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']');
         X.println(">>>   threadMapSize: " + threadMap.size());
         X.println(">>>   idMap [size=" + idMap.size() + ']');
+        X.println(">>>   nearIdMap [size=" + nearIdMap.size() + ']');
         X.println(">>>   completedVersSortedSize: " + completedVersSorted.size());
         X.println(">>>   completedVersHashMapSize: " + completedVersHashMap.sizex());
     }
@@ -490,14 +493,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         IgniteInternalTx t;
 
         if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) {
-            // Add both, explicit and implicit transactions.
-            // Do not add remote and dht local transactions as remote node may have the same thread ID
-            // and overwrite local transaction.
             if (tx.local() && !tx.dht()) {
-                if (cacheCtx == null || !cacheCtx.systemTx())
-                    threadMap.put(tx.threadId(), tx);
-                else
-                    sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx);
+                assert tx instanceof GridNearTxLocal : tx;
+
+                if (!tx.implicit()) {
+                    if (cacheCtx == null || !cacheCtx.systemTx())
+                        threadMap.put(tx.threadId(), tx);
+                    else
+                        sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx);
+                }
             }
 
             // Handle mapped versions.
@@ -633,11 +637,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @return Local transaction.
      */
-    @SuppressWarnings({"unchecked"})
-    @Nullable public <T> T localTx() {
-        IgniteInternalTx tx = tx();
+    @Nullable public IgniteTxLocalAdapter localTx() {
+        IgniteTxLocalAdapter tx = tx();
 
-        return tx != null && tx.local() ? (T)tx : null;
+        return tx != null && tx.local() ? tx : null;
     }
 
     /**
@@ -719,15 +722,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @return Local transaction.
-     */
-    @Nullable public IgniteInternalTx localTxx() {
-        IgniteInternalTx tx = tx();
-
-        return tx != null && tx.local() ? tx : null;
-    }
-
-    /**
      * @return User transaction for current thread.
      */
     @Nullable public GridNearTxLocal userTx() {
@@ -1215,32 +1209,32 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 collectPendingVersions(dhtTxLoc);
             }
 
-            // 4. Unlock write resources.
+            // 3. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 
-            // 5. Unlock read resources if required.
+            // 4. Unlock read resources if required.
             if (unlockReadEntries(tx))
                 unlockMultiple(tx, tx.readEntries());
 
-            // 6. Notify evictions.
+            // 5. Notify evictions.
             notifyEvictions(tx);
 
-            // 7. Remove obsolete entries from cache.
+            // 6. Remove obsolete entries from cache.
             removeObsolete(tx);
 
-            // 8. Assign transaction number at the end of transaction.
+            // 7. Assign transaction number at the end of transaction.
             tx.endVersion(cctx.versions().next(tx.topologyVersion()));
 
-            // 9. Remove from per-thread storage.
+            // 8. Remove from per-thread storage.
             clearThreadMap(tx);
 
-            // 10. Unregister explicit locks.
+            // 9. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty()) {
                 for (GridCacheVersion ver : tx.alternateVersions())
                     idMap.remove(ver);
             }
 
-            // 11. Remove Near-2-DHT mappings.
+            // 10. Remove Near-2-DHT mappings.
             if (tx instanceof GridCacheMappedVersion) {
                 GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion();
 
@@ -1248,10 +1242,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     mappedVers.remove(mapped);
             }
 
-            // 12. Clear context.
+            // 11. Clear context.
             resetContext();
 
-            // 14. Update metrics.
+            // 12. Update metrics.
             if (!tx.dht() && tx.local()) {
                 if (!tx.system())
                     cctx.txMetrics().onTxCommit();
@@ -1276,7 +1270,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      *
      * @param tx Transaction to rollback.
      */
-    public void rollbackTx(IgniteInternalTx tx) {
+    public void rollbackTx(IgniteInternalTx tx, boolean clearThreadMap) {
         assert tx != null;
 
         if (log.isDebugEnabled())
@@ -1302,7 +1296,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             removeObsolete(tx);
 
             // 6. Remove from per-thread storage.
-            clearThreadMap(tx);
+            if (clearThreadMap)
+                clearThreadMap(tx);
 
             // 7. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty())
@@ -1427,8 +1422,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @param tx Transaction to clear.
      */
-    private void clearThreadMap(IgniteInternalTx tx) {
+    public void clearThreadMap(IgniteInternalTx tx) {
         if (tx.local() && !tx.dht()) {
+            assert tx instanceof GridNearTxLocal : tx;
+
             if (!tx.system())
                 threadMap.remove(tx.threadId(), tx);
             else {
@@ -2257,6 +2254,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @see #resumeTx(GridNearTxLocal)
      * @see GridNearTxLocal#suspend()
      * @see GridNearTxLocal#resume()
+     * @throws IgniteCheckedException If failed to suspend transaction.
      */
     public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException {
         assert tx != null && !tx.system() : tx;
@@ -2280,6 +2278,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @see #suspendTx(GridNearTxLocal)
      * @see GridNearTxLocal#suspend()
      * @see GridNearTxLocal#resume()
+     * @throws IgniteCheckedException If failed to resume tx.
      */
     public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
         assert tx != null && !tx.system() : tx;
@@ -2287,7 +2286,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         assert !transactionMap(tx).containsValue(tx) : tx;
         assert !haveSystemTxForThread(Thread.currentThread().getId());
 
-        if(!tx.state(ACTIVE)) {
+        if (!tx.state(ACTIVE)) {
             throw new IgniteCheckedException("Trying to resume transaction with incorrect state "
                 + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
         }
@@ -2295,10 +2294,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         long threadId = Thread.currentThread().getId();
 
         if (threadMap.putIfAbsent(threadId, tx) != null)
-            throw new IgniteCheckedException("Thread already start a transaction.");
+            throw new IgniteCheckedException("Thread already has started a transaction.");
 
         if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
-            throw new IgniteCheckedException("Thread already start a transaction.");
+            throw new IgniteCheckedException("Thread already has started a transaction.");
 
         tx.threadId(threadId);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 8c71f76..9317391 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -85,21 +85,31 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
     /**
      * @param timeoutObj Timeout object.
      */
-    @SuppressWarnings({"NakedNotify", "CallToNotifyInsteadOfNotifyAll"})
     public void addTimeoutObject(GridTimeoutObject timeoutObj) {
+        addTimeoutObject(timeoutObj, false);
+    }
+
+    /**
+     * @param timeoutObj Timeout object.
+     * @param ignoreDuplicated {@code True} to tolerate an already registered (duplicated) timeout object.
+     */
+    @SuppressWarnings({"NakedNotify", "CallToNotifyInsteadOfNotifyAll"})
+    public boolean addTimeoutObject(GridTimeoutObject timeoutObj, boolean ignoreDuplicated) {
         if (timeoutObj.endTime() <= 0 || timeoutObj.endTime() == Long.MAX_VALUE)
             // Timeout will never happen.
-            return;
+            return false;
 
         boolean added = timeoutObjs.add(timeoutObj);
 
-        assert added : "Duplicate timeout object found: " + timeoutObj;
+        assert ignoreDuplicated || added : "Duplicate timeout object found: " + timeoutObj;
 
         if (timeoutObjs.firstx() == timeoutObj) {
             synchronized (mux) {
                 mux.notify(); // No need to notifyAll since we only have one thread.
             }
         }
+
+        return true;
     }
 
     /**
@@ -124,9 +134,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
 
     /**
      * @param timeoutObj Timeout object.
+     * @return {@code True} if timeout object was removed.
      */
-    public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
-        timeoutObjs.remove(timeoutObj);
+    public boolean removeTimeoutObject(GridTimeoutObject timeoutObj) {
+        return timeoutObjs.remove(timeoutObj);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 2efa0cb..f2e17e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -185,6 +185,8 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
         try (final Transaction tx = ignite.transactions().txStart()) {
             assert tx != null;
 
+            cache.put("key0", "val0");
+
             sleepForTxFailure();
 
             cache.put("key", "val");
@@ -195,7 +197,19 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
             assert e.getCause() instanceof TransactionTimeoutException;
         }
 
+        assertNull(ignite.transactions().tx());
+
+        assert !cache.containsKey("key0");
         assert !cache.containsKey("key");
+
+        // New transaction must succeed.
+        try (final Transaction tx = ignite.transactions().txStart()) {
+            cache.put("key", "val");
+
+            tx.commit();
+        }
+
+        assert cache.containsKey("key");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java
new file mode 100644
index 0000000..c8eac20
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
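+ * Checks that an explicit transaction is bound to its thread and unbound after commit, rollback or close.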
+ */
+public class IgniteCacheThreadLocalTxTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleNode() throws Exception {
+        threadLocalTx(startGrid(0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiNode() throws Exception {
+        startGridsMultiThreaded(4);
+
+        client = true;
+
+        startGrid(4);
+
+        for (Ignite node : G.allGrids())
+            threadLocalTx(node);
+    }
+
+    /**
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void threadLocalTx(Ignite node) throws Exception {
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(2);
+
+        IgniteCache<Object, Object> cache = node.getOrCreateCache(ccfg);
+
+        checkNoTx(node);
+
+        boolean[] reads = {true, false};
+        boolean[] writes = {true, false};
+        int endOps = 5;
+
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                for (boolean read : reads) {
+                    for (boolean write : writes) {
+                        for (int i = 0; i < endOps; i++)
+                            checkTx(concurrency, isolation, node, cache, read, write, i);
+                    }
+                }
+            }
+        }
+
+        checkNoTx(node);
+
+        cache.put(1, 1);
+
+        checkNoTx(node);
+    }
+
+    /**
+     * @param concurrency Tx concurrency.
+     * @param isolation Tx isolation.
+     * @param node Node.
+     * @param cache Cache.
+     * @param read {@code True} if read in tx.
+     * @param write {@code True} if write in tx.
+     * @param endOp Operation to test.
+     */
+    private void checkTx(TransactionConcurrency concurrency,
+        TransactionIsolation isolation,
+        Ignite node,
+        IgniteCache<Object, Object> cache,
+        boolean read,
+        boolean write,
+        int endOp) {
+        IgniteTransactions txs = node.transactions();
+
+        checkNoTx(node);
+
+        Transaction tx = txs.txStart(concurrency, isolation);
+
+        assertEquals(tx, txs.tx());
+
+        try {
+            txs.txStart(concurrency, isolation);
+
+            fail();
+        }
+        catch (IllegalStateException expected) {
+            // No-op.
+        }
+
+        if (read)
+            cache.get(ThreadLocalRandom.current().nextInt(100_000));
+
+        if (write)
+            cache.put(ThreadLocalRandom.current().nextInt(100_000), 1);
+
+
+        try {
+            txs.txStart(concurrency, isolation);
+
+            fail();
+        }
+        catch (IllegalStateException expected) {
+            // No-op.
+        }
+
+        assertEquals(tx, txs.tx());
+
+        IgniteFuture fut = null;
+
+        switch (endOp) {
+            case 0:
+                tx.commit();
+
+                break;
+
+            case 1:
+                fut = tx.commitAsync();
+
+                break;
+
+            case 2:
+                tx.rollback();
+
+                break;
+
+            case 3:
+                fut = tx.rollbackAsync();
+
+                break;
+
+            case 4:
+                tx.close();
+
+                break;
+
+            default:
+                fail();
+        }
+
+        if (fut != null)
+            fut.get();
+
+        checkNoTx(node);
+    }
+
+    /**
+     * @param node Node.
+     */
+    private void checkNoTx(Ignite node) {
+        IgniteTransactions txs = node.transactions();
+
+        assertNull(txs.tx());
+        assertNull(((IgniteKernal)node).context().cache().context().tm().tx());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
index 37003a7..86c0fa4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
@@ -46,7 +46,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionState.ACTIVE;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
-import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
 
@@ -60,6 +59,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
     /** Future timeout */
     private static final int FUT_TIMEOUT = 5000;
 
+    /** */
     private boolean client = false;
 
     /**
@@ -442,7 +442,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
                         }
                     }, TransactionTimeoutException.class);
 
-                    assertEquals(MARKED_ROLLBACK, tx.state());
+                    assertEquals(ROLLED_BACK, tx.state());
 
                     tx.close();
                 }
@@ -476,7 +476,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
                         }
                     }, TransactionTimeoutException.class);
 
-                    assertEquals(MARKED_ROLLBACK, tx.state());
+                    assertEquals(ROLLED_BACK, tx.state());
 
                     tx.close();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/56dbdc2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java
new file mode 100644
index 0000000..aef63d0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
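+ * Multi-node partitioned cache full API test run with default transaction timeout of {@code Long.MAX_VALUE / 2}.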
+ */
+public class CachePartitionedMultiNodeLongTxTimeout2FullApiTest extends GridCachePartitionedMultiNodeFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE / 2);
+
+        return cfg;
+    }
+}


Mime
View raw message