ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: ignite-6181 Tx rollback on timeout
Date Fri, 22 Sep 2017 08:20:26 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 27295f238 -> 5af30cf11


http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 9a8280f..77634bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -291,7 +291,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     public void rollbackTransactionsForCache(int cacheId) {
         rollbackTransactionsForCache(cacheId, nearIdMap);
 
-        rollbackTransactionsForCache(cacheId, threadMap);
+        rollbackTransactionsForCache(cacheId, idMap);
     }
 
     /**
@@ -304,7 +304,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
             for (IgniteTxEntry entry : tx.allEntries()) {
                 if (entry.cacheId() == cacheId) {
-                    rollbackTx(tx);
+                    rollbackTx(tx, false);
 
                     break;
                 }
@@ -316,8 +316,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     @Override public void onDisconnected(IgniteFuture reconnectFut) {
         txFinishSync.onDisconnected(reconnectFut);
 
-        for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
-            rollbackTx(e.getValue());
+        for (IgniteInternalTx tx : idMap.values())
+            rollbackTx(tx, true);
+        for (IgniteInternalTx tx : nearIdMap.values())
+            rollbackTx(tx, true);
 
         IgniteClientDisconnectedException err =
             new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected.");
@@ -378,6 +380,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']');
         X.println(">>>   threadMapSize: " + threadMap.size());
         X.println(">>>   idMap [size=" + idMap.size() + ']');
+        X.println(">>>   nearIdMap [size=" + nearIdMap.size() + ']');
         X.println(">>>   completedVersSortedSize: " + completedVersSorted.size());
         X.println(">>>   completedVersHashMapSize: " + completedVersHashMap.sizex());
     }
@@ -490,14 +493,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         IgniteInternalTx t;
 
         if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) {
-            // Add both, explicit and implicit transactions.
-            // Do not add remote and dht local transactions as remote node may have the same thread ID
-            // and overwrite local transaction.
             if (tx.local() && !tx.dht()) {
-                if (cacheCtx == null || !cacheCtx.systemTx())
-                    threadMap.put(tx.threadId(), tx);
-                else
-                    sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx);
+                assert tx instanceof GridNearTxLocal : tx;
+
+                if (!tx.implicit()) {
+                    if (cacheCtx == null || !cacheCtx.systemTx())
+                        threadMap.put(tx.threadId(), tx);
+                    else
+                        sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx);
+                }
             }
 
             // Handle mapped versions.
@@ -633,11 +637,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @return Local transaction.
      */
-    @SuppressWarnings({"unchecked"})
-    @Nullable public <T> T localTx() {
-        IgniteInternalTx tx = tx();
+    @Nullable public IgniteTxLocalAdapter localTx() {
+        IgniteTxLocalAdapter tx = tx();
 
-        return tx != null && tx.local() ? (T)tx : null;
+        return tx != null && tx.local() ? tx : null;
     }
 
     /**
@@ -719,15 +722,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @return Local transaction.
-     */
-    @Nullable public IgniteInternalTx localTxx() {
-        IgniteInternalTx tx = tx();
-
-        return tx != null && tx.local() ? tx : null;
-    }
-
-    /**
      * @return User transaction for current thread.
      */
     @Nullable public GridNearTxLocal userTx() {
@@ -1215,32 +1209,32 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 collectPendingVersions(dhtTxLoc);
             }
 
-            // 4. Unlock write resources.
+            // 3. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 
-            // 5. Unlock read resources if required.
+            // 4. Unlock read resources if required.
             if (unlockReadEntries(tx))
                 unlockMultiple(tx, tx.readEntries());
 
-            // 6. Notify evictions.
+            // 5. Notify evictions.
             notifyEvictions(tx);
 
-            // 7. Remove obsolete entries from cache.
+            // 6. Remove obsolete entries from cache.
             removeObsolete(tx);
 
-            // 8. Assign transaction number at the end of transaction.
+            // 7. Assign transaction number at the end of transaction.
             tx.endVersion(cctx.versions().next(tx.topologyVersion()));
 
-            // 9. Remove from per-thread storage.
+            // 8. Remove from per-thread storage.
             clearThreadMap(tx);
 
-            // 10. Unregister explicit locks.
+            // 9. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty()) {
                 for (GridCacheVersion ver : tx.alternateVersions())
                     idMap.remove(ver);
             }
 
-            // 11. Remove Near-2-DHT mappings.
+            // 10. Remove Near-2-DHT mappings.
             if (tx instanceof GridCacheMappedVersion) {
                 GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion();
 
@@ -1248,10 +1242,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     mappedVers.remove(mapped);
             }
 
-            // 12. Clear context.
+            // 11. Clear context.
             resetContext();
 
-            // 14. Update metrics.
+            // 12. Update metrics.
             if (!tx.dht() && tx.local()) {
                 if (!tx.system())
                     cctx.txMetrics().onTxCommit();
@@ -1275,8 +1269,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * Rolls back a transaction.
      *
      * @param tx Transaction to rollback.
+     * @param clearThreadMap {@code True} if need remove tx from thread map.
      */
-    public void rollbackTx(IgniteInternalTx tx) {
+    public void rollbackTx(IgniteInternalTx tx, boolean clearThreadMap) {
         assert tx != null;
 
         if (log.isDebugEnabled())
@@ -1302,7 +1297,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             removeObsolete(tx);
 
             // 6. Remove from per-thread storage.
-            clearThreadMap(tx);
+            if (clearThreadMap)
+                clearThreadMap(tx);
 
             // 7. Unregister explicit locks.
             if (!tx.alternateVersions().isEmpty())
@@ -1427,8 +1423,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @param tx Transaction to clear.
      */
-    private void clearThreadMap(IgniteInternalTx tx) {
+    public void clearThreadMap(IgniteInternalTx tx) {
         if (tx.local() && !tx.dht()) {
+            assert tx instanceof GridNearTxLocal : tx;
+
             if (!tx.system())
                 threadMap.remove(tx.threadId(), tx);
             else {
@@ -2257,6 +2255,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @see #resumeTx(GridNearTxLocal)
      * @see GridNearTxLocal#suspend()
      * @see GridNearTxLocal#resume()
+     * @throws IgniteCheckedException If failed to suspend transaction.
      */
     public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException {
         assert tx != null && !tx.system() : tx;
@@ -2280,6 +2279,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @see #suspendTx(GridNearTxLocal)
      * @see GridNearTxLocal#suspend()
      * @see GridNearTxLocal#resume()
+     * @throws IgniteCheckedException If failed to resume tx.
      */
     public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
         assert tx != null && !tx.system() : tx;
@@ -2287,7 +2287,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         assert !transactionMap(tx).containsValue(tx) : tx;
         assert !haveSystemTxForThread(Thread.currentThread().getId());
 
-        if(!tx.state(ACTIVE)) {
+        if (!tx.state(ACTIVE)) {
             throw new IgniteCheckedException("Trying to resume transaction with incorrect state "
                 + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
         }
@@ -2295,10 +2295,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         long threadId = Thread.currentThread().getId();
 
         if (threadMap.putIfAbsent(threadId, tx) != null)
-            throw new IgniteCheckedException("Thread already start a transaction.");
+            throw new IgniteCheckedException("Thread already has started a transaction.");
 
         if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
-            throw new IgniteCheckedException("Thread already start a transaction.");
+            throw new IgniteCheckedException("Thread already has started a transaction.");
 
         tx.threadId(threadId);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 8c71f76..ff6beb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -84,12 +84,13 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
 
     /**
      * @param timeoutObj Timeout object.
+     * @return {@code True} if object was added.
      */
     @SuppressWarnings({"NakedNotify", "CallToNotifyInsteadOfNotifyAll"})
-    public void addTimeoutObject(GridTimeoutObject timeoutObj) {
+    public boolean addTimeoutObject(GridTimeoutObject timeoutObj) {
         if (timeoutObj.endTime() <= 0 || timeoutObj.endTime() == Long.MAX_VALUE)
             // Timeout will never happen.
-            return;
+            return false;
 
         boolean added = timeoutObjs.add(timeoutObj);
 
@@ -100,6 +101,8 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
                 mux.notify(); // No need to notifyAll since we only have one thread.
             }
         }
+
+        return true;
     }
 
     /**
@@ -124,9 +127,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
 
     /**
      * @param timeoutObj Timeout object.
+     * @return {@code True} if timeout object was removed.
      */
-    public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
-        timeoutObjs.remove(timeoutObj);
+    public boolean removeTimeoutObject(GridTimeoutObject timeoutObj) {
+        return timeoutObjs.remove(timeoutObj);
     }
 
     /**
@@ -149,13 +153,14 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
                     GridTimeoutObject timeoutObj = iter.next();
 
                     if (timeoutObj.endTime() <= now) {
-                        iter.remove();
+                        try {
+                            boolean rmvd = timeoutObjs.remove(timeoutObj);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Timeout has occurred: " + timeoutObj);
+                            if (log.isDebugEnabled())
+                                log.debug("Timeout has occurred [obj=" + timeoutObj + ", process=" + rmvd + ']');
 
-                        try {
-                            timeoutObj.onTimeout();
+                            if (rmvd)
+                                timeoutObj.onTimeout();
                         }
                         catch (Throwable e) {
                             if (isCancelled() && !(e instanceof Error)){

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
index 9f4910a..79316bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFastFinishFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -210,8 +211,7 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest {
         IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx();
 
         assertNull(fieldValue(tx0, "prepFut"));
-        assertNull(fieldValue(tx0, "commitFut"));
-        assertNull(fieldValue(tx0, "rollbackFut"));
+        assertTrue(fieldValue(tx0, "finishFut") instanceof GridNearTxFastFinishFuture);
     }
 
     /**
@@ -225,12 +225,13 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest {
             tx.commit();
 
             assertNotNull(fieldValue(tx0, "prepFut"));
-            assertNotNull(fieldValue(tx0, "commitFut"));
+            assertNotNull(fieldValue(tx0, "finishFut"));
         }
         else {
             tx.rollback();
 
-            assertNotNull(fieldValue(tx0, "rollbackFut"));
+            assertNull(fieldValue(tx0, "prepFut"));
+            assertNotNull(fieldValue(tx0, "finishFut"));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
index 2efa0cb..f2e17e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java
@@ -185,6 +185,8 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
         try (final Transaction tx = ignite.transactions().txStart()) {
             assert tx != null;
 
+            cache.put("key0", "val0");
+
             sleepForTxFailure();
 
             cache.put("key", "val");
@@ -195,7 +197,19 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest {
             assert e.getCause() instanceof TransactionTimeoutException;
         }
 
+        assertNull(ignite.transactions().tx());
+
+        assert !cache.containsKey("key0");
         assert !cache.containsKey("key");
+
+        // New transaction must succeed.
+        try (final Transaction tx = ignite.transactions().txStart()) {
+            cache.put("key", "val");
+
+            tx.commit();
+        }
+
+        assert cache.containsKey("key");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java
new file mode 100644
index 0000000..c8eac20
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+public class IgniteCacheThreadLocalTxTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleNode() throws Exception {
+        threadLocalTx(startGrid(0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiNode() throws Exception {
+        startGridsMultiThreaded(4);
+
+        client = true;
+
+        startGrid(4);
+
+        for (Ignite node : G.allGrids())
+            threadLocalTx(node);
+    }
+
+    /**
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void threadLocalTx(Ignite node) throws Exception {
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(2);
+
+        IgniteCache<Object, Object> cache = node.getOrCreateCache(ccfg);
+
+        checkNoTx(node);
+
+        boolean[] reads = {true, false};
+        boolean[] writes = {true, false};
+        int endOps = 5;
+
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                for (boolean read : reads) {
+                    for (boolean write : writes) {
+                        for (int i = 0; i < endOps; i++)
+                            checkTx(concurrency, isolation, node, cache, read, write, i);
+                    }
+                }
+            }
+        }
+
+        checkNoTx(node);
+
+        cache.put(1, 1);
+
+        checkNoTx(node);
+    }
+
+    /**
+     * @param concurrency Tx concurrency.
+     * @param isolation Tx isolation.
+     * @param node Node.
+     * @param cache Cache.
+     * @param read {@code True} if read in tx.
+     * @param write {@code True} if write in tx.
+     * @param endOp Operation to test.
+     */
+    private void checkTx(TransactionConcurrency concurrency,
+        TransactionIsolation isolation,
+        Ignite node,
+        IgniteCache<Object, Object> cache,
+        boolean read,
+        boolean write,
+        int endOp) {
+        IgniteTransactions txs = node.transactions();
+
+        checkNoTx(node);
+
+        Transaction tx = txs.txStart(concurrency, isolation);
+
+        assertEquals(tx, txs.tx());
+
+        try {
+            txs.txStart(concurrency, isolation);
+
+            fail();
+        }
+        catch (IllegalStateException expected) {
+            // No-op.
+        }
+
+        if (read)
+            cache.get(ThreadLocalRandom.current().nextInt(100_000));
+
+        if (write)
+            cache.put(ThreadLocalRandom.current().nextInt(100_000), 1);
+
+
+        try {
+            txs.txStart(concurrency, isolation);
+
+            fail();
+        }
+        catch (IllegalStateException expected) {
+            // No-op.
+        }
+
+        assertEquals(tx, txs.tx());
+
+        IgniteFuture fut = null;
+
+        switch (endOp) {
+            case 0:
+                tx.commit();
+
+                break;
+
+            case 1:
+                fut = tx.commitAsync();
+
+                break;
+
+            case 2:
+                tx.rollback();
+
+                break;
+
+            case 3:
+                fut = tx.rollbackAsync();
+
+                break;
+
+            case 4:
+                tx.close();
+
+                break;
+
+            default:
+                fail();
+        }
+
+        if (fut != null)
+            fut.get();
+
+        checkNoTx(node);
+    }
+
+    /**
+     * @param node Node.
+     */
+    private void checkNoTx(Ignite node) {
+        IgniteTransactions txs = node.transactions();
+
+        assertNull(txs.tx());
+        assertNull(((IgniteKernal)node).context().cache().context().tm().tx());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
index 37003a7..86c0fa4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
@@ -46,7 +46,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
 import static org.apache.ignite.transactions.TransactionState.ACTIVE;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
-import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
 
@@ -60,6 +59,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
     /** Future timeout */
     private static final int FUT_TIMEOUT = 5000;
 
+    /** */
     private boolean client = false;
 
     /**
@@ -442,7 +442,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
                         }
                     }, TransactionTimeoutException.class);
 
-                    assertEquals(MARKED_ROLLBACK, tx.state());
+                    assertEquals(ROLLED_BACK, tx.state());
 
                     tx.close();
                 }
@@ -476,7 +476,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest
                         }
                     }, TransactionTimeoutException.class);
 
-                    assertEquals(MARKED_ROLLBACK, tx.state());
+                    assertEquals(ROLLED_BACK, tx.state());
 
                     tx.close();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java
new file mode 100644
index 0000000..aef63d0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class CachePartitionedMultiNodeLongTxTimeout2FullApiTest extends GridCachePartitionedMultiNodeFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE / 2);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java
new file mode 100644
index 0000000..7c1a6dd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+/**
+ *
+ */
+public class TxRollbackOnTimeoutNearCacheTest extends TxRollbackOnTimeoutTest {
+    /** {@inheritDoc} */
+    @Override protected boolean nearCacheEnabled() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java
new file mode 100644
index 0000000..5123329
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
+
+/**
+ * Tests an ability to eagerly rollback timed out transactions.
+ */
+public class TxRollbackOnTimeoutNoDeadlockDetectionTest extends TxRollbackOnTimeoutTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, "0");
+
+        super.beforeTestsStarted();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS);
+    }
+
+    /** */
+    @Override protected void validateDeadlockException(Exception e) {
+        assertEquals("TimeoutException is expected",
+            TransactionTimeoutException.class, e.getCause().getClass());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
new file mode 100644
index 0000000..e1c6c10
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java
@@ -0,0 +1,655 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionDeadlockException;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests an ability to eagerly rollback timed out transactions.
+ */
+public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest {
+    /** */
+    private static final long TX_MIN_TIMEOUT = 1;
+
+    /** */
+    private static final String CACHE_NAME = "test";
+
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        boolean client = "client".equals(igniteInstanceName);
+
+        cfg.setClientMode(client);
+
+        if (!client) {
+            CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME);
+
+            if (nearCacheEnabled())
+                ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+            ccfg.setAtomicityMode(TRANSACTIONAL);
+            ccfg.setBackups(2);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+            cfg.setCacheConfiguration(ccfg);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * @return Near cache flag.
+     */
+    protected boolean nearCacheEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If f nodeailed.
+     * @return Started client.
+     */
+    private Ignite startClient() throws Exception {
+        Ignite client = startGrid("client");
+
+        assertTrue(client.configuration().isClientMode());
+
+        if (nearCacheEnabled())
+            client.createNearCache(CACHE_NAME, new NearCacheConfiguration<>());
+        else
+            assertNotNull(client.cache(CACHE_NAME));
+
+        return client;
+    }
+
+    /**
+     * @param e Exception.
+     */
+    protected void validateDeadlockException(Exception e) {
+        assertEquals("Deadlock report is expected",
+            TransactionDeadlockException.class, e.getCause().getCause().getClass());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLockAndConcurrentTimeout() throws Exception {
+        startClient();
+
+        for (Ignite node : G.allGrids()) {
+            log.info("Test with node: " + node.name());
+
+            lock(node, false);
+
+            lock(node, false);
+
+            lock(node, true);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param retry {@code True}
+     * @throws Exception If failed.
+     */
+    private void lock(final Ignite node, final boolean retry) throws Exception {
+        final IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
+
+        final int KEYS_PER_THREAD = 10_000;
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                int start = idx * KEYS_PER_THREAD;
+                int end = start + KEYS_PER_THREAD;
+
+                int locked = 0;
+
+                try {
+                    try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 500, 0)) {
+                        for (int i = start; i < end; i++) {
+                            cache.get(i);
+
+                            locked++;
+                        }
+
+                        tx.commit();
+                    }
+                }
+                catch (Exception e) {
+                    info("Expected error: " + e);
+                }
+
+                info("Done, locked: " + locked);
+
+                if (retry) {
+                    try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 10 * 60_000, 0)) {
+                        for (int i = start; i < end; i++)
+                            cache.get(i);
+
+                        cache.put(start, 0);
+
+                        tx.commit();
+                    }
+                }
+            }
+        }, Math.min(4, Runtime.getRuntime().availableProcessors()), "tx-thread");
+    }
+
+    /**
+     * Tests if timeout on first tx unblocks second tx waiting for the locked key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWaitingTxUnblockedOnTimeout() throws Exception {
+        waitingTxUnblockedOnTimeout(grid(0), grid(0));
+
+        waitingTxUnblockedOnTimeout(grid(0), grid(1));
+
+        Ignite client = startClient();
+
+        waitingTxUnblockedOnTimeout(grid(0), client);
+
+        waitingTxUnblockedOnTimeout(grid(1), client);
+
+        waitingTxUnblockedOnTimeout(client, grid(0));
+
+        waitingTxUnblockedOnTimeout(client, grid(1));
+
+        waitingTxUnblockedOnTimeout(client, client);
+    }
+
+    /**
+     * Tests if timeout on first tx unblocks second tx waiting for the locked key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWaitingTxUnblockedOnThreadDeath() throws Exception {
+        waitingTxUnblockedOnThreadDeath(grid(0), grid(0));
+
+        waitingTxUnblockedOnThreadDeath(grid(0), grid(1));
+
+        Ignite client = startClient();
+
+        waitingTxUnblockedOnThreadDeath(grid(0), client);
+
+        waitingTxUnblockedOnThreadDeath(grid(1), client);
+
+        waitingTxUnblockedOnThreadDeath(client, grid(0));
+
+        waitingTxUnblockedOnThreadDeath(client, grid(1));
+
+        waitingTxUnblockedOnThreadDeath(client, client);
+    }
+
+    /**
+     * Tests if deadlock is resolved on timeout with correct message.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDeadlockUnblockedOnTimeout() throws Exception {
+        deadlockUnblockedOnTimeout(ignite(0), ignite(1));
+
+        deadlockUnblockedOnTimeout(ignite(0), ignite(0));
+
+        Ignite client = startClient();
+
+        deadlockUnblockedOnTimeout(ignite(0), client);
+
+        deadlockUnblockedOnTimeout(client, ignite(0));
+    }
+
+    /**
+     * Tests if deadlock is resolved on timeout with correct message.
+     *
+     * @param node1 First node.
+     * @param node2 Second node.
+     * @throws Exception If failed.
+     */
+    private void deadlockUnblockedOnTimeout(final Ignite node1, final Ignite node2) throws Exception {
+        info("Start test [node1=" + node1.name() + ", node2=" + node2.name() + ']');
+
+        final CountDownLatch l = new CountDownLatch(2);
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 5000, 2)) {
+                        node1.cache(CACHE_NAME).put(1, 10);
+
+                        l.countDown();
+
+                        U.awaitQuiet(l);
+
+                        node1.cache(CACHE_NAME).put(2, 20);
+
+                        tx.commit();
+
+                        fail();
+                    }
+                }
+                catch (CacheException e) {
+                    // No-op.
+                    validateDeadlockException(e);
+                }
+            }
+        }, "First");
+
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try (Transaction tx = node2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 2)) {
+                    node2.cache(CACHE_NAME).put(2, 2);
+
+                    l.countDown();
+
+                    U.awaitQuiet(l);
+
+                    node2.cache(CACHE_NAME).put(1, 1);
+
+                    tx.commit();
+                }
+            }
+        }, "Second");
+
+        fut1.get();
+        fut2.get();
+
+        assertTrue("Expecting committed key 2", node1.cache(CACHE_NAME).get(2) != null);
+        assertTrue("Expecting committed key 1", node1.cache(CACHE_NAME).get(1) != null);
+
+        node1.cache(CACHE_NAME).removeAll(F.asSet(1, 2));
+    }
+
+    /**
+     * Tests timeout object cleanup on tx commit.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTimeoutRemoval() throws Exception {
+        IgniteEx client = (IgniteEx)startClient();
+
+        final long TX_TIMEOUT = 250;
+
+        final int modesCnt = 5;
+
+        for (int i = 0; i < modesCnt; i++)
+            testTimeoutRemoval0(grid(0), i, TX_TIMEOUT);
+
+        for (int i = 0; i < modesCnt; i++)
+            testTimeoutRemoval0(client, i, TX_TIMEOUT);
+
+        for (int i = 0; i < modesCnt; i++)
+            testTimeoutRemoval0(grid(0), i, TX_MIN_TIMEOUT);
+
+        for (int i = 0; i < modesCnt; i++)
+            testTimeoutRemoval0(client, i, TX_MIN_TIMEOUT);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        // Repeat with more iterations to make sure everything is cleared.
+        for (int i = 0; i < 500; i++)
+            testTimeoutRemoval0(client, rnd.nextInt(modesCnt), TX_MIN_TIMEOUT);
+    }
+
+    /**
+     * Tests timeouts in all tx configurations.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSimple() throws Exception {
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values())
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                for (int op = 0; op < 4; op++)
+                    testSimple0(concurrency, isolation, op);
+            }
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @param op Operation to test.
+     * @throws Exception If failed.
+     */
+    private void testSimple0(TransactionConcurrency concurrency, TransactionIsolation isolation, int op) throws Exception {
+        Ignite near = grid(0);
+
+        final int key = 1, val = 1;
+
+        final long TX_TIMEOUT = 250;
+
+        IgniteCache<Object, Object> cache = near.cache(CACHE_NAME);
+
+        try (Transaction tx = near.transactions().txStart(concurrency, isolation, TX_TIMEOUT, 1)) {
+            cache.put(key, val);
+
+            U.sleep(TX_TIMEOUT * 2);
+
+            try {
+                switch (op) {
+                    case 0:
+                        cache.put(key + 1, val);
+
+                        break;
+
+                    case 1:
+                        cache.remove(key + 1);
+
+                        break;
+
+                    case 2:
+                        cache.get(key + 1);
+
+                        break;
+
+                    case 3:
+                        tx.commit();
+
+                        break;
+
+                    default:
+                        fail();
+                }
+
+                fail("Tx must timeout");
+            }
+            catch (CacheException | IgniteException e) {
+                assertTrue("Expected exception: " + e, X.hasCause(e, TransactionTimeoutException.class));
+            }
+        }
+
+        assertFalse("Must be removed by rollback on timeout", near.cache(CACHE_NAME).containsKey(key));
+        assertFalse("Must be removed by rollback on timeout", near.cache(CACHE_NAME).containsKey(key + 1));
+
+        assertNull(near.transactions().tx());
+    }
+
+    /**
+     * @param near Node.
+     * @param mode Test mode.
+     *
+     * @param timeout Tx timeout.
+     * @throws Exception If failed.
+     */
+    private void testTimeoutRemoval0(IgniteEx near, int mode, long timeout) throws Exception {
+        Throwable saved = null;
+
+        try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 1)) {
+            near.cache(CACHE_NAME).put(1, 1);
+
+            switch (mode) {
+                case 0:
+                    tx.commit();
+                    break;
+
+                case 1:
+                    tx.commitAsync().get();
+                    break;
+
+                case 2:
+                    tx.rollback();
+                    break;
+
+                case 3:
+                    tx.rollbackAsync().get();
+                    break;
+
+                case 4:
+                    break;
+
+                default:
+                    fail();
+            }
+        }
+        catch (Throwable t) {
+            saved = t;
+        }
+
+        Collection set = U.field(near.context().cache().context().time(), "timeoutObjs");
+
+        for (Object obj : set) {
+            if (obj.getClass().isAssignableFrom(GridNearTxLocal.class)) {
+                log.error("Last saved exception: " + saved, saved);
+
+                fail("Not removed [mode=" + mode + ", timeout=" + timeout + ", tx=" + obj +']');
+            }
+        }
+    }
+
+    /**
+     * @param near Node starting tx which is timed out.
+     * @param other Node starting second tx.
+     * @throws Exception If failed.
+     */
+    private void waitingTxUnblockedOnTimeout(final Ignite near, final Ignite other) throws Exception {
+        waitingTxUnblockedOnTimeout(near, other, 1000);
+
+        waitingTxUnblockedOnTimeout(near, other, 50);
+    }
+
+    /**
+     * @param near Node starting tx which is timed out.
+     * @param other Node starting second tx.
+     * @param timeout Timeout.
+     * @throws Exception If failed.
+     */
+    private void waitingTxUnblockedOnTimeout(final Ignite near, final Ignite other, final long timeout) throws Exception {
+        info("Start test [node1=" + near.name() + ", node2=" + other.name() + ']');
+
+        final CountDownLatch blocked = new CountDownLatch(1);
+
+        final CountDownLatch unblocked = new CountDownLatch(1);
+
+        final int recordsCnt = 5;
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) {
+                    try {
+                        for (int i = 0; i < recordsCnt; i++)
+                            near.cache(CACHE_NAME).put(i, i);
+
+                        info("Locked all keys.");
+                    }
+                    catch (CacheException e) {
+                        info("Failed to lock keys: " + e);
+                    }
+                    finally {
+                        blocked.countDown();
+                    }
+
+                    // Will be unblocked after tx timeout occurs.
+                    U.awaitQuiet(unblocked);
+
+                    try {
+                        near.cache(CACHE_NAME).put(0, 0);
+
+                        fail();
+                    }
+                    catch (CacheException e) {
+                        log.info("Expecting error: " + e.getMessage());
+                    }
+
+                    try {
+                        tx.commit();
+
+                        fail();
+                    }
+                    catch (IgniteException e) {
+                        log.info("Expecting error: " + e.getMessage());
+                    }
+                }
+
+                // Check thread is able to start new tx.
+                try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 60_000, 0)) {
+                    for (int i = 0; i < recordsCnt; i++)
+                        near.cache(CACHE_NAME).put(i, i);
+
+                    tx.commit();
+                }
+            }
+        }, "First");
+
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                U.awaitQuiet(blocked);
+
+                try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) {
+                    for (int i = 0; i < recordsCnt; i++)
+                        other.cache(CACHE_NAME).put(i, i);
+
+                    // Will wait until timeout on first tx will unblock put.
+                    tx.commit();
+                }
+            }
+        }, "Second");
+
+        fut2.get();
+
+        unblocked.countDown();
+
+        fut1.get();
+    }
+
+    /**
+     * @param near Node starting tx which is timed out.
+     * @param other Node starting second tx.
+     * @throws Exception If failed.
+     */
+    private void waitingTxUnblockedOnThreadDeath(final Ignite near, final Ignite other) throws Exception {
+        waitingTxUnblockedOnThreadDeath0(near, other, 10, 1000); // Try provoke timeout after all keys are locked.
+
+        waitingTxUnblockedOnThreadDeath0(near, other, 1000, 100);  // Try provoke timeout while trying to lock keys.
+    }
+
+    /**
+     * @param near Node starting tx which is timed out.
+     * @param other Node starting second tx.
+     * @param recordsCnt Number of records to locks.
+     * @param timeout Transaction timeout.
+     * @throws Exception If failed.
+     */
+    private void waitingTxUnblockedOnThreadDeath0(final Ignite near,
+        final Ignite other,
+        final int recordsCnt,
+        final long timeout)
+        throws Exception
+    {
+        info("Start test [node1=" + near.name() + ", node2=" + other.name() + ']');
+
+        final CountDownLatch blocked = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut1 = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, recordsCnt);
+
+                try {
+                    for (int i = 0; i < recordsCnt; i++)
+                        near.cache(CACHE_NAME).put(i, i);
+
+                    log.info("Locked all records.");
+                }
+                catch (Exception e) {
+                    log.info("Failed to locked all records: " + e);
+                }
+                finally {
+                    blocked.countDown();
+                }
+
+                throw new IgniteException("Failure");
+            }
+        }, 1, "First");
+
+        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                U.awaitQuiet(blocked);
+
+                try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, recordsCnt)) {
+                    for (int i = 0; i < recordsCnt; i++)
+                        other.cache(CACHE_NAME).put(i, i);
+
+                    // Will wait until timeout on first tx will unblock put.
+                    tx.commit();
+                }
+            }
+        }, 1, "Second");
+
+        try {
+            fut1.get();
+
+            fail();
+        }
+        catch (IgniteCheckedException e) {
+            // No-op.
+        }
+
+        fut2.get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
index 164ff6a..b380ebc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePart
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledMultiNodeWithGroupFullApiSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOnheapFullApiSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOnheapMultiNodeFullApiSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedMultiNodeLongTxTimeout2FullApiTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedMultiNodeLongTxTimeoutFullApiTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest;
@@ -123,6 +124,7 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.class);
         suite.addTestSuite(GridCacheAtomicNearEnabledMultiNodeFullApiSelfTest.class);
         suite.addTestSuite(CachePartitionedMultiNodeLongTxTimeoutFullApiTest.class);
+        suite.addTestSuite(CachePartitionedMultiNodeLongTxTimeout2FullApiTest.class);
         suite.addTestSuite(CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.class);
 
         suite.addTestSuite(GridCachePartitionedNearDisabledMultiNodeFullApiSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index a0fb19b..7c71381 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -21,9 +21,14 @@ import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
+import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
+
 /**
  * Test suite.
  */
@@ -44,6 +49,11 @@ public class IgniteCacheTestSuite6 extends TestSuite {
 
         suite.addTestSuite(CacheExchangeMergeTest.class);
 
+        suite.addTestSuite(TxRollbackOnTimeoutTest.class);
+        suite.addTestSuite(TxRollbackOnTimeoutNoDeadlockDetectionTest.class);
+        suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class);
+        suite.addTestSuite(IgniteCacheThreadLocalTxTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
index e85baed..f404732 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
@@ -25,11 +25,11 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 /**
  * Test checks whether hadoop system cache doesn't use user defined TX config.
  */
-public class HadoopTxConfigCacheTest  extends IgniteTxConfigCacheSelfTest {
+public class HadoopTxConfigCacheTest extends IgniteTxConfigCacheSelfTest {
     /**
      * Success if system caches weren't timed out.
      *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testSystemCacheTx() throws Exception {
         final Ignite ignite = grid(0);


Mime
View raw message