Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 74B0E200CD8 for ; Wed, 2 Aug 2017 12:38:24 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 730671692FE; Wed, 2 Aug 2017 10:38:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5411E1692FA for ; Wed, 2 Aug 2017 12:38:22 +0200 (CEST) Received: (qmail 40274 invoked by uid 500); 2 Aug 2017 10:38:21 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 40143 invoked by uid 99); 2 Aug 2017 10:38:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Aug 2017 10:38:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A295AF5531; Wed, 2 Aug 2017 10:38:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 02 Aug 2017 10:38:28 -0000 Message-Id: <35a418b5eb854d568cc3d9965e521816@git.apache.org> In-Reply-To: <51ffcada024346458c241b314c8a74cd@git.apache.org> References: <51ffcada024346458c241b314c8a74cd@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/17] ignite git commit: ignite-5712 Context switching for optimistic transactions archived-at: Wed, 02 Aug 2017 10:38:24 -0000 ignite-5712 Context switching for optimistic transactions Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: 
http://git-wip-us.apache.org/repos/asf/ignite/commit/20d9ad5a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/20d9ad5a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/20d9ad5a Branch: refs/heads/ignite-5578-debug Commit: 20d9ad5a4f66e6e38af087cc02dc9f52b94237ec Parents: c6fbe2d Author: Nikolay Izhikov Authored: Wed Aug 2 11:52:44 2017 +0300 Committer: sboikov Committed: Wed Aug 2 11:52:44 2017 +0300 ---------------------------------------------------------------------- .../ignite/tests/utils/TestTransaction.java | 10 + .../cache/GridCacheSharedContext.java | 24 + .../cache/distributed/near/GridNearTxLocal.java | 51 ++ .../store/GridCacheStoreManagerAdapter.java | 10 + .../cache/transactions/IgniteTxAdapter.java | 20 +- .../cache/transactions/IgniteTxManager.java | 74 ++ .../cache/transactions/IgniteTxMap.java | 2 +- .../transactions/TransactionProxyImpl.java | 46 +- .../apache/ignite/transactions/Transaction.java | 14 + .../ignite/transactions/TransactionState.java | 7 +- ...ptimisticTxSuspendResumeMultiServerTest.java | 30 + .../IgniteOptimisticTxSuspendResumeTest.java | 751 +++++++++++++++++++ .../IgnitePessimisticTxSuspendResumeTest.java | 91 +++ .../ignite/testframework/GridTestUtils.java | 26 + .../cache/GridAbstractCacheStoreSelfTest.java | 10 + .../testsuites/IgniteCacheTestSuite6.java | 7 + .../processors/cache/jta/CacheJtaManager.java | 5 +- .../processors/cache/jta/CacheJtaResource.java | 28 +- .../GridJtaTransactionManagerSelfTest.java | 208 +++++ .../ignite/testsuites/IgniteJtaTestSuite.java | 3 + 20 files changed, 1402 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java index 4a03d25..e587bd7 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java @@ -140,4 +140,14 @@ public class TestTransaction implements Transaction { @Override public IgniteFuture rollbackAsync() throws IgniteException { return null; } + + /** {@inheritDoc} */ + @Override public void suspend() throws IgniteException{ + // No-op. + } + + /** {@inheritDoc} */ + @Override public void resume() throws IgniteException { + // No-op. + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 1876023..82d960a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -944,6 +944,30 @@ public class GridCacheSharedContext { } /** + * Suspends transaction. It could be resumed later. Supported only for optimistic transactions. + * + * @param tx Transaction to suspend. + * @throws IgniteCheckedException If suspension failed. + */ + public void suspendTx(GridNearTxLocal tx) throws IgniteCheckedException { + tx.txState().awaitLastFuture(this); + + tx.suspend(); + } + + /** + * Resume transaction if it was previously suspended. + * + * @param tx Transaction to resume. + * @throws IgniteCheckedException If resume failed. 
+ */ + public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { + tx.txState().awaitLastFuture(this); + + tx.resume(); + } + + /** * @return Store session listeners. */ @Nullable public Collection storeSessionListeners() { http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 58ecee9..55d6bdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -105,12 +105,14 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; +import static org.apache.ignite.transactions.TransactionState.ACTIVE; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; import static 
org.apache.ignite.transactions.TransactionState.UNKNOWN; /** @@ -2851,6 +2853,47 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea } /** + * Suspends transaction. It could be resumed later. Supported only for optimistic transactions. + * + * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out. + */ + public void suspend() throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Suspend near local tx: " + this); + + if (pessimistic()) + throw new UnsupportedOperationException("Suspension is not supported for pessimistic transactions."); + + if (threadId() != Thread.currentThread().getId()) + throw new IgniteCheckedException("Only thread started transaction can suspend it."); + + synchronized (this) { + checkValid(); + + cctx.tm().suspendTx(this); + } + } + + /** + * Resumes transaction (possibly in another thread) if it was previously suspended. + * + * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out. + */ + public void resume() throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Resume near local tx: " + this); + + if (pessimistic()) + throw new UnsupportedOperationException("Resume is not supported for pessimistic transactions."); + + synchronized (this) { + checkValid(); + + cctx.tm().resumeTx(this); + } + } + + /** * @param maps Mappings. */ void addEntryMapping(@Nullable Collection maps) { @@ -3956,6 +3999,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea } /** + * @param threadId new owner of transaction. + * @throws IgniteCheckedException if method executed not in the middle of resume or suspend. + */ + public void threadId(long threadId) { + this.threadId = threadId; + } + + /** * Post-lock closure. * * @param Return type. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 9cba3dd..83f07fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -1362,6 +1362,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @Override public void suspend() throws IgniteException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public long timeout() { return tx.timeout(); } @@ -1407,6 +1412,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @Override public void resume() throws IgniteException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public IgniteAsyncSupport withAsync() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 91ce3ce..61ca78c 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -97,6 +97,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; /** * Managed transaction adapter. @@ -977,10 +978,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement switch (state) { case ACTIVE: { - valid = false; + valid = prev == SUSPENDED; break; - } // Active is initial state and cannot be transitioned to. + } case PREPARING: { valid = prev == ACTIVE; @@ -1025,15 +1026,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } case MARKED_ROLLBACK: { - valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED; + valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == SUSPENDED; break; } case ROLLING_BACK: { - valid = - prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING || - prev == PREPARED || (prev == COMMITTING && local() && !dht()); + valid = prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING || + prev == PREPARED || prev == SUSPENDED || (prev == COMMITTING && local() && !dht()); + + break; + } + + case SUSPENDED: { + valid = prev == ACTIVE; break; } @@ -1064,7 +1070,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (valid) { // Seal transactions maps. 
- if (state != ACTIVE) + if (state != ACTIVE && state != SUSPENDED) seal(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 82692ae..20d306c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -113,6 +113,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; import static org.apache.ignite.transactions.TransactionState.UNKNOWN; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; @@ -2241,6 +2242,79 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * Suspends transaction. + * Should not be used directly. Use tx.suspend() instead. + * + * @param tx Transaction to be suspended. 
+ * + * @see #resumeTx(GridNearTxLocal) + * @see GridNearTxLocal#suspend() + * @see GridNearTxLocal#resume() + */ + public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException { + assert tx != null && !tx.system() : tx; + + if (!tx.state(SUSPENDED)) { + throw new IgniteCheckedException("Trying to suspend transaction with incorrect state " + + "[expected=" + ACTIVE + ", actual=" + tx.state() + ']'); + } + + clearThreadMap(tx); + + transactionMap(tx).remove(tx.xidVersion(), tx); + } + + /** + * Resume transaction in current thread. + * Please don't use directly. Use tx.resume() instead. + * + * @param tx Transaction to be resumed. + * + * @see #suspendTx(GridNearTxLocal) + * @see GridNearTxLocal#suspend() + * @see GridNearTxLocal#resume() + */ + public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { + assert tx != null && !tx.system() : tx; + assert !threadMap.containsValue(tx) : tx; + assert !transactionMap(tx).containsValue(tx) : tx; + assert !haveSystemTxForThread(Thread.currentThread().getId()); + + if(!tx.state(ACTIVE)) { + throw new IgniteCheckedException("Trying to resume transaction with incorrect state " + + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); + } + + long threadId = Thread.currentThread().getId(); + + if (threadMap.putIfAbsent(threadId, tx) != null) + throw new IgniteCheckedException("Thread already start a transaction."); + + if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) + throw new IgniteCheckedException("Thread already start a transaction."); + + tx.threadId(threadId); + } + + /** + * @param threadId Thread id. + * @return True if thread have system transaction. False otherwise. 
+ */ + private boolean haveSystemTxForThread(long threadId) { + if (!sysThreadMap.isEmpty()) { + for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { + if (!cacheCtx.systemTx()) + continue; + + if (sysThreadMap.containsKey(new TxThreadKey(threadId, cacheCtx.cacheId()))) + return true; + } + } + + return false; + } + + /** * Timeout object for node failure handler. */ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java index 429c995..6b79550 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java @@ -190,4 +190,4 @@ public class IgniteTxMap extends AbstractMap impleme @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { throw new IllegalStateException("Transaction view map should never be serialized: " + this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 8750cab..f25fc36 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -44,6 +44,8 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; + /** * Cache transaction proxy. */ @@ -98,6 +100,18 @@ public class TransactionProxyImpl implements TransactionProxy, Externaliza * Enters a call. */ private void enter() { + enter(false); + } + + /** + * Enters a call. + * + * @param resume Flag to indicate that resume operation in progress. + */ + private void enter(boolean resume) { + if (!resume && state() == SUSPENDED) + throw new IgniteException("Tx in SUSPENDED state. All operations except resume are prohibited."); + if (cctx.deploymentEnabled()) cctx.deploy().onEnter(); @@ -204,6 +218,21 @@ public class TransactionProxyImpl implements TransactionProxy, Externaliza } /** {@inheritDoc} */ + @Override public void suspend() throws IgniteException { + enter(); + + try { + cctx.suspendTx(tx); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + leave(); + } + } + + /** {@inheritDoc} */ @Override public long timeout(long timeout) { return tx.timeout(timeout); } @@ -333,6 +362,21 @@ public class TransactionProxyImpl implements TransactionProxy, Externaliza } } + /** {@inheritDoc} */ + @Override public void resume() throws IgniteException { + enter(true); + + try { + cctx.resumeTx(tx); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + leave(); + } + } + /** * @param res Result to convert to finished future. 
*/ @@ -377,4 +421,4 @@ public class TransactionProxyImpl implements TransactionProxy, Externaliza @Override public String toString() { return S.toString(TransactionProxyImpl.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java index 57a2b00..a1b4d78 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java @@ -272,4 +272,18 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport { * @throws IgniteException If rollback failed. */ public IgniteFuture rollbackAsync() throws IgniteException; + + /** + * Resume transaction if it was previously suspended. Supported only for optimistic transactions. + * + * @throws IgniteException If resume failed. + */ + public void resume() throws IgniteException; + + /** + * Suspends transaction. It could be resumed later. Supported only for optimistic transactions. + * + * @throws IgniteException If suspension failed. 
+ */ + public void suspend() throws IgniteException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java index 1980242..d01c0fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java @@ -48,7 +48,10 @@ public enum TransactionState { ROLLED_BACK, /** Transaction rollback failed or is otherwise unknown state. */ - UNKNOWN; + UNKNOWN, + + /** Transaction has been suspended by user. */ + SUSPENDED; /** Enumerated values. */ private static final TransactionState[] VALS = values(); @@ -62,4 +65,4 @@ public enum TransactionState { @Nullable public static TransactionState fromOrdinal(int ord) { return ord >= 0 && ord < VALS.length ? 
VALS[ord] : null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java new file mode 100644 index 0000000..a6318d4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +/** + * + */ +public class IgniteOptimisticTxSuspendResumeMultiServerTest extends IgniteOptimisticTxSuspendResumeTest { + /** + * @return Number of server nodes. 
+ */ + protected int serversNumber() { + return 4; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java new file mode 100644 index 0000000..d16aebd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionTimeoutException; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionState.ACTIVE; +import static org.apache.ignite.transactions.TransactionState.COMMITTED; +import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; +import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; + +/** + * + */ +public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest { + /** Transaction timeout. 
*/ + private static final long TX_TIMEOUT = 100; + + /** Future timeout */ + private static final int FUT_TIMEOUT = 5000; + + private boolean client = false; + + /** + * List of closures to execute transaction operation that prohibited in suspended state. + */ + private static final List> SUSPENDED_TX_PROHIBITED_OPS = Arrays.asList( + new CI1Exc() { + @Override public void applyx(Transaction tx) throws Exception { + tx.suspend(); + } + }, + new CI1Exc() { + @Override public void applyx(Transaction tx) throws Exception { + tx.close(); + } + }, + new CI1Exc() { + @Override public void applyx(Transaction tx) throws Exception { + tx.commit(); + } + }, + new CI1Exc() { + @Override public void applyx(Transaction tx) throws Exception { + tx.commitAsync().get(FUT_TIMEOUT); + } + }, + new CI1Exc() { + @Override public void applyx(Transaction tx) throws Exception { + tx.rollback(); + } + }, + new CI1Exc() { + @Override public void applyx(Transaction tx) throws Exception { + tx.rollbackAsync().get(FUT_TIMEOUT); + } + }, + new CI1Exc() { + @Override public void applyx(Transaction tx) throws Exception { + tx.setRollbackOnly(); + } + } + ); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(serversNumber()); + + if (serversNumber() > 1) { + client = true; + + startGrid(serversNumber()); + + startGrid(serversNumber() + 1); + + client = false; + } + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(true); + } + + /** + * @return Number of server nodes. 
+ */ + protected int serversNumber() { + return 1; + } + + /** + * Test for transaction starting in one thread, continuing in another. + * + * @throws Exception If failed. + */ + public void testResumeTxInAnotherThread() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + + final AtomicInteger cntr = new AtomicInteger(0); + + cache.put(-1, -1); + cache.put(cntr.get(), cntr.getAndIncrement()); + + tx.suspend(); + + assertEquals(SUSPENDED, tx.state()); + + assertNull("Thread already have tx", ignite.transactions().tx()); + + assertNull(cache.get(-1)); + assertNull(cache.get(cntr.get())); + + for (int i = 0; i < 10; i++) { + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + assertEquals(SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(ACTIVE, tx.state()); + + cache.put(cntr.get(), cntr.getAndIncrement()); + + tx.suspend(); + } + }).get(FUT_TIMEOUT); + } + + tx.resume(); + + cache.remove(-1); + + tx.commit(); + + assertEquals(COMMITTED, tx.state()); + + for (int i = 0; i < cntr.get(); i++) + assertEquals(i, (int)cache.get(i)); + + assertFalse(cache.containsKey(-1)); + + cache.removeAll(); + } + } + }); + } + + /** + * Test for transaction starting in one thread, continuing in another, and resuming in initiating thread. + * Cache operations performed for a couple of caches. + * + * @throws Exception If failed. 
+ */ + public void testCrossCacheTxInAnotherThread() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final IgniteCache otherCache = + ignite.getOrCreateCache(cacheConfiguration(PARTITIONED, 0, false).setName("otherCache")); + + final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + + final AtomicInteger cntr = new AtomicInteger(0); + + cache.put(-1, -1); + otherCache.put(-1, -1); + + tx.suspend(); + + for (int i = 0; i < 10; i++) { + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + tx.resume(); + + assertEquals(ACTIVE, tx.state()); + + cache.put(cntr.get(), cntr.get()); + otherCache.put(cntr.get(), cntr.getAndIncrement()); + + tx.suspend(); + } + }).get(FUT_TIMEOUT); + } + + tx.resume(); + + cache.remove(-1); + otherCache.remove(-1); + + tx.commit(); + + assertEquals(COMMITTED, tx.state()); + + for (int i = 0; i < cntr.get(); i++) { + assertEquals(i, (int)cache.get(i)); + assertEquals(i, (int)otherCache.get(i)); + } + + assertFalse(cache.containsKey(-1)); + assertFalse(otherCache.containsKey(-1)); + + cache.removeAll(); + otherCache.removeAll(); + } + } + }); + } + + /** + * Test for transaction rollback. + * + * @throws Exception If failed. 
+ */ + public void testTxRollback() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + + cache.put(1, 1); + cache.put(2, 2); + + tx.suspend(); + + assertNull("There is no transaction for current thread", ignite.transactions().tx()); + + assertEquals(SUSPENDED, tx.state()); + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + tx.resume(); + + assertEquals(ACTIVE, tx.state()); + + cache.put(3, 3); + + tx.rollback(); + } + }).get(FUT_TIMEOUT); + + assertEquals(ROLLED_BACK, tx.state()); + + assertFalse(cache.containsKey(1)); + assertFalse(cache.containsKey(2)); + assertFalse(cache.containsKey(3)); + + cache.removeAll(); + } + } + }); + } + + /** + * Test for starting and suspending transactions, and then resuming and committing in another thread. + * + * @throws Exception If failed. 
+ */ + public void testMultiTxSuspendResume() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final List clientTxs = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + + cache.put(i, i); + + tx.suspend(); + + clientTxs.add(tx); + } + + GridTestUtils.runMultiThreaded(new CI1Exc() { + public void applyx(Integer idx) throws Exception { + Transaction tx = clientTxs.get(idx); + + assertEquals(SUSPENDED, tx.state()); + + tx.resume(); + + assertEquals(ACTIVE, tx.state()); + + tx.commit(); + } + }, 10, "th-suspend"); + + for (int i = 0; i < 10; i++) + assertEquals(i, (int)cache.get(i)); + + cache.removeAll(); + } + } + }); + } + + /** + * Test checking all operations(exception resume) on suspended transaction from the other thread are prohibited. + * + * @throws Exception If failed. + */ + public void testOpsProhibitedOnSuspendedTxFromOtherThread() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (final CI1Exc txOperation : SUSPENDED_TX_PROHIBITED_OPS) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + + cache.put(1, 1); + + tx.suspend(); + + multithreaded(new RunnableX() { + @Override public void runx() throws Exception { + GridTestUtils.assertThrowsWithCause(txOperation, tx, IgniteException.class); + } + }, 1); + + tx.resume(); + tx.close(); + + assertNull(cache.get(1)); + } + } + } + }); + } + + /** + * Test checking all operations(exception resume) on suspended transaction are prohibited. + * + * @throws Exception If failed. 
+ */ + public void testOpsProhibitedOnSuspendedTx() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (CI1Exc txOperation : SUSPENDED_TX_PROHIBITED_OPS) { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + + cache.put(1, 1); + + tx.suspend(); + + GridTestUtils.assertThrowsWithCause(txOperation, tx, IgniteException.class); + + tx.resume(); + tx.close(); + + assertNull(cache.get(1)); + } + } + } + }); + } + + /** + * Test checking timeout on resumed transaction. + * + * @throws Exception If failed. + */ + public void testTxTimeoutOnResumed() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation, TX_TIMEOUT, 0); + + cache.put(1, 1); + + tx.suspend(); + + Thread.sleep(TX_TIMEOUT * 2); + + GridTestUtils.assertThrowsWithCause(new Callable() { + @Override public Object call() throws Exception { + tx.resume(); + + return null; + } + }, TransactionTimeoutException.class); + + assertEquals(MARKED_ROLLBACK, tx.state()); + + tx.close(); + } + } + }); + } + + /** + * Test checking timeout on suspended transaction. + * + * @throws Exception If failed. 
+ */ + public void testTxTimeoutOnSuspend() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation, TX_TIMEOUT, 0); + + cache.put(1, 1); + + Thread.sleep(TX_TIMEOUT * 2); + + GridTestUtils.assertThrowsWithCause(new Callable() { + @Override public Object call() throws Exception { + tx.suspend(); + + return null; + } + }, TransactionTimeoutException.class); + + assertEquals(MARKED_ROLLBACK, tx.state()); + + tx.close(); + + assertNull(cache.get(1)); + } + } + }); + } + + /** + * Test start 1 transaction, suspendTx it. And then start another transaction, trying to write + * the same key and commit it. + * + * @throws Exception If failed. + */ + public void testSuspendTxAndStartNew() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation tx1Isolation : TransactionIsolation.values()) { + for (TransactionIsolation tx2Isolation : TransactionIsolation.values()) { + Transaction tx1 = ignite.transactions().txStart(OPTIMISTIC, tx1Isolation); + + cache.put(1, 1); + + tx1.suspend(); + + assertFalse(cache.containsKey(1)); + + Transaction tx2 = ignite.transactions().txStart(OPTIMISTIC, tx2Isolation); + + cache.put(1, 2); + + tx2.commit(); + + assertEquals(2, (int)cache.get(1)); + + tx1.resume(); + + assertEquals(1, (int)cache.get(1)); + + tx1.close(); + + cache.removeAll(); + } + } + } + }); + } + + /** + * Test start 1 transaction, suspendTx it. And then start another transaction, trying to write + * the same key. + * + * @throws Exception If failed. 
+ */ + public void testSuspendTxAndStartNewWithoutCommit() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation tx1Isolation : TransactionIsolation.values()) { + for (TransactionIsolation tx2Isolation : TransactionIsolation.values()) { + Transaction tx1 = ignite.transactions().txStart(OPTIMISTIC, tx1Isolation); + + cache.put(1, 1); + + tx1.suspend(); + + assertFalse(cache.containsKey(1)); + + Transaction tx2 = ignite.transactions().txStart(OPTIMISTIC, tx2Isolation); + + cache.put(1, 2); + + tx2.suspend(); + + assertFalse(cache.containsKey(1)); + + tx1.resume(); + + assertEquals(1, (int)cache.get(1)); + + tx1.suspend(); + + tx2.resume(); + + assertEquals(2, (int)cache.get(1)); + + tx2.rollback(); + + tx1.resume(); + tx1.rollback(); + + cache.removeAll(); + } + } + } + }); + } + + /** + * Test we can resume and complete transaction if topology changed while transaction is suspended. + * + * @throws Exception If failed. + */ + public void testSuspendTxAndResumeAfterTopologyChange() throws Exception { + executeTestForAllCaches(new CI2Exc>() { + @Override public void applyx(Ignite ignite, final IgniteCache cache) throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation); + + cache.put(1, 1); + + tx.suspend(); + + assertEquals(SUSPENDED, tx.state()); + + try (IgniteEx g = startGrid(serversNumber() + 3)) { + tx.resume(); + + assertEquals(ACTIVE, tx.state()); + + assertEquals(1, (int)cache.get(1)); + + tx.commit(); + + assertEquals(1, (int)cache.get(1)); + } + + cache.removeAll(); + } + } + }); + } + + /** + * @return Cache configurations to test. 
+ */ + private List> cacheConfigurations() { + List> cfgs = new ArrayList<>(); + + cfgs.add(cacheConfiguration(PARTITIONED, 0, false)); + cfgs.add(cacheConfiguration(PARTITIONED, 1, false)); + cfgs.add(cacheConfiguration(PARTITIONED, 1, true)); + cfgs.add(cacheConfiguration(REPLICATED, 0, false)); + + return cfgs; + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param nearCache If {@code true} near cache is enabled. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration( + CacheMode cacheMode, + int backups, + boolean nearCache) { + CacheConfiguration ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + return ccfg; + } + + /** + * @param c Closure. + * @throws Exception If failed. + */ + private void executeTestForAllCaches(CI2> c) throws Exception { + for (CacheConfiguration ccfg : cacheConfigurations()) { + ignite(0).createCache(ccfg); + + log.info("Run test for cache [cache=" + ccfg.getCacheMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + (ccfg.getNearConfiguration() != null) + "]"); + + int srvNum = serversNumber(); + if (serversNumber() > 1) { + ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>()); + srvNum += 2; + } + + try { + for (int i = 0; i < srvNum; i++) { + Ignite ignite = ignite(i); + + log.info("Run test for node [node=" + i + ", client=" + ignite.configuration().isClientMode() + ']'); + + c.apply(ignite, ignite.cache(ccfg.getName())); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * Closure with 2 parameters that can throw any exception. + * + * @param Type of first closure parameter. + * @param Type of second closure parameter. 
+ */ + public static abstract class CI2Exc implements CI2 { + /** + * Closure body. + * + * @param e1 First closure argument. + * @param e2 Second closure argument. + * @throws Exception If failed. + */ + public abstract void applyx(E1 e1, E2 e2) throws Exception; + + /** {@inheritdoc} */ + @Override public void apply(E1 e1, E2 e2) { + try { + applyx(e1, e2); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Closure that can throw any exception. + * + * @param Type of closure parameter. + */ + public static abstract class CI1Exc implements CI1 { + /** + * Closure body. + * + * @param o Closure argument. + * @throws Exception If failed. + */ + public abstract void applyx(T o) throws Exception; + + /** {@inheritdoc} */ + @Override public void apply(T o) { + try { + applyx(o); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Runnable that can throw any exception. + */ + public static abstract class RunnableX implements Runnable { + /** + * Closure body. + * + * @throws Exception If failed. 
+ */ + public abstract void runx() throws Exception; + + /** {@inheritdoc} */ + @Override public void run() { + try { + runx(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java new file mode 100644 index 0000000..57a1470 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class IgnitePessimisticTxSuspendResumeTest extends GridCommonAbstractTest { + /** + * Creates new cache configuration. + * + * @return CacheConfiguration New cache configuration. + */ + protected CacheConfiguration getCacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + + return cacheCfg; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setClientMode(false); + cfg.setCacheConfiguration(getCacheConfiguration()); + + return cfg; + } + + /** + * Test for suspension on pessimistic transaction. + * + * @throws Exception If failed. 
+ */ + public void testSuspendPessimisticTx() throws Exception { + try (Ignite g = startGrid()) { + IgniteCache cache = jcache(); + + IgniteTransactions txs = g.transactions(); + + for (TransactionIsolation isolation : TransactionIsolation.values()) { + final Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, isolation); + + cache.put(1, "1"); + + GridTestUtils.assertThrowsWithCause(new Callable() { + @Override public Object call() throws Exception { + tx.suspend(); + + return null; + } + }, UnsupportedOperationException.class); + + tx.close(); + + assertNull(cache.get(1)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index cbcbaee..585c759 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -426,6 +426,32 @@ public final class GridTestUtils { } /** + * Checks whether closure throws exception, which is itself of a specified + * class, or has a cause of the specified class. + * + * @param call Closure. + * @param p Parameter passed to closure. + * @param cls Expected class. + * @return Thrown throwable. + */ + public static

Throwable assertThrowsWithCause(IgniteInClosure

call, P p, Class cls) { + assert call != null; + assert cls != null; + + try { + call.apply(p); + } + catch (Throwable e) { + if (!X.hasCause(e, cls)) + fail("Exception is neither of a specified class, nor has a cause of the specified class: " + cls, e); + + return e; + } + + throw new AssertionError("Exception has not been thrown."); + } + + /** * Throw assertion error with specified error message and initialized cause. * * @param msg Error message. http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java index c5673b3..f764212 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java @@ -579,6 +579,16 @@ public abstract class GridAbstractCacheStoreSelfTest rollbackAsync() throws IgniteException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index bb32d24..771e974 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -19,6 +19,9 @@ package org.apache.ignite.testsuites; import 
junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest; +import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest; /** * Test suite. @@ -33,6 +36,10 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(CachePartitionStateTest.class); + suite.addTestSuite(IgniteOptimisticTxSuspendResumeTest.class); + suite.addTestSuite(IgniteOptimisticTxSuspendResumeMultiServerTest.class); + suite.addTestSuite(IgnitePessimisticTxSuspendResumeTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java index 5047491..dd5f6b7 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java @@ -28,10 +28,11 @@ import org.apache.ignite.cache.jta.CacheTmLookup; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.lifecycle.LifecycleAware; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.transactions.TransactionState.SUSPENDED; + /** 
* Implementation of {@link CacheJtaManagerAdapter}. */ @@ -147,7 +148,7 @@ public class CacheJtaManager extends CacheJtaManagerAdapter { if (jtaTm != null) { CacheJtaResource rsrc = this.rsrc.get(); - if (rsrc == null || rsrc.isFinished()) { + if (rsrc == null || rsrc.isFinished() || rsrc.cacheTx().state() == SUSPENDED) { try { Transaction jtaTx = jtaTm.getTransaction(); http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java index c63dafa..5b1b37a 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java @@ -75,12 +75,21 @@ final class CacheJtaResource implements XAResource, Synchronization { } /** {@inheritDoc} */ - @Override public void start(Xid xid, int flags) { + @Override public void start(Xid xid, int flags) throws XAException { if (log.isDebugEnabled()) log.debug("XA resource start(...) [xid=" + xid + ", flags=<" + flags(flags) + ">]"); // Simply save global transaction id. 
this.xid = xid; + + if ((flags & TMRESUME) == TMRESUME) { + try { + cacheTx.resume(); + } + catch (IgniteCheckedException e) { + throwException("Failed to resume cache transaction: " + e.getMessage(), e); + } + } } /** @@ -132,7 +141,7 @@ final class CacheJtaResource implements XAResource, Synchronization { } /** {@inheritDoc} */ - @Override public void end(Xid xid, int flags) { + @Override public void end(Xid xid, int flags) throws XAException { assert this.xid.equals(xid); if (log.isDebugEnabled()) @@ -140,6 +149,14 @@ final class CacheJtaResource implements XAResource, Synchronization { if ((flags & TMFAIL) > 0) cacheTx.setRollbackOnly(); + else if ((flags & TMSUSPEND) == TMSUSPEND) { + try { + cacheTx.suspend(); + } + catch (IgniteCheckedException e) { + throwException("Failed to suspend cache transaction: " + e.getMessage(), e); + } + } } /** {@inheritDoc} */ @@ -299,6 +316,13 @@ final class CacheJtaResource implements XAResource, Synchronization { return state == COMMITTED || state == ROLLED_BACK; } + /** + * @return Internal tx + */ + GridNearTxLocal cacheTx() { + return cacheTx; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheJtaResource.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java new file mode 100644 index 0000000..a181068 --- /dev/null +++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import javax.cache.configuration.Factory; +import javax.transaction.Transaction; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.objectweb.jotm.Current; +import org.objectweb.jotm.Jotm; +import org.objectweb.transaction.jta.TransactionManager; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.transactions.TransactionState.ACTIVE; + +/** + * JTA Tx Manager test. + */ +public class GridJtaTransactionManagerSelfTest extends GridCommonAbstractTest { + /** Java Open Transaction Manager facade. */ + private static Jotm jotm; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName). 
+ setCacheConfiguration(defaultCacheConfiguration().setCacheMode(PARTITIONED)); + + cfg.getTransactionConfiguration().setTxManagerFactory(new Factory() { + private static final long serialVersionUID = 0L; + + @Override public TransactionManager create() { + return jotm.getTransactionManager(); + } + }); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + jotm = new Jotm(true, false); + + Current.setAppServer(false); + + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + + jotm.stop(); + } + + /** + * Test for switching tx context by JTA Manager. + * + * @throws Exception If failed. + */ + public void testJtaTxContextSwitch() throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + TransactionConfiguration cfg = grid().context().config().getTransactionConfiguration(); + + cfg.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC); + cfg.setDefaultTxIsolation(isolation); + + TransactionManager jtaTm = jotm.getTransactionManager(); + + IgniteCache cache = jcache(); + + assertNull(grid().transactions().tx()); + + jtaTm.begin(); + + Transaction tx1 = jtaTm.getTransaction(); + + cache.put(1, Integer.toString(1)); + + assertNotNull(grid().transactions().tx()); + + assertEquals(ACTIVE, grid().transactions().tx().state()); + + assertEquals(Integer.toString(1), cache.get(1)); + + jtaTm.suspend(); + + assertNull(grid().transactions().tx()); + + assertNull(cache.get(1)); + + jtaTm.begin(); + + Transaction tx2 = jtaTm.getTransaction(); + + assertNotSame(tx1, tx2); + + cache.put(2, Integer.toString(2)); + + assertNotNull(grid().transactions().tx()); + + assertEquals(ACTIVE, grid().transactions().tx().state()); + + assertEquals(Integer.toString(2), cache.get(2)); + + jtaTm.commit(); + + assertNull(grid().transactions().tx()); + + 
assertEquals(Integer.toString(2), cache.get(2)); + + jtaTm.resume(tx1); + + assertNotNull(grid().transactions().tx()); + + assertEquals(ACTIVE, grid().transactions().tx().state()); + + cache.put(3, Integer.toString(3)); + + jtaTm.commit(); + + assertEquals("1", cache.get(1)); + assertEquals("2", cache.get(2)); + assertEquals("3", cache.get(3)); + + assertNull(grid().transactions().tx()); + + cache.removeAll(); + } + } + + /** + * @throws Exception If failed. + */ + public void testJtaTxContextSwitchWithExistingTx() throws Exception { + for (TransactionIsolation isolation : TransactionIsolation.values()) { + TransactionConfiguration cfg = grid().context().config().getTransactionConfiguration(); + + cfg.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC); + cfg.setDefaultTxIsolation(isolation); + + TransactionManager jtaTm = jotm.getTransactionManager(); + + IgniteCache cache = jcache(); + + jtaTm.begin(); + + Transaction tx1 = jtaTm.getTransaction(); + + cache.put(1, Integer.toString(1)); + + assertNotNull(grid().transactions().tx()); + + assertEquals(ACTIVE, grid().transactions().tx().state()); + + assertEquals(Integer.toString(1), cache.get(1)); + + jtaTm.suspend(); + + jtaTm.begin(); + + Transaction tx2 = jtaTm.getTransaction(); + + assertNotSame(tx1, tx2); + + cache.put(2, Integer.toString(2)); + + try { + jtaTm.resume(tx1); + + fail("jtaTm.resume shouldn't success."); + } + catch (IllegalStateException ignored) { + // No-op. 
+ } + finally { + jtaTm.rollback(); //rolling back tx2 + } + + jtaTm.resume(tx1); + jtaTm.rollback(); + + cache.removeAll(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java index 677f485..3cc7935 100644 --- a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java +++ b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java @@ -21,6 +21,7 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheJndiTmFactorySelfTest; import org.apache.ignite.internal.processors.cache.GridCacheJtaConfigurationValidationSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheJtaFactoryConfigValidationSelfTest; +import org.apache.ignite.internal.processors.cache.GridJtaTransactionManagerSelfTest; import org.apache.ignite.internal.processors.cache.jta.GridPartitionedCacheJtaFactorySelfTest; import org.apache.ignite.internal.processors.cache.jta.GridPartitionedCacheJtaFactoryUseSyncSelfTest; import org.apache.ignite.internal.processors.cache.jta.GridPartitionedCacheJtaLookupClassNameSelfTest; @@ -54,6 +55,8 @@ public class IgniteJtaTestSuite extends TestSuite { suite.addTestSuite(GridCacheJtaConfigurationValidationSelfTest.class); suite.addTestSuite(GridCacheJtaFactoryConfigValidationSelfTest.class); + suite.addTestSuite(GridJtaTransactionManagerSelfTest.class); + // Factory suite.addTestSuite(CacheJndiTmFactorySelfTest.class);