Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 40DA0200D01 for ; Fri, 22 Sep 2017 10:47:10 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3F4E01609E6; Fri, 22 Sep 2017 08:47:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 147E41609A7 for ; Fri, 22 Sep 2017 10:47:07 +0200 (CEST) Received: (qmail 52707 invoked by uid 500); 22 Sep 2017 08:20:27 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 52689 invoked by uid 99); 22 Sep 2017 08:20:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Sep 2017 08:20:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 02D52F54DD; Fri, 22 Sep 2017 08:20:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 22 Sep 2017 08:20:26 -0000 Message-Id: <44b9a3a29f2a427f8888e3ef331e06af@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] ignite git commit: ignite-6181 Tx rollback on timeout archived-at: Fri, 22 Sep 2017 08:47:10 -0000 Repository: ignite Updated Branches: refs/heads/master 27295f238 -> 5af30cf11 http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 9a8280f..77634bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -291,7 +291,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { public void rollbackTransactionsForCache(int cacheId) { rollbackTransactionsForCache(cacheId, nearIdMap); - rollbackTransactionsForCache(cacheId, threadMap); + rollbackTransactionsForCache(cacheId, idMap); } /** @@ -304,7 +304,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { for (IgniteTxEntry entry : tx.allEntries()) { if (entry.cacheId() == cacheId) { - rollbackTx(tx); + rollbackTx(tx, false); break; } @@ -316,8 +316,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void onDisconnected(IgniteFuture reconnectFut) { txFinishSync.onDisconnected(reconnectFut); - for (Map.Entry e : threadMap.entrySet()) - rollbackTx(e.getValue()); + for (IgniteInternalTx tx : idMap.values()) + rollbackTx(tx, true); + for (IgniteInternalTx tx : nearIdMap.values()) + rollbackTx(tx, true); IgniteClientDisconnectedException err = new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); @@ -378,6 +380,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); X.println(">>> threadMapSize: " + threadMap.size()); X.println(">>> idMap [size=" + idMap.size() + ']'); + X.println(">>> nearIdMap [size=" + nearIdMap.size() + ']'); X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); X.println(">>> 
completedVersHashMapSize: " + completedVersHashMap.sizex()); } @@ -490,14 +493,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { IgniteInternalTx t; if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) { - // Add both, explicit and implicit transactions. - // Do not add remote and dht local transactions as remote node may have the same thread ID - // and overwrite local transaction. if (tx.local() && !tx.dht()) { - if (cacheCtx == null || !cacheCtx.systemTx()) - threadMap.put(tx.threadId(), tx); - else - sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); + assert tx instanceof GridNearTxLocal : tx; + + if (!tx.implicit()) { + if (cacheCtx == null || !cacheCtx.systemTx()) + threadMap.put(tx.threadId(), tx); + else + sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); + } } // Handle mapped versions. @@ -633,11 +637,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @return Local transaction. */ - @SuppressWarnings({"unchecked"}) - @Nullable public T localTx() { - IgniteInternalTx tx = tx(); + @Nullable public IgniteTxLocalAdapter localTx() { + IgniteTxLocalAdapter tx = tx(); - return tx != null && tx.local() ? (T)tx : null; + return tx != null && tx.local() ? tx : null; } /** @@ -719,15 +722,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @return Local transaction. - */ - @Nullable public IgniteInternalTx localTxx() { - IgniteInternalTx tx = tx(); - - return tx != null && tx.local() ? tx : null; - } - - /** * @return User transaction for current thread. */ @Nullable public GridNearTxLocal userTx() { @@ -1215,32 +1209,32 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { collectPendingVersions(dhtTxLoc); } - // 4. Unlock write resources. + // 3. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); - // 5. Unlock read resources if required. + // 4. Unlock read resources if required. 
if (unlockReadEntries(tx)) unlockMultiple(tx, tx.readEntries()); - // 6. Notify evictions. + // 5. Notify evictions. notifyEvictions(tx); - // 7. Remove obsolete entries from cache. + // 6. Remove obsolete entries from cache. removeObsolete(tx); - // 8. Assign transaction number at the end of transaction. + // 7. Assign transaction number at the end of transaction. tx.endVersion(cctx.versions().next(tx.topologyVersion())); - // 9. Remove from per-thread storage. + // 8. Remove from per-thread storage. clearThreadMap(tx); - // 10. Unregister explicit locks. + // 9. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) { for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); } - // 11. Remove Near-2-DHT mappings. + // 10. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) { GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion(); @@ -1248,10 +1242,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { mappedVers.remove(mapped); } - // 12. Clear context. + // 11. Clear context. resetContext(); - // 14. Update metrics. + // 12. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) cctx.txMetrics().onTxCommit(); @@ -1275,8 +1269,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Rolls back a transaction. * * @param tx Transaction to rollback. + * @param clearThreadMap {@code True} if need remove tx from thread map. */ - public void rollbackTx(IgniteInternalTx tx) { + public void rollbackTx(IgniteInternalTx tx, boolean clearThreadMap) { assert tx != null; if (log.isDebugEnabled()) @@ -1302,7 +1297,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { removeObsolete(tx); // 6. Remove from per-thread storage. - clearThreadMap(tx); + if (clearThreadMap) + clearThreadMap(tx); // 7. Unregister explicit locks. 
if (!tx.alternateVersions().isEmpty()) @@ -1427,8 +1423,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param tx Transaction to clear. */ - private void clearThreadMap(IgniteInternalTx tx) { + public void clearThreadMap(IgniteInternalTx tx) { if (tx.local() && !tx.dht()) { + assert tx instanceof GridNearTxLocal : tx; + if (!tx.system()) threadMap.remove(tx.threadId(), tx); else { @@ -2257,6 +2255,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @see #resumeTx(GridNearTxLocal) * @see GridNearTxLocal#suspend() * @see GridNearTxLocal#resume() + * @throws IgniteCheckedException If failed to suspend transaction. */ public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException { assert tx != null && !tx.system() : tx; @@ -2280,6 +2279,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @see #suspendTx(GridNearTxLocal) * @see GridNearTxLocal#suspend() * @see GridNearTxLocal#resume() + * @throws IgniteCheckedException If failed to resume tx. 
*/ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { assert tx != null && !tx.system() : tx; @@ -2287,7 +2287,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert !transactionMap(tx).containsValue(tx) : tx; assert !haveSystemTxForThread(Thread.currentThread().getId()); - if(!tx.state(ACTIVE)) { + if (!tx.state(ACTIVE)) { throw new IgniteCheckedException("Trying to resume transaction with incorrect state " + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); } @@ -2295,10 +2295,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { long threadId = Thread.currentThread().getId(); if (threadMap.putIfAbsent(threadId, tx) != null) - throw new IgniteCheckedException("Thread already start a transaction."); + throw new IgniteCheckedException("Thread already has started a transaction."); if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) - throw new IgniteCheckedException("Thread already start a transaction."); + throw new IgniteCheckedException("Thread already has started a transaction."); tx.threadId(threadId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java index 8c71f76..ff6beb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java @@ -84,12 +84,13 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { /** * @param timeoutObj Timeout object. + * @return {@code True} if object was added. 
*/ @SuppressWarnings({"NakedNotify", "CallToNotifyInsteadOfNotifyAll"}) - public void addTimeoutObject(GridTimeoutObject timeoutObj) { + public boolean addTimeoutObject(GridTimeoutObject timeoutObj) { if (timeoutObj.endTime() <= 0 || timeoutObj.endTime() == Long.MAX_VALUE) // Timeout will never happen. - return; + return false; boolean added = timeoutObjs.add(timeoutObj); @@ -100,6 +101,8 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { mux.notify(); // No need to notifyAll since we only have one thread. } } + + return true; } /** @@ -124,9 +127,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { /** * @param timeoutObj Timeout object. + * @return {@code True} if timeout object was removed. */ - public void removeTimeoutObject(GridTimeoutObject timeoutObj) { - timeoutObjs.remove(timeoutObj); + public boolean removeTimeoutObject(GridTimeoutObject timeoutObj) { + return timeoutObjs.remove(timeoutObj); } /** @@ -149,13 +153,14 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { GridTimeoutObject timeoutObj = iter.next(); if (timeoutObj.endTime() <= now) { - iter.remove(); + try { + boolean rmvd = timeoutObjs.remove(timeoutObj); - if (log.isDebugEnabled()) - log.debug("Timeout has occurred: " + timeoutObj); + if (log.isDebugEnabled()) + log.debug("Timeout has occurred [obj=" + timeoutObj + ", process=" + rmvd + ']'); - try { - timeoutObj.onTimeout(); + if (rmvd) + timeoutObj.onTimeout(); } catch (Throwable e) { if (isCancelled() && !(e instanceof Error)){ http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java index 9f4910a..79316bf 100644 
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheTxFastFinishTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteTransactions; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFastFinishFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -210,8 +211,7 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest { IgniteInternalTx tx0 = ((TransactionProxyImpl)tx).tx(); assertNull(fieldValue(tx0, "prepFut")); - assertNull(fieldValue(tx0, "commitFut")); - assertNull(fieldValue(tx0, "rollbackFut")); + assertTrue(fieldValue(tx0, "finishFut") instanceof GridNearTxFastFinishFuture); } /** @@ -225,12 +225,13 @@ public class CacheTxFastFinishTest extends GridCommonAbstractTest { tx.commit(); assertNotNull(fieldValue(tx0, "prepFut")); - assertNotNull(fieldValue(tx0, "commitFut")); + assertNotNull(fieldValue(tx0, "finishFut")); } else { tx.rollback(); - assertNotNull(fieldValue(tx0, "rollbackFut")); + assertNull(fieldValue(tx0, "prepFut")); + assertNotNull(fieldValue(tx0, "finishFut")); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java index 2efa0cb..f2e17e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java @@ -185,6 +185,8 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { try (final Transaction tx = ignite.transactions().txStart()) { assert tx != null; + cache.put("key0", "val0"); + sleepForTxFailure(); cache.put("key", "val"); @@ -195,7 +197,19 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { assert e.getCause() instanceof TransactionTimeoutException; } + assertNull(ignite.transactions().tx()); + + assert !cache.containsKey("key0"); assert !cache.containsKey("key"); + + // New transaction must succeed. + try (final Transaction tx = ignite.transactions().txStart()) { + cache.put("key", "val"); + + tx.commit(); + } + + assert cache.containsKey("key"); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java new file mode 100644 index 0000000..c8eac20 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheThreadLocalTxTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * + */ +public class IgniteCacheThreadLocalTxTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + 
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testSingleNode() throws Exception { + threadLocalTx(startGrid(0)); + } + + /** + * @throws Exception If failed. + */ + public void testMultiNode() throws Exception { + startGridsMultiThreaded(4); + + client = true; + + startGrid(4); + + for (Ignite node : G.allGrids()) + threadLocalTx(node); + } + + /** + * @param node Node. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void threadLocalTx(Ignite node) throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(2); + + IgniteCache cache = node.getOrCreateCache(ccfg); + + checkNoTx(node); + + boolean[] reads = {true, false}; + boolean[] writes = {true, false}; + int endOps = 5; + + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (boolean read : reads) { + for (boolean write : writes) { + for (int i = 0; i < endOps; i++) + checkTx(concurrency, isolation, node, cache, read, write, i); + } + } + } + } + + checkNoTx(node); + + cache.put(1, 1); + + checkNoTx(node); + } + + /** + * @param concurrency Tx concurrency. + * @param isolation Tx isolation. + * @param node Node. + * @param cache Cache. + * @param read {@code True} if read in tx. + * @param write {@code True} if write in tx. + * @param endOp Operation to test. 
+ */ + private void checkTx(TransactionConcurrency concurrency, + TransactionIsolation isolation, + Ignite node, + IgniteCache cache, + boolean read, + boolean write, + int endOp) { + IgniteTransactions txs = node.transactions(); + + checkNoTx(node); + + Transaction tx = txs.txStart(concurrency, isolation); + + assertEquals(tx, txs.tx()); + + try { + txs.txStart(concurrency, isolation); + + fail(); + } + catch (IllegalStateException expected) { + // No-op. + } + + if (read) + cache.get(ThreadLocalRandom.current().nextInt(100_000)); + + if (write) + cache.put(ThreadLocalRandom.current().nextInt(100_000), 1); + + + try { + txs.txStart(concurrency, isolation); + + fail(); + } + catch (IllegalStateException expected) { + // No-op. + } + + assertEquals(tx, txs.tx()); + + IgniteFuture fut = null; + + switch (endOp) { + case 0: + tx.commit(); + + break; + + case 1: + fut = tx.commitAsync(); + + break; + + case 2: + tx.rollback(); + + break; + + case 3: + fut = tx.rollbackAsync(); + + break; + + case 4: + tx.close(); + + break; + + default: + fail(); + } + + if (fut != null) + fut.get(); + + checkNoTx(node); + } + + /** + * @param node Node. 
+ */ + private void checkNoTx(Ignite node) { + IgniteTransactions txs = node.transactions(); + + assertNull(txs.tx()); + assertNull(((IgniteKernal)node).context().cache().context().tm().tx()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java index 37003a7..86c0fa4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java @@ -46,7 +46,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionState.ACTIVE; import static org.apache.ignite.transactions.TransactionState.COMMITTED; -import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.SUSPENDED; @@ -60,6 +59,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest /** Future timeout */ private static final int FUT_TIMEOUT = 5000; + /** */ private boolean client = false; /** @@ -442,7 +442,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest } }, TransactionTimeoutException.class); - assertEquals(MARKED_ROLLBACK, tx.state()); + assertEquals(ROLLED_BACK, tx.state()); 
tx.close(); } @@ -476,7 +476,7 @@ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest } }, TransactionTimeoutException.class); - assertEquals(MARKED_ROLLBACK, tx.state()); + assertEquals(ROLLED_BACK, tx.state()); tx.close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java new file mode 100644 index 0000000..aef63d0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedMultiNodeLongTxTimeout2FullApiTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * + */ +public class CachePartitionedMultiNodeLongTxTimeout2FullApiTest extends GridCachePartitionedMultiNodeFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE / 2); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java new file mode 100644 index 0000000..7c1a6dd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNearCacheTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +/** + * + */ +public class TxRollbackOnTimeoutNearCacheTest extends TxRollbackOnTimeoutTest { + /** {@inheritDoc} */ + @Override protected boolean nearCacheEnabled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java new file mode 100644 index 0000000..5123329 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutNoDeadlockDetectionTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import org.apache.ignite.transactions.TransactionTimeoutException; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; + +/** + * Tests an ability to eagerly rollback timed out transactions. + */ +public class TxRollbackOnTimeoutNoDeadlockDetectionTest extends TxRollbackOnTimeoutTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + System.setProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, "0"); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + System.clearProperty(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS); + } + + /** */ + @Override protected void validateDeadlockException(Exception e) { + assertEquals("TimeoutException is expected", + TransactionTimeoutException.class, e.getCause().getClass()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java new file mode 100644 index 0000000..e1c6c10 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -0,0 +1,655 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import 
org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionDeadlockException; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionTimeoutException; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Tests an ability to eagerly rollback timed out transactions. + */ +public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { + /** */ + private static final long TX_MIN_TIMEOUT = 1; + + /** */ + private static final String CACHE_NAME = "test"; + + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int GRID_CNT = 3; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + boolean client = "client".equals(igniteInstanceName); + + cfg.setClientMode(client); + + if (!client) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_NAME); + + if (nearCacheEnabled()) + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(2); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + } + + return cfg; + } + + /** + * @return Near cache flag. 
+ */ + protected boolean nearCacheEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + * @return Started client. + */ + private Ignite startClient() throws Exception { + Ignite client = startGrid("client"); + + assertTrue(client.configuration().isClientMode()); + + if (nearCacheEnabled()) + client.createNearCache(CACHE_NAME, new NearCacheConfiguration<>()); + else + assertNotNull(client.cache(CACHE_NAME)); + + return client; + } + + /** + * @param e Exception. + */ + protected void validateDeadlockException(Exception e) { + assertEquals("Deadlock report is expected", + TransactionDeadlockException.class, e.getCause().getCause().getClass()); + } + + /** + * @throws Exception If failed. + */ + public void testLockAndConcurrentTimeout() throws Exception { + startClient(); + + for (Ignite node : G.allGrids()) { + log.info("Test with node: " + node.name()); + + lock(node, false); + + lock(node, false); + + lock(node, true); + } + } + + /** + * @param node Node. + * @param retry {@code True} to lock the same keys again in a second transaction with a long timeout. + * @throws Exception If failed. 
+ */ + private void lock(final Ignite node, final boolean retry) throws Exception { + final IgniteCache cache = node.cache(CACHE_NAME); + + final int KEYS_PER_THREAD = 10_000; + + GridTestUtils.runMultiThreaded(new IgniteInClosure() { + @Override public void apply(Integer idx) { + int start = idx * KEYS_PER_THREAD; + int end = start + KEYS_PER_THREAD; + + int locked = 0; + + try { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 500, 0)) { + for (int i = start; i < end; i++) { + cache.get(i); + + locked++; + } + + tx.commit(); + } + } + catch (Exception e) { + info("Expected error: " + e); + } + + info("Done, locked: " + locked); + + if (retry) { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 10 * 60_000, 0)) { + for (int i = start; i < end; i++) + cache.get(i); + + cache.put(start, 0); + + tx.commit(); + } + } + } + }, Math.min(4, Runtime.getRuntime().availableProcessors()), "tx-thread"); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked key. + * + * @throws Exception If failed. + */ + public void testWaitingTxUnblockedOnTimeout() throws Exception { + waitingTxUnblockedOnTimeout(grid(0), grid(0)); + + waitingTxUnblockedOnTimeout(grid(0), grid(1)); + + Ignite client = startClient(); + + waitingTxUnblockedOnTimeout(grid(0), client); + + waitingTxUnblockedOnTimeout(grid(1), client); + + waitingTxUnblockedOnTimeout(client, grid(0)); + + waitingTxUnblockedOnTimeout(client, grid(1)); + + waitingTxUnblockedOnTimeout(client, client); + } + + /** + * Tests if timeout on first tx unblocks second tx waiting for the locked keys after the thread that started first tx has died without finishing it. + * + * @throws Exception If failed. 
+ */ + public void testWaitingTxUnblockedOnThreadDeath() throws Exception { + waitingTxUnblockedOnThreadDeath(grid(0), grid(0)); + + waitingTxUnblockedOnThreadDeath(grid(0), grid(1)); + + Ignite client = startClient(); + + waitingTxUnblockedOnThreadDeath(grid(0), client); + + waitingTxUnblockedOnThreadDeath(grid(1), client); + + waitingTxUnblockedOnThreadDeath(client, grid(0)); + + waitingTxUnblockedOnThreadDeath(client, grid(1)); + + waitingTxUnblockedOnThreadDeath(client, client); + } + + /** + * Tests if deadlock is resolved on timeout with correct message. + * + * @throws Exception If failed. + */ + public void testDeadlockUnblockedOnTimeout() throws Exception { + deadlockUnblockedOnTimeout(ignite(0), ignite(1)); + + deadlockUnblockedOnTimeout(ignite(0), ignite(0)); + + Ignite client = startClient(); + + deadlockUnblockedOnTimeout(ignite(0), client); + + deadlockUnblockedOnTimeout(client, ignite(0)); + } + + /** + * Tests if deadlock is resolved on timeout with correct message. + * + * @param node1 First node. + * @param node2 Second node. + * @throws Exception If failed. + */ + private void deadlockUnblockedOnTimeout(final Ignite node1, final Ignite node2) throws Exception { + info("Start test [node1=" + node1.name() + ", node2=" + node2.name() + ']'); + + final CountDownLatch l = new CountDownLatch(2); + + IgniteInternalFuture fut1 = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 5000, 2)) { + node1.cache(CACHE_NAME).put(1, 10); + + l.countDown(); + + U.awaitQuiet(l); + + node1.cache(CACHE_NAME).put(2, 20); + + tx.commit(); + + fail(); + } + } + catch (CacheException e) { + // Expected failure: check its cause. 
+ validateDeadlockException(e); + } + } + }, "First"); + + IgniteInternalFuture fut2 = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try (Transaction tx = node2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 2)) { + node2.cache(CACHE_NAME).put(2, 2); + + l.countDown(); + + U.awaitQuiet(l); + + node2.cache(CACHE_NAME).put(1, 1); + + tx.commit(); + } + } + }, "Second"); + + fut1.get(); + fut2.get(); + + assertTrue("Expecting committed key 2", node1.cache(CACHE_NAME).get(2) != null); + assertTrue("Expecting committed key 1", node1.cache(CACHE_NAME).get(1) != null); + + node1.cache(CACHE_NAME).removeAll(F.asSet(1, 2)); + } + + /** + * Tests timeout object cleanup on tx commit. + * + * @throws Exception If failed. + */ + public void testTimeoutRemoval() throws Exception { + IgniteEx client = (IgniteEx)startClient(); + + final long TX_TIMEOUT = 250; + + final int modesCnt = 5; + + for (int i = 0; i < modesCnt; i++) + testTimeoutRemoval0(grid(0), i, TX_TIMEOUT); + + for (int i = 0; i < modesCnt; i++) + testTimeoutRemoval0(client, i, TX_TIMEOUT); + + for (int i = 0; i < modesCnt; i++) + testTimeoutRemoval0(grid(0), i, TX_MIN_TIMEOUT); + + for (int i = 0; i < modesCnt; i++) + testTimeoutRemoval0(client, i, TX_MIN_TIMEOUT); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + // Repeat with more iterations to make sure everything is cleared. + for (int i = 0; i < 500; i++) + testTimeoutRemoval0(client, rnd.nextInt(modesCnt), TX_MIN_TIMEOUT); + } + + /** + * Tests timeouts in all tx configurations. + * + * @throws Exception If failed. + */ + public void testSimple() throws Exception { + for (TransactionConcurrency concurrency : TransactionConcurrency.values()) + for (TransactionIsolation isolation : TransactionIsolation.values()) { + for (int op = 0; op < 4; op++) + testSimple0(concurrency, isolation, op); + } + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param op Operation to test. 
+ * @throws Exception If failed. + */ + private void testSimple0(TransactionConcurrency concurrency, TransactionIsolation isolation, int op) throws Exception { + Ignite near = grid(0); + + final int key = 1, val = 1; + + final long TX_TIMEOUT = 250; + + IgniteCache cache = near.cache(CACHE_NAME); + + try (Transaction tx = near.transactions().txStart(concurrency, isolation, TX_TIMEOUT, 1)) { + cache.put(key, val); + + U.sleep(TX_TIMEOUT * 2); + + try { + switch (op) { + case 0: + cache.put(key + 1, val); + + break; + + case 1: + cache.remove(key + 1); + + break; + + case 2: + cache.get(key + 1); + + break; + + case 3: + tx.commit(); + + break; + + default: + fail(); + } + + fail("Tx must timeout"); + } + catch (CacheException | IgniteException e) { + assertTrue("Expected exception: " + e, X.hasCause(e, TransactionTimeoutException.class)); + } + } + + assertFalse("Must be removed by rollback on timeout", near.cache(CACHE_NAME).containsKey(key)); + assertFalse("Must be removed by rollback on timeout", near.cache(CACHE_NAME).containsKey(key + 1)); + + assertNull(near.transactions().tx()); + } + + /** + * @param near Node. + * @param mode Test mode. + * + * @param timeout Tx timeout. + * @throws Exception If failed. 
+ */ + private void testTimeoutRemoval0(IgniteEx near, int mode, long timeout) throws Exception { + Throwable saved = null; + + try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 1)) { + near.cache(CACHE_NAME).put(1, 1); + + switch (mode) { + case 0: + tx.commit(); + break; + + case 1: + tx.commitAsync().get(); + break; + + case 2: + tx.rollback(); + break; + + case 3: + tx.rollbackAsync().get(); + break; + + case 4: + break; + + default: + fail(); + } + } + catch (Throwable t) { + saved = t; + } + + Collection set = U.field(near.context().cache().context().time(), "timeoutObjs"); + + for (Object obj : set) { + if (obj.getClass().isAssignableFrom(GridNearTxLocal.class)) { + log.error("Last saved exception: " + saved, saved); + + fail("Not removed [mode=" + mode + ", timeout=" + timeout + ", tx=" + obj +']'); + } + } + } + + /** + * @param near Node starting tx which is timed out. + * @param other Node starting second tx. + * @throws Exception If failed. + */ + private void waitingTxUnblockedOnTimeout(final Ignite near, final Ignite other) throws Exception { + waitingTxUnblockedOnTimeout(near, other, 1000); + + waitingTxUnblockedOnTimeout(near, other, 50); + } + + /** + * @param near Node starting tx which is timed out. + * @param other Node starting second tx. + * @param timeout Timeout. + * @throws Exception If failed. 
+ */ + private void waitingTxUnblockedOnTimeout(final Ignite near, final Ignite other, final long timeout) throws Exception { + info("Start test [node1=" + near.name() + ", node2=" + other.name() + ']'); + + final CountDownLatch blocked = new CountDownLatch(1); + + final CountDownLatch unblocked = new CountDownLatch(1); + + final int recordsCnt = 5; + + IgniteInternalFuture fut1 = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, 0)) { + try { + for (int i = 0; i < recordsCnt; i++) + near.cache(CACHE_NAME).put(i, i); + + info("Locked all keys."); + } + catch (CacheException e) { + info("Failed to lock keys: " + e); + } + finally { + blocked.countDown(); + } + + // Will be unblocked after tx timeout occurs. + U.awaitQuiet(unblocked); + + try { + near.cache(CACHE_NAME).put(0, 0); + + fail(); + } + catch (CacheException e) { + log.info("Expecting error: " + e.getMessage()); + } + + try { + tx.commit(); + + fail(); + } + catch (IgniteException e) { + log.info("Expecting error: " + e.getMessage()); + } + } + + // Check thread is able to start new tx. + try (Transaction tx = near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 60_000, 0)) { + for (int i = 0; i < recordsCnt; i++) + near.cache(CACHE_NAME).put(i, i); + + tx.commit(); + } + } + }, "First"); + + IgniteInternalFuture fut2 = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + U.awaitQuiet(blocked); + + try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, 1)) { + for (int i = 0; i < recordsCnt; i++) + other.cache(CACHE_NAME).put(i, i); + + // Will wait until timeout on first tx will unblock put. + tx.commit(); + } + } + }, "Second"); + + fut2.get(); + + unblocked.countDown(); + + fut1.get(); + } + + /** + * @param near Node starting tx which is timed out. + * @param other Node starting second tx. + * @throws Exception If failed. 
+ */ + private void waitingTxUnblockedOnThreadDeath(final Ignite near, final Ignite other) throws Exception { + waitingTxUnblockedOnThreadDeath0(near, other, 10, 1000); // Try to provoke timeout after all keys are locked. + + waitingTxUnblockedOnThreadDeath0(near, other, 1000, 100); // Try to provoke timeout while trying to lock keys. + } + + /** + * @param near Node starting tx which is timed out. + * @param other Node starting second tx. + * @param recordsCnt Number of records to lock. + * @param timeout Transaction timeout. + * @throws Exception If failed. + */ + private void waitingTxUnblockedOnThreadDeath0(final Ignite near, + final Ignite other, + final int recordsCnt, + final long timeout) + throws Exception + { + info("Start test [node1=" + near.name() + ", node2=" + other.name() + ']'); + + final CountDownLatch blocked = new CountDownLatch(1); + + IgniteInternalFuture fut1 = multithreadedAsync(new Runnable() { + @Override public void run() { + near.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, timeout, recordsCnt); + + try { + for (int i = 0; i < recordsCnt; i++) + near.cache(CACHE_NAME).put(i, i); + + log.info("Locked all records."); + } + catch (Exception e) { + log.info("Failed to locked all records: " + e); + } + finally { + blocked.countDown(); + } + + throw new IgniteException("Failure"); + } + }, 1, "First"); + + IgniteInternalFuture fut2 = multithreadedAsync(new Runnable() { + @Override public void run() { + U.awaitQuiet(blocked); + + try (Transaction tx = other.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 0, recordsCnt)) { + for (int i = 0; i < recordsCnt; i++) + other.cache(CACHE_NAME).put(i, i); + + // Will wait until the timeout on the first tx unblocks the put. + tx.commit(); + } + } + }, 1, "Second"); + + try { + fut1.get(); + + fail(); + } + catch (IgniteCheckedException e) { + // Expected: the first thread always throws. 
+ } + + fut2.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java index 164ff6a..b380ebc 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePart import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledMultiNodeWithGroupFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOnheapFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledOnheapMultiNodeFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedMultiNodeLongTxTimeout2FullApiTest; import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedMultiNodeLongTxTimeoutFullApiTest; import org.apache.ignite.internal.processors.cache.distributed.near.CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicClientOnlyMultiNodeFullApiSelfTest; @@ -123,6 +124,7 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheAtomicMultiNodeP2PDisabledFullApiSelfTest.class); suite.addTestSuite(GridCacheAtomicNearEnabledMultiNodeFullApiSelfTest.class); 
suite.addTestSuite(CachePartitionedMultiNodeLongTxTimeoutFullApiTest.class); + suite.addTestSuite(CachePartitionedMultiNodeLongTxTimeout2FullApiTest.class); suite.addTestSuite(CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.class); suite.addTestSuite(GridCachePartitionedNearDisabledMultiNodeFullApiSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index a0fb19b..7c71381 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -21,9 +21,14 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest; import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest; +import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest; +import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest; +import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest; + /** * Test suite. 
*/ @@ -44,6 +49,11 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(CacheExchangeMergeTest.class); + suite.addTestSuite(TxRollbackOnTimeoutTest.class); + suite.addTestSuite(TxRollbackOnTimeoutNoDeadlockDetectionTest.class); + suite.addTestSuite(TxRollbackOnTimeoutNearCacheTest.class); + suite.addTestSuite(IgniteCacheThreadLocalTxTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5af30cf1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java index e85baed..f404732 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java @@ -25,11 +25,11 @@ import org.apache.ignite.internal.util.typedef.internal.CU; /** * Test checks whether hadoop system cache doesn't use user defined TX config. */ -public class HadoopTxConfigCacheTest extends IgniteTxConfigCacheSelfTest { +public class HadoopTxConfigCacheTest extends IgniteTxConfigCacheSelfTest { /** * Success if system caches weren't timed out. * - * @throws Exception + * @throws Exception If failed. */ public void testSystemCacheTx() throws Exception { final Ignite ignite = grid(0);