Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8ED6710520 for ; Tue, 28 Apr 2015 13:54:28 +0000 (UTC) Received: (qmail 67912 invoked by uid 500); 28 Apr 2015 13:54:28 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 67884 invoked by uid 500); 28 Apr 2015 13:54:28 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 67875 invoked by uid 99); 28 Apr 2015 13:54:28 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Apr 2015 13:54:28 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [54.164.171.186] (HELO mx1-us-east.apache.org) (54.164.171.186) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Apr 2015 13:54:23 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 722F34541D for ; Tue, 28 Apr 2015 13:54:01 +0000 (UTC) Received: (qmail 66350 invoked by uid 99); 28 Apr 2015 13:54:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Apr 2015 13:54:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E82A1E0772; Tue, 28 Apr 2015 13:54:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 28 Apr 2015 13:54:06 -0000 Message-Id: In-Reply-To: <229644b863b24dd096774e83f976d9d3@git.apache.org> References: <229644b863b24dd096774e83f976d9d3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/17] incubator-ignite git commit: # ignite-157 wait for 'preparing' transactions in 'processCheckPreparedTxRequest' X-Virus-Checked: Checked by ClamAV on apache.org # ignite-157 wait for 'preparing' transactions in 'processCheckPreparedTxRequest' Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7b5f91a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7b5f91a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7b5f91a2 Branch: refs/heads/ignite-sprint-4 Commit: 7b5f91a2d7723a7c4f4c87bbdc8b2b5772253651 Parents: 62d8053 Author: sboikov Authored: Fri Apr 24 15:08:04 2015 +0300 Committer: sboikov Committed: Fri Apr 24 15:43:52 2015 +0300 ---------------------------------------------------------------------- .../cache/transactions/IgniteTxManager.java | 22 +++-- ...xOriginatingNodeFailureAbstractSelfTest.java | 91 +++++++++++++++++++- ...itionedTxOriginatingNodeFailureSelfTest.java | 86 ++++++++++++++++-- 3 files changed, 184 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7b5f91a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index d139afd..e4cf28b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -2054,7 +2054,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() + ", failedNodeId=" + evtNodeId + ']'); - for (IgniteInternalTx tx : txs()) { + for (final IgniteInternalTx tx : txs()) { if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) { // Invalidate transactions. salvageTx(tx, false, RECOVERY_FINISH); @@ -2062,12 +2062,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { else { // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. if (tx.originatingNodeId().equals(evtNodeId)) { - if (tx.state() == PREPARED) + if (tx.optimistic() && tx.state() == PREPARED) commitIfPrepared(tx); else { - if (tx.setRollbackOnly()) - tx.rollbackAsync(); - // If we could not mark tx as rollback, it means that transaction is being committed. + IgniteInternalFuture prepFut = tx.currentPrepareFuture(); + + if (prepFut != null) { + prepFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + if (tx.setRollbackOnly()) + tx.rollbackAsync(); + } + }); + } + else { + // If we could not mark tx as rollback, it means that transaction is being committed. + if (tx.setRollbackOnly()) + tx.rollbackAsync(); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7b5f91a2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java index ea8c60b..865f1f6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -37,7 +37,9 @@ import org.apache.ignite.testframework.*; import java.util.*; import java.util.concurrent.*; +import static org.apache.ignite.cache.CachePeekMode.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; /** * Abstract test for originating node failure. @@ -67,6 +69,18 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri /** * @throws Exception If failed. */ + public void testPessimisticManyKeysCommit() throws Exception { + Collection keys = new ArrayList<>(200); + + for (int i = 0; i < 200; i++) + keys.add(i); + + testPessimisticTxOriginatingNodeFails(keys); + } + + /** + * @throws Exception If failed. + */ public void testManyKeysRollback() throws Exception { Collection keys = new ArrayList<>(200); @@ -103,6 +117,75 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri /** * @param keys Keys to update. + * @throws Exception If failed. + */ + protected void testPessimisticTxOriginatingNodeFails(Collection keys) throws Exception { + final Map map = new HashMap<>(); + + final String initVal = "initialValue"; + + for (Integer key : keys) { + grid(originatingNode()).cache(null).put(key, initVal); + + map.put(key, String.valueOf(key)); + } + + ClusterNode txNode = grid(originatingNode()).localNode(); + + final Ignite txIgniteNode = G.ignite(txNode.id()); + + info("Starting pessimistic tx " + + "[values=" + map + ", topVer=" + (grid(1)).context().discovery().topologyVersion() + ']'); + + GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + IgniteCache cache = txIgniteNode.cache(null); + + assertNotNull(cache); + + TransactionProxyImpl tx = + (TransactionProxyImpl)txIgniteNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx"); + + assertTrue(txEx.pessimistic()); + + cache.putAll(map); + + return null; + } + }).get(); + + info("Stopping originating node " + txNode); + + G.stop(G.ignite(txNode.id()).name(), true); + + info("Stopped grid, waiting for transactions to complete."); + + boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Ignite ignite : G.allGrids()) { + IgniteKernal g = (IgniteKernal)ignite; + + GridCacheSharedContext ctx = g.context().cache().context(); + + int txNum = ctx.tm().idMapSize(); + + if (txNum != 0) + return false; + } + + return true; + } + }, 10_000); + + assertTrue(txFinished); + + info("Transactions finished."); + } + + /** + * @param keys Keys to update. * @param partial Flag indicating whether to simulate partial prepared state. * @throws Exception If failed. */ @@ -140,8 +223,8 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri nodeMap.put(key, nodes); } - info("Starting tx [values=" + map + ", topVer=" + - (grid(1)).context().discovery().topologyVersion() + ']'); + info("Starting optimistic tx " + + "[values=" + map + ", topVer=" + (grid(1)).context().discovery().topologyVersion() + ']'); if (partial) ignoreMessages(grid(1).localNode().id(), ignoreMessageClass()); @@ -158,6 +241,8 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx"); + assertTrue(txEx.optimistic()); + cache.putAll(map); try { @@ -214,7 +299,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri assertNotNull(cache); - assertEquals(partial ? initVal : val, cache.localPeek(key, CachePeekMode.ONHEAP)); + assertEquals(partial ? initVal : val, cache.localPeek(key, ONHEAP)); return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7b5f91a2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java index d4e84a5..fc1c403 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java @@ -20,13 +20,13 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import java.util.*; +import static org.apache.ignite.cache.CacheMode.*; + /** * Tests transaction consistency when originating node fails. */ @@ -37,7 +37,7 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends /** {@inheritDoc} */ @Override protected CacheMode cacheMode() { - return CacheMode.PARTITIONED; + return PARTITIONED; } /** {@inheritDoc} */ @@ -58,6 +58,21 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends * @throws Exception If failed. */ public void testTxFromPrimary() throws Exception { + txFromPrimary(true); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxFromPrimary() throws Exception { + txFromPrimary(false); + } + + /** + * @param optimistic If {@code true} tests optimistic transaction. + * @throws Exception If failed. + */ + private void txFromPrimary(boolean optimistic) throws Exception { ClusterNode txNode = grid(originatingNode()).localNode(); Integer key = null; @@ -72,13 +87,31 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends assertNotNull(key); - testTxOriginatingNodeFails(Collections.singleton(key), false); + if (optimistic) + testTxOriginatingNodeFails(Collections.singleton(key), false); + else + testPessimisticTxOriginatingNodeFails(Collections.singleton(key)); } /** * @throws Exception If failed. */ public void testTxFromBackup() throws Exception { + txFromBackup(true); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxFromBackup() throws Exception { + txFromBackup(false); + } + + /** + * @param optimistic If {@code true} tests optimistic transaction. + * @throws Exception If failed. + */ + private void txFromBackup(boolean optimistic) throws Exception { ClusterNode txNode = grid(originatingNode()).localNode(); Integer key = null; @@ -93,13 +126,31 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends assertNotNull(key); - testTxOriginatingNodeFails(Collections.singleton(key), false); + if (optimistic) + testTxOriginatingNodeFails(Collections.singleton(key), false); + else + testPessimisticTxOriginatingNodeFails(Collections.singleton(key)); } /** * @throws Exception If failed. */ public void testTxFromNotColocated() throws Exception { + txFromNotColocated(true); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxFromNotColocated() throws Exception { + txFromNotColocated(false); + } + + /** + * @param optimistic If {@code true} tests optimistic transaction. + * @throws Exception If failed. + */ + private void txFromNotColocated(boolean optimistic) throws Exception { ClusterNode txNode = grid(originatingNode()).localNode(); Integer key = null; @@ -115,13 +166,31 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends assertNotNull(key); - testTxOriginatingNodeFails(Collections.singleton(key), false); + if (optimistic) + testTxOriginatingNodeFails(Collections.singleton(key), false); + else + testPessimisticTxOriginatingNodeFails(Collections.singleton(key)); } /** * @throws Exception If failed. */ public void testTxAllNodes() throws Exception { + txAllNodes(true); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxAllNodes() throws Exception { + txAllNodes(false); + } + + /** + * @param optimistic If {@code true} tests optimistic transaction. + * @throws Exception If failed. + */ + private void txAllNodes(boolean optimistic) throws Exception { List allNodes = new ArrayList<>(GRID_CNT); for (int i = 0; i < GRID_CNT; i++) @@ -145,6 +214,9 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends assertEquals(GRID_CNT, keys.size()); - testTxOriginatingNodeFails(keys, false); + if (optimistic) + testTxOriginatingNodeFails(keys, false); + else + testPessimisticTxOriginatingNodeFails(keys); } }