Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7FCC617547 for ; Wed, 29 Apr 2015 15:04:57 +0000 (UTC) Received: (qmail 52498 invoked by uid 500); 29 Apr 2015 15:04:57 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 52467 invoked by uid 500); 29 Apr 2015 15:04:57 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 52458 invoked by uid 99); 29 Apr 2015 15:04:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 15:04:57 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of root@apache.org designates 54.164.171.186 as permitted sender) Received: from [54.164.171.186] (HELO mx1-us-east.apache.org) (54.164.171.186) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 15:04:51 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id BA81445486 for ; Wed, 29 Apr 2015 15:03:20 +0000 (UTC) Received: (qmail 40553 invoked by uid 99); 29 Apr 2015 15:03:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 15:03:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 15D98E03C7; Wed, 29 Apr 2015 15:03:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 29 Apr 2015 15:04:05 -0000 Message-Id: In-Reply-To: <901cf6cff2dd4bbc96ba32601982ee9f@git.apache.org> References: <901cf6cff2dd4bbc96ba32601982ee9f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/50] incubator-ignite git commit: # ignite-157-2 Tests and fix for tx recovery issue X-Virus-Checked: Checked by ClamAV on apache.org # ignite-157-2 Tests and fix for tx recovery issue Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5daaa278 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5daaa278 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5daaa278 Branch: refs/heads/ignite-157-2 Commit: 5daaa278afcca7e00be5002e3d5247661c6ba474 Parents: 4b775f0 Author: sboikov Authored: Wed Apr 29 12:48:27 2015 +0300 Committer: sboikov Committed: Wed Apr 29 17:13:48 2015 +0300 ---------------------------------------------------------------------- ...ridCacheOptimisticCheckPreparedTxFuture.java | 78 +++- ...idCacheOptimisticCheckPreparedTxRequest.java | 47 +- .../GridDistributedTxRemoteAdapter.java | 2 +- .../cache/transactions/IgniteInternalTx.java | 5 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- .../cache/transactions/IgniteTxHandler.java | 3 +- .../transactions/IgniteTxLocalAdapter.java | 2 +- .../cache/transactions/IgniteTxManager.java | 47 +- ...xOriginatingNodeFailureAbstractSelfTest.java | 2 +- ...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 ++ ...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 ++ ...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 ++ ...ePrimaryNodeFailureRecoveryAbstractTest.java | 454 +++++++++++++++++++ .../IgniteCacheTxRecoverySelfTestSuite.java | 4 + 14 files changed, 713 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java index 8a14b48..3e345f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java @@ -70,6 +70,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture extends GridCompound /** Transaction nodes mapping. */ private final Map> txNodes; + /** */ + private final boolean nearTxCheck; + /** * @param cctx Context. * @param tx Transaction. @@ -77,8 +80,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture extends GridCompound * @param txNodes Transaction mapping. */ @SuppressWarnings("ConstantConditions") - public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext cctx, IgniteInternalTx tx, - UUID failedNodeId, Map> txNodes) { + public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext cctx, + IgniteInternalTx tx, + UUID failedNodeId, + Map> txNodes) + { super(cctx.kernalContext(), CU.boolReducer()); this.cctx = cctx; @@ -114,6 +120,10 @@ public class GridCacheOptimisticCheckPreparedTxFuture extends GridCompound } } } + + UUID nearNodeId = tx.eventNodeId(); + + nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId); } /** @@ -121,6 +131,48 @@ public class GridCacheOptimisticCheckPreparedTxFuture extends GridCompound */ @SuppressWarnings("ConstantConditions") public void prepare() { + if (nearTxCheck) { + UUID nearNodeId = tx.eventNodeId(); + + if (cctx.localNodeId().equals(nearNodeId)) { + IgniteInternalFuture fut = cctx.tm().txCommitted(tx.nearXidVersion()); + + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + else { + MiniFuture fut = new MiniFuture(tx.eventNodeId()); + + add(fut); + + GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest( + tx, + 0, + true, + futureId(), + fut.futureId()); + + try { + cctx.io().send(nearNodeId, req, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + fut.onError(e); + } + + markInitialized(); + } + + return; + } + // First check transactions on local node. int locTxNum = nodeTransactions(cctx.localNodeId()); @@ -206,6 +258,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture extends GridCompound GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx, nodeTransactions(id), + false, futureId(), fut.futureId()); @@ -228,7 +281,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture extends GridCompound add(fut); GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest( - tx, nodeTransactions(nodeId), futureId(), fut.futureId()); + tx, + nodeTransactions(nodeId), + false, + futureId(), + fut.futureId()); try { cctx.io().send(nodeId, req, tx.ioPolicy()); @@ -341,11 +398,18 @@ public class GridCacheOptimisticCheckPreparedTxFuture extends GridCompound cctx.tm().finishOptimisticTxOnRecovery(tx, res); } else { - if (log.isDebugEnabled()) - log.debug("Failed to check prepared transactions, " + - "invalidating transaction [err=" + err + ", tx=" + tx + ']'); + if (nearTxCheck) { + if (log.isDebugEnabled()) + log.debug("Failed to check transaction on near node, " + + "ignoring [err=" + err + ", tx=" + tx + ']'); + } + else { + if (log.isDebugEnabled()) + log.debug("Failed to check prepared transactions, " + + "invalidating transaction [err=" + err + ", tx=" + tx + ']'); - cctx.tm().salvageTx(tx); + cctx.tm().salvageTx(tx); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java index e83db66..4f2a1d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java @@ -27,8 +27,7 @@ import java.io.*; import java.nio.*; /** - * Message sent to check that transactions related to some optimistic transaction - * were prepared on remote node. + * Message sent to check that transactions related to transaction were prepared on remote node. */ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage { /** */ @@ -49,6 +48,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa /** System transaction flag. */ private boolean sys; + /** {@code True} if should check only tx on near node. */ + private boolean nearTxCheck; + /** * Empty constructor required by {@link Externalizable} */ @@ -59,11 +61,16 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa /** * @param tx Transaction. * @param txNum Expected number of transactions on remote node. + * @param nearTxCheck * @param futId Future ID. * @param miniId Mini future ID. */ - public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, int txNum, IgniteUuid futId, - IgniteUuid miniId) { + public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, + int txNum, + boolean nearTxCheck, + IgniteUuid futId, + IgniteUuid miniId) + { super(tx.xidVersion(), 0); nearXidVer = tx.nearXidVersion(); @@ -72,6 +79,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa this.futId = futId; this.miniId = miniId; this.txNum = txNum; + this.nearTxCheck = nearTxCheck; + } + + /** + * @return {@code True} if should check only tx on near node. + */ + public boolean nearTxCheck() { + return nearTxCheck; } /** @@ -137,18 +152,24 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa writer.incrementState(); case 10: - if (!writer.writeMessage("nearXidVer", nearXidVer)) + if (!writer.writeBoolean("nearTxCheck", nearTxCheck)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("sys", sys)) + if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); case 12: + if (!writer.writeBoolean("sys", sys)) + return false; + + writer.incrementState(); + + case 13: if (!writer.writeInt("txNum", txNum)) return false; @@ -187,7 +208,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa reader.incrementState(); case 10: - nearXidVer = reader.readMessage("nearXidVer"); + nearTxCheck = reader.readBoolean("nearTxCheck"); if (!reader.isLastRead()) return false; @@ -195,7 +216,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa reader.incrementState(); case 11: - sys = reader.readBoolean("sys"); + nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) return false; @@ -203,6 +224,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa reader.incrementState(); case 12: + sys = reader.readBoolean("sys"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: txNum = reader.readInt("txNum"); if (!reader.isLastRead()) @@ -222,7 +251,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 14; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 5c75390..3215138 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -206,7 +206,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public GridTuple peek(GridCacheContext cacheCtx, + @Override public GridTuple peek(GridCacheContext cacheCtx, boolean failFast, KeyCacheObject key, CacheEntryPredicate[] filter) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 30367c5..8dc07cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -338,8 +338,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { /** * Gets node ID which directly started this transaction. In case of DHT local transaction it will be - * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote - * transaction it will be starter node ID. + * near node ID, in case of DHT remote transaction it will be primary node ID. * * @return Originating node ID. */ @@ -507,7 +506,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { * @return Current value for the key within transaction. * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}. */ - @Nullable public GridTuple peek( + @Nullable public GridTuple peek( GridCacheContext ctx, boolean failFast, KeyCacheObject key, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 1c02356..82d68b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1964,7 +1964,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Nullable @Override public GridTuple peek(GridCacheContext ctx, + @Nullable @Override public GridTuple peek(GridCacheContext ctx, boolean failFast, KeyCacheObject key, @Nullable CacheEntryPredicate[] filter) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 6b45fee..6843075 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1183,7 +1183,8 @@ public class IgniteTxHandler { if (log.isDebugEnabled()) log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']'); - IgniteInternalFuture fut = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); + IgniteInternalFuture fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) : + ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); if (fut == null || fut.isDone()) { boolean prepared; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index dfce09c..fc3efba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -330,7 +330,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable @Override public GridTuple peek( + @Nullable @Override public GridTuple peek( GridCacheContext cacheCtx, boolean failFast, KeyCacheObject key, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c494602..19efc5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -727,14 +727,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @param txId Transaction ID. - * @return Transaction with given ID. - */ - @Nullable public IgniteInternalTx txx(GridCacheVersion txId) { - return idMap.get(txId); - } - - /** * Handles prepare stage of 2PC. * * @param tx Transaction to prepare. @@ -1770,6 +1762,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @param ver Version. + * @return Future for flag indicating if transactions was committed. + */ + public IgniteInternalFuture txCommitted(GridCacheVersion ver) { + final GridFutureAdapter resFut = new GridFutureAdapter<>(); + + final IgniteInternalTx tx = cctx.tm().tx(ver); + + if (tx != null) { + assert tx.near() && tx.local() : tx; + + if (log.isDebugEnabled()) + log.debug("Found near transaction, will wait for completion: " + tx); + + tx.finishFuture().listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + TransactionState state = tx.state(); + + if (log.isDebugEnabled()) + log.debug("Near transaction finished with state: " + state); + + resFut.onDone(state == COMMITTED); + } + }); + + return resFut; + } + + Boolean committed = completedVers.get(ver); + + if (log.isDebugEnabled()) + log.debug("Near transaction committed: " + committed); + + resFut.onDone(committed != null && committed); + + return resFut; + } + + /** * @param nearVer Near version ID. * @param txNum Number of transactions. * @param fut Result future. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java index 00bd43f..d664aa8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -156,7 +156,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri TransactionProxyImpl tx = (TransactionProxyImpl)txIgniteNode.transactions().txStart(); - IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx"); + IgniteInternalTx txEx = tx.tx(); assertTrue(txEx.optimistic()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java new file mode 100644 index 0000000..62d9b79 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.configuration.*; + +/** + * + */ +public class IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest + extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java new file mode 100644 index 0000000..a40c989 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.configuration.*; + +/** + * + */ +public class IgniteCachePartitionedPrimaryNodeFailureRecoveryTest + extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return new NearCacheConfiguration(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java new file mode 100644 index 0000000..70eef1d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.configuration.*; + +/** + * + */ +public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest + extends IgniteCachePartitionedPrimaryNodeFailureRecoveryTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + assertEquals(1, ccfg.getBackups()); + + ccfg.setBackups(2); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java new file mode 100644 index 0000000..7a393d8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; + +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; +import static org.apache.ignite.transactions.TransactionState.*; + +/** + * + */ +public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends IgniteCacheAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + // No-op + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception { + primaryNodeFailure(false, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception { + primaryNodeFailure(true, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticPrimaryNodeFailureRollback1() throws Exception { + primaryNodeFailure(false, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticPrimaryNodeFailureRollback2() throws Exception { + primaryNodeFailure(true, true, true); + } + /** + * @throws Exception If failed. + */ + public void testPessimisticPrimaryNodeFailureRecovery1() throws Exception { + primaryNodeFailure(false, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticPrimaryNodeFailureRecovery2() throws Exception { + primaryNodeFailure(true, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticPrimaryNodeFailureRollback1() throws Exception { + primaryNodeFailure(false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticPrimaryNodeFailureRollback2() throws Exception { + primaryNodeFailure(true, true, false); + } + + /** + * @param locBackupKey If {@code true} uses recovery for local backup key. + * @param rollback If {@code true} tests rollback after primary node failure. + * @param optimistic If {@code true} tests optimistic transaction. + * @throws Exception If failed. + */ + private void primaryNodeFailure(boolean locBackupKey, final boolean rollback, boolean optimistic) throws Exception { + IgniteCache cache0 = jcache(0); + IgniteCache cache2 = jcache(2); + + Affinity aff = ignite(0).affinity(null); + + Integer key0 = null; + + for (int key = 0; key < 10_000; key++) { + if (aff.isPrimary(ignite(1).cluster().localNode(), key)) { + if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) { + key0 = key; + + break; + } + } + } + + assertNotNull(key0); + + final Integer key1 = key0; + final Integer key2 = primaryKey(cache2); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi(); + + IgniteTransactions txs = ignite(0).transactions(); + + try (Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ)) { + log.info("Put key1: " + key1); + + cache0.put(key1, key1); + + log.info("Put key2: " + key2); + + cache0.put(key2, key2); + + log.info("Start prepare."); + + IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx(); + + commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. + + IgniteInternalFuture prepFut = txEx.prepareAsync(); + + waitPrepared(ignite(1)); + + log.info("Stop one primary node."); + + stopGrid(1); + + U.sleep(1000); // Wait some time to catch possible issues in tx recovery. + + commSpi.stopBlock(); + + prepFut.get(10_000); + + if (rollback) { + log.info("Rollback."); + + tx.rollback(); + } + else { + log.info("Commit."); + + tx.commit(); + } + } + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + checkKey(key1, rollback); + checkKey(key2, rollback); + + return true; + } + catch (AssertionError e) { + log.info("Check failed: " + e); + + return false; + } + } + }, 5000); + + checkKey(key1, rollback); + checkKey(key2, rollback); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception { + primaryAndOriginatingNodeFailure(false, true); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception { + primaryAndOriginatingNodeFailure(true, true); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception { + primaryAndOriginatingNodeFailure(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception { + primaryAndOriginatingNodeFailure(true, false); + } + + /** + * @param rollback If {@code true} tests rollback after primary node failure. + * @param optimistic If {@code true} tests optimistic transaction. + * @throws Exception If failed. + */ + private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception { + IgniteCache cache0 = jcache(0); + IgniteCache cache1 = jcache(1); + IgniteCache cache2 = jcache(2); + + final Integer key1 = primaryKey(cache1); + final Integer key2 = primaryKey(cache2); + + TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi(); + + IgniteTransactions txs = ignite(0).transactions(); + + Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ); + + log.info("Put key1: " + key1); + + cache0.put(key1, key1); + + log.info("Put key2: " + key2); + + cache0.put(key2, key2); + + log.info("Start prepare."); + + IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx(); + + commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2. + + IgniteInternalFuture prepFut = txEx.prepareAsync(); + + waitPrepared(ignite(1)); + + log.info("Stop one primary node."); + + stopGrid(1); + + U.sleep(1000); // Wait some time to catch possible issues in tx recovery. + + if (!rollback) { + commSpi.stopBlock(); + + prepFut.get(10_000); + } + + log.info("Stop originating node."); + + stopGrid(0); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + checkKey(key1, rollback); + checkKey(key2, rollback); + + return true; + } + catch (AssertionError e) { + log.info("Check failed: " + e); + + return false; + } + } + }, 5000); + + checkKey(key1, rollback); + checkKey(key2, rollback); + } + + /** + * @param key Key. + * @param expNull {@code True} if {@code null} value is expected. + */ + private void checkKey(Integer key, boolean expNull) { + Affinity aff = ignite(2).affinity(null); + + Collection nodes = aff.mapKeyToPrimaryAndBackups(key); + + assertFalse(nodes.isEmpty()); + + for (ClusterNode node : nodes) { + Ignite ignite = grid(node); + + IgniteCache cache = ignite.cache(null); + + if (expNull) + assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key)); + else + assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key)); + } + } + + /** + * @param ignite Node. + * @throws Exception If failed. + */ + private void waitPrepared(Ignite ignite) throws Exception { + final IgniteTxManager tm = ((IgniteKernal)ignite).context().cache().context().tm(); + + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + GridDhtTxLocal locTx = null; + + for (IgniteInternalTx tx : tm.txs()) { + if (tx instanceof GridDhtTxLocal) { + assertNull("Only one tx is expected.", locTx); + + locTx = (GridDhtTxLocal)tx; + } + } + + log.info("Wait for tx, state: " + (locTx != null ? locTx.state() : null)); + + return locTx != null && locTx.state() == PREPARED; + } + }, 5000); + + assertTrue("Failed to wait for tx.", wait); + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** Logger. */ + @LoggerResource + private IgniteLogger log; + + /** */ + private UUID blockNodeId; + + /** */ + private List> blockedMsgs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + if (msg0 instanceof GridNearTxPrepareRequest) { + synchronized (this) { + if (blockNodeId != null && blockNodeId.equals(node.id())) { + log.info("Block message: " + msg0); + + blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + + return; + } + } + } + } + + super.sendMessage(node, msg); + } + + /** + * @param nodeId Node ID. + */ + void blockMessages(UUID nodeId) { + blockNodeId = nodeId; + } + + /** + * + */ + void stopBlock() { + synchronized (this) { + blockNodeId = null; + + for (T2 msg : blockedMsgs) { + log.info("Send blocked message: " + msg.get2().message()); + + super.sendMessage(msg.get1(), msg.get2()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java index e832099..1bd0e5f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java @@ -33,6 +33,10 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Cache tx recovery test suite"); + suite.addTestSuite(IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.class); + suite.addTestSuite(IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.class); + suite.addTestSuite(IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class); + suite.addTestSuite(GridCachePartitionedTxOriginatingNodeFailureSelfTest.class); suite.addTestSuite(GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.class); suite.addTestSuite(GridCacheReplicatedTxOriginatingNodeFailureSelfTest.class);