Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8F7AA200C1E for ; Thu, 2 Feb 2017 09:49:05 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8E518160B65; Thu, 2 Feb 2017 08:49:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C295B160B72 for ; Thu, 2 Feb 2017 09:49:01 +0100 (CET) Received: (qmail 14629 invoked by uid 500); 2 Feb 2017 08:48:59 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 14591 invoked by uid 99); 2 Feb 2017 08:48:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Feb 2017 08:48:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 528A5DFC47; Thu, 2 Feb 2017 08:48:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 02 Feb 2017 08:48:59 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [01/24] ignite git commit: IGNITE-3430 .NET: Run Ignite transactions via standard TransactionScope API archived-at: Thu, 02 Feb 2017 08:49:05 -0000 Repository: ignite Updated Branches: refs/heads/ignite-2.0 f299d9adb -> 739c606aa IGNITE-3430 .NET: Run Ignite transactions via standard TransactionScope API This closes #1407 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f5375dee Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f5375dee Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f5375dee Branch: refs/heads/ignite-2.0 Commit: f5375deead7fd40787cfb2e7130ee7c2d58820af Parents: 9d64a28 Author: Pavel Tupitsyn Authored: Thu Jan 26 17:46:36 2017 +0300 Committer: Pavel Tupitsyn Committed: Thu Jan 26 17:46:36 2017 +0300 ---------------------------------------------------------------------- .../transactions/PlatformTransactions.java | 9 + .../Apache.Ignite.Core.Tests.csproj | 1 + .../Cache/CacheAbstractTest.cs | 21 ++ .../Cache/CacheAbstractTransactionalTest.cs | 324 +++++++++++++++++++ .../Apache.Ignite.Core.csproj | 2 + .../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 4 +- .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 90 +++++- .../Transactions/CacheTransactionManager.cs | 160 +++++++++ .../Impl/Transactions/Transaction.cs | 8 + .../Impl/Transactions/TransactionImpl.cs | 13 + .../Impl/Transactions/TransactionsImpl.cs | 11 + 11 files changed, 631 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java index 3cee2b1..21f71fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java @@ -22,6 +22,7 @@ import org.apache.ignite.IgniteTransactions; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.util.GridConcurrentFactory; @@ -75,6 +76,9 @@ public class PlatformTransactions extends PlatformAbstractTarget { public static final int OP_RESET_METRICS = 11; /** */ + public static final int OP_PREPARE = 12; + + /** */ private final IgniteTransactions txs; /** Map with currently active transactions. */ @@ -155,6 +159,11 @@ public class PlatformTransactions extends PlatformAbstractTarget { /** {@inheritDoc} */ @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { + case OP_PREPARE: + ((TransactionProxyImpl)tx(val)).tx().prepare(); + + return TRUE; + case OP_COMMIT: tx(val).commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index 08352b3..2764848 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -63,6 +63,7 @@ + http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs index 1239794..1bc0f02 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs @@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Tests.Cache using System.Text; using System.Threading; using System.Threading.Tasks; + using System.Transactions; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Expiry; @@ -2646,6 +2647,26 @@ namespace Apache.Ignite.Core.Tests.Cache Assert.AreEqual(5, cache[1]); } + /// + /// Tests that operations in TransactionScope work correctly in any cache mode (tx or non-tx). + /// + [Test] + public void TestTransactionScope() + { + var cache = Cache(); + + cache[1] = 1; + + using (var ts = new TransactionScope()) + { + cache[1] = 2; + + ts.Complete(); + } + + Assert.AreEqual(2, cache[1]); + } + private void TestKeepBinaryFlag(bool async) { var cache0 = async ? Cache().WrapAsync() : Cache(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs index 5dcc560..37a22ae 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs @@ -18,10 +18,13 @@ namespace Apache.Ignite.Core.Tests.Cache { using System; + using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; + using System.Transactions; using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Transactions; using NUnit.Framework; @@ -561,5 +564,326 @@ namespace Apache.Ignite.Core.Tests.Cache var deadlockEx = aex.InnerExceptions.OfType().First(); Assert.IsTrue(deadlockEx.Message.Trim().StartsWith("Deadlock detected:"), deadlockEx.Message); } + + /// + /// Test Ignite transaction enlistment in ambient . + /// + [Test] + public void TestTransactionScopeSingleCache() + { + var cache = Cache(); + + cache[1] = 1; + cache[2] = 2; + + // Commit. + using (var ts = new TransactionScope()) + { + cache[1] = 10; + cache[2] = 20; + + Assert.IsNotNull(cache.Ignite.GetTransactions().Tx); + + ts.Complete(); + } + + Assert.AreEqual(10, cache[1]); + Assert.AreEqual(20, cache[2]); + + // Rollback. + using (new TransactionScope()) + { + cache[1] = 100; + cache[2] = 200; + } + + Assert.AreEqual(10, cache[1]); + Assert.AreEqual(20, cache[2]); + } + + /// + /// Test Ignite transaction enlistment in ambient + /// with multiple participating caches. + /// + [Test] + public void TestTransactionScopeMultiCache() + { + var cache1 = Cache(); + + var cache2 = GetIgnite(0).GetOrCreateCache(new CacheConfiguration(cache1.Name + "_") + { + AtomicityMode = CacheAtomicityMode.Transactional + }); + + cache1[1] = 1; + cache2[1] = 2; + + // Commit. + using (var ts = new TransactionScope()) + { + cache1[1] = 10; + cache2[1] = 20; + + ts.Complete(); + } + + Assert.AreEqual(10, cache1[1]); + Assert.AreEqual(20, cache2[1]); + + // Rollback. + using (new TransactionScope()) + { + cache1[1] = 100; + cache2[1] = 200; + } + + Assert.AreEqual(10, cache1[1]); + Assert.AreEqual(20, cache2[1]); + } + + /// + /// Test Ignite transaction enlistment in ambient + /// when Ignite tx is started manually. + /// + [Test] + public void TestTransactionScopeWithManualIgniteTx() + { + var cache = Cache(); + var transactions = cache.Ignite.GetTransactions(); + + cache[1] = 1; + + // When Ignite tx is started manually, it won't be enlisted in TransactionScope. + using (var tx = transactions.TxStart()) + { + using (new TransactionScope()) + { + cache[1] = 2; + } // Revert transaction scope. + + tx.Commit(); // Commit manual tx. + } + + Assert.AreEqual(2, cache[1]); + } + + /// + /// Test Ignite transaction with option. + /// + [Test] + public void TestSuppressedTransactionScope() + { + var cache = Cache(); + + cache[1] = 1; + + using (new TransactionScope(TransactionScopeOption.Suppress)) + { + cache[1] = 2; + } + + // Even though transaction is not completed, the value is updated, because tx is suppressed. + Assert.AreEqual(2, cache[1]); + } + + /// + /// Test Ignite transaction enlistment in ambient with nested scopes. + /// + [Test] + public void TestNestedTransactionScope() + { + var cache = Cache(); + + cache[1] = 1; + + foreach (var option in new[] {TransactionScopeOption.Required, TransactionScopeOption.RequiresNew}) + { + // Commit. + using (var ts1 = new TransactionScope()) + { + using (var ts2 = new TransactionScope(option)) + { + cache[1] = 2; + ts2.Complete(); + } + + cache[1] = 3; + ts1.Complete(); + } + + Assert.AreEqual(3, cache[1]); + + // Rollback. + using (new TransactionScope()) + { + using (new TransactionScope(option)) + cache[1] = 4; + + cache[1] = 5; + } + + // In case with Required option there is a single tx + // that gets aborted, second put executes outside the tx. + Assert.AreEqual(option == TransactionScopeOption.Required ? 5 : 3, cache[1], option.ToString()); + } + } + + /// + /// Test that ambient options propagate to Ignite transaction. + /// + [Test] + public void TestTransactionScopeOptions() + { + var cache = Cache(); + var transactions = cache.Ignite.GetTransactions(); + + var modes = new[] + { + Tuple.Create(IsolationLevel.Serializable, TransactionIsolation.Serializable), + Tuple.Create(IsolationLevel.RepeatableRead, TransactionIsolation.RepeatableRead), + Tuple.Create(IsolationLevel.ReadCommitted, TransactionIsolation.ReadCommitted), + Tuple.Create(IsolationLevel.ReadUncommitted, TransactionIsolation.ReadCommitted), + Tuple.Create(IsolationLevel.Snapshot, TransactionIsolation.ReadCommitted), + Tuple.Create(IsolationLevel.Chaos, TransactionIsolation.ReadCommitted), + }; + + foreach (var mode in modes) + { + using (new TransactionScope(TransactionScopeOption.Required, new TransactionOptions + { + IsolationLevel = mode.Item1 + })) + { + cache[1] = 1; + + var tx = transactions.Tx; + Assert.AreEqual(mode.Item2, tx.Isolation); + Assert.AreEqual(transactions.DefaultTransactionConcurrency, tx.Concurrency); + } + } + } + + /// + /// Tests all transactional operations with . + /// + [Test] + public void TestTransactionScopeAllOperations() + { + CheckTxOp((cache, key) => cache.Put(key, -5)); + CheckTxOp((cache, key) => cache.PutAsync(key, -5).Wait()); + + CheckTxOp((cache, key) => cache.PutAll(new Dictionary {{key, -7}})); + CheckTxOp((cache, key) => cache.PutAllAsync(new Dictionary {{key, -7}}).Wait()); + + CheckTxOp((cache, key) => + { + cache.Remove(key); + cache.PutIfAbsent(key, -10); + }); + CheckTxOp((cache, key) => + { + cache.Remove(key); + cache.PutIfAbsentAsync(key, -10); + }); + + CheckTxOp((cache, key) => cache.GetAndPut(key, -9)); + CheckTxOp((cache, key) => cache.GetAndPutAsync(key, -9).Wait()); + + CheckTxOp((cache, key) => + { + cache.Remove(key); + cache.GetAndPutIfAbsent(key, -10); + }); + CheckTxOp((cache, key) => + { + cache.Remove(key); + cache.GetAndPutIfAbsentAsync(key, -10).Wait(); + }); + + CheckTxOp((cache, key) => cache.GetAndRemove(key)); + CheckTxOp((cache, key) => cache.GetAndRemoveAsync(key)); + + CheckTxOp((cache, key) => cache.GetAndReplace(key, -11)); + CheckTxOp((cache, key) => cache.GetAndReplaceAsync(key, -11)); + + CheckTxOp((cache, key) => cache.Invoke(key, new AddProcessor(), 1)); + CheckTxOp((cache, key) => cache.InvokeAsync(key, new AddProcessor(), 1)); + + CheckTxOp((cache, key) => cache.InvokeAll(new[] {key}, new AddProcessor(), 1)); + CheckTxOp((cache, key) => cache.InvokeAllAsync(new[] {key}, new AddProcessor(), 1)); + + CheckTxOp((cache, key) => cache.Remove(key)); + CheckTxOp((cache, key) => cache.RemoveAsync(key)); + + CheckTxOp((cache, key) => cache.RemoveAll(new[] { key })); + CheckTxOp((cache, key) => cache.RemoveAllAsync(new[] { key }).Wait()); + + CheckTxOp((cache, key) => cache.Replace(key, 100)); + CheckTxOp((cache, key) => cache.ReplaceAsync(key, 100)); + + CheckTxOp((cache, key) => cache.Replace(key, cache[key], 100)); + CheckTxOp((cache, key) => cache.ReplaceAsync(key, cache[key], 100)); + } + + /// + /// Checks that cache operation behaves transactionally. + /// + private void CheckTxOp(Action, int> act) + { + var cache = Cache(); + + cache[1] = 1; + cache[2] = 2; + + // Rollback. + using (new TransactionScope()) + { + act(cache, 1); + + Assert.IsNotNull(cache.Ignite.GetTransactions().Tx, "Transaction has not started."); + } + + Assert.AreEqual(1, cache[1]); + Assert.AreEqual(2, cache[2]); + + using (new TransactionScope()) + { + act(cache, 1); + act(cache, 2); + } + + Assert.AreEqual(1, cache[1]); + Assert.AreEqual(2, cache[2]); + + // Commit. + using (var ts = new TransactionScope()) + { + act(cache, 1); + ts.Complete(); + } + + Assert.IsTrue(!cache.ContainsKey(1) || cache[1] != 1); + Assert.AreEqual(2, cache[2]); + + using (var ts = new TransactionScope()) + { + act(cache, 1); + act(cache, 2); + ts.Complete(); + } + + Assert.IsTrue(!cache.ContainsKey(1) || cache[1] != 1); + Assert.IsTrue(!cache.ContainsKey(2) || cache[2] != 2); + } + + [Serializable] + private class AddProcessor : ICacheEntryProcessor + { + public int Process(IMutableCacheEntry entry, int arg) + { + entry.Value += arg; + return arg; + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index d58c872..673449e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -87,6 +87,7 @@ + @@ -189,6 +190,7 @@ + http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs index 2a0ec86..50938e1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs @@ -656,14 +656,14 @@ namespace Apache.Ignite.Core.Cache /// /// Removes all mappings from cache. /// If write-through is enabled, the value will be removed from store. - /// This method is transactional and will enlist the entry into ongoing transaction if there is one. + /// This method is not transactional. /// void RemoveAll(); /// /// Removes all mappings from cache. /// If write-through is enabled, the value will be removed from store. - /// This method is transactional and will enlist the entry into ongoing transaction if there is one. + /// This method is not transactional. /// Task RemoveAllAsync(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index b8dc6cb..a387e1b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Impl.Cache using System; using System.Collections; using System.Collections.Generic; + using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Apache.Ignite.Core.Binary; @@ -36,6 +37,7 @@ namespace Apache.Ignite.Core.Impl.Cache using Apache.Ignite.Core.Impl.Cache.Query.Continuous; using Apache.Ignite.Core.Impl.Cluster; using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Transactions; using Apache.Ignite.Core.Impl.Unmanaged; /// @@ -56,6 +58,9 @@ namespace Apache.Ignite.Core.Impl.Cache /** Flag: no-retries.*/ private readonly bool _flagNoRetries; + /** Transaction manager. */ + private readonly CacheTransactionManager _txManager; + /// /// Constructor. /// @@ -68,10 +73,16 @@ namespace Apache.Ignite.Core.Impl.Cache public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh, bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries) : base(target, marsh) { + Debug.Assert(grid != null); + _ignite = grid; _flagSkipStore = flagSkipStore; _flagKeepBinary = flagKeepBinary; _flagNoRetries = flagNoRetries; + + _txManager = GetConfiguration().AtomicityMode == CacheAtomicityMode.Transactional + ? new CacheTransactionManager(grid.GetTransactions()) + : null; } /** */ @@ -416,9 +427,10 @@ namespace Apache.Ignite.Core.Impl.Cache public void Put(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + DoOutOp(CacheOp.Put, key, val); } @@ -428,6 +440,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOpAsync(CacheOp.PutAsync, key, val); } @@ -437,6 +451,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutInOpNullable(CacheOp.GetAndPut, key, val); } @@ -446,6 +462,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOpAsync(CacheOp.GetAndPutAsync, w => { w.WriteObject(key); @@ -457,9 +475,10 @@ namespace Apache.Ignite.Core.Impl.Cache public CacheResult GetAndReplace(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutInOpNullable(CacheOp.GetAndReplace, key, val); } @@ -469,6 +488,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOpAsync(CacheOp.GetAndReplaceAsync, w => { w.WriteObject(key); @@ -481,6 +502,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); + StartTx(); + return DoOutInOpNullable(CacheOp.GetAndRemove, key); } @@ -489,6 +512,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); + StartTx(); + return DoOutOpAsync(CacheOp.GetAndRemoveAsync, w => w.WriteObject(key), r => GetCacheResult(r)); } @@ -496,9 +521,10 @@ namespace Apache.Ignite.Core.Impl.Cache public bool PutIfAbsent(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOp(CacheOp.PutIfAbsent, key, val); } @@ -508,6 +534,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOpAsync(CacheOp.PutIfAbsentAsync, key, val); } @@ -515,9 +543,10 @@ namespace Apache.Ignite.Core.Impl.Cache public CacheResult GetAndPutIfAbsent(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val); } @@ -527,6 +556,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOpAsync(CacheOp.GetAndPutIfAbsentAsync, w => { w.WriteObject(key); @@ -538,9 +569,10 @@ namespace Apache.Ignite.Core.Impl.Cache public bool Replace(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOp(CacheOp.Replace2, key, val); } @@ -550,6 +582,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOpAsync(CacheOp.Replace2Async, key, val); } @@ -557,11 +591,11 @@ namespace Apache.Ignite.Core.Impl.Cache public bool Replace(TK key, TV oldVal, TV newVal) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(oldVal, "oldVal"); - IgniteArgumentCheck.NotNull(newVal, "newVal"); + StartTx(); + return DoOutOp(CacheOp.Replace3, key, oldVal, newVal); } @@ -572,6 +606,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(oldVal, "oldVal"); IgniteArgumentCheck.NotNull(newVal, "newVal"); + StartTx(); + return DoOutOpAsync(CacheOp.Replace3Async, w => { w.WriteObject(key); @@ -585,6 +621,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(vals, "vals"); + StartTx(); + DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals)); } @@ -593,6 +631,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(vals, "vals"); + StartTx(); + return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals)); } @@ -669,6 +709,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); + StartTx(); + return DoOutOp(CacheOp.RemoveObj, key); } @@ -677,6 +719,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); + StartTx(); + return DoOutOpAsync(CacheOp.RemoveObjAsync, key); } @@ -686,6 +730,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOp(CacheOp.RemoveBool, key, val); } @@ -695,6 +741,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); + StartTx(); + return DoOutOpAsync(CacheOp.RemoveBoolAsync, key, val); } @@ -703,6 +751,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); + StartTx(); + DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys)); } @@ -711,18 +761,24 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); + StartTx(); + return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys)); } /** */ public void RemoveAll() { + StartTx(); + DoOutInOp((int) CacheOp.RemoveAll2); } /** */ public Task RemoveAllAsync() { + StartTx(); + return DoOutOpAsync(CacheOp.RemoveAll2Async); } @@ -773,9 +829,10 @@ namespace Apache.Ignite.Core.Impl.Cache public TRes Invoke(TK key, ICacheEntryProcessor processor, TArg arg) { IgniteArgumentCheck.NotNull(key, "key"); - IgniteArgumentCheck.NotNull(processor, "processor"); + StartTx(); + var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry)e, (TArg)a), typeof(TK), typeof(TV)); @@ -795,6 +852,8 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(processor, "processor"); + StartTx(); + var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry)e, (TArg)a), typeof(TK), typeof(TV)); @@ -822,9 +881,10 @@ namespace Apache.Ignite.Core.Impl.Cache ICacheEntryProcessor processor, TArg arg) { IgniteArgumentCheck.NotNull(keys, "keys"); - IgniteArgumentCheck.NotNull(processor, "processor"); + StartTx(); + var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry)e, (TArg)a), typeof(TK), typeof(TV)); @@ -842,9 +902,10 @@ namespace Apache.Ignite.Core.Impl.Cache ICacheEntryProcessor processor, TArg arg) { IgniteArgumentCheck.NotNull(keys, "keys"); - IgniteArgumentCheck.NotNull(processor, "processor"); + StartTx(); + var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry)e, (TArg)a), typeof(TK), typeof(TV)); @@ -1308,5 +1369,14 @@ namespace Apache.Ignite.Core.Impl.Cache { DoOutInOp((int) CacheOp.CloseLock, id); } + + /// + /// Starts a transaction when applicable. + /// + private void StartTx() + { + if (_txManager != null) + _txManager.StartTx(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs new file mode 100644 index 0000000..f5a1617 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Transactions +{ + using System; + using System.Diagnostics; + using System.Threading; + using System.Transactions; + using Apache.Ignite.Core.Transactions; + + /// + /// Cache transaction enlistment manager, + /// allows using Ignite transactions via standard . + /// + internal class CacheTransactionManager : IEnlistmentNotification + { + /** */ + private readonly ITransactions _transactions; + + /** */ + private static readonly ThreadLocal Enlistment = new ThreadLocal(); + + /// + /// Initializes a new instance of class. + /// + /// Transactions. + public CacheTransactionManager(ITransactions transactions) + { + Debug.Assert(transactions != null); + + _transactions = transactions; + } + + /// + /// If ambient transaction is present, starts an Ignite transaction and enlists it. + /// + public void StartTx() + { + if (_transactions.Tx != null) + { + // Ignite transaction is already present. + // We have either enlisted it already, or it has been started manually and should not be enlisted. + // Java enlists existing Ignite tx in this case (see CacheJtaManager.java), but we do not. + return; + } + + if (Enlistment.Value != null) + { + // We are already enlisted. + // .NET transaction mechanism allows nested transactions, + // and they can be processed differently depending on TransactionScopeOption. + // Ignite, however, allows only one active transaction per thread. + // Therefore we enlist only once on the first transaction that we encounter. + return; + } + + var ambientTx = System.Transactions.Transaction.Current; + + if (ambientTx != null && ambientTx.TransactionInformation.Status == TransactionStatus.Active) + { + _transactions.TxStart(_transactions.DefaultTransactionConcurrency, + ConvertTransactionIsolation(ambientTx.IsolationLevel)); + + Enlistment.Value = ambientTx.EnlistVolatile(this, EnlistmentOptions.None); + } + } + + /** */ + void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment) + { + var igniteTx = _transactions.Tx; + + if (igniteTx != null && Enlistment.Value != null) + { + ((Transaction) igniteTx).Prepare(); + } + + preparingEnlistment.Prepared(); + } + + /** */ + void IEnlistmentNotification.Commit(Enlistment enlistment) + { + var igniteTx = _transactions.Tx; + + if (igniteTx != null && Enlistment.Value != null) + { + Debug.Assert(ReferenceEquals(enlistment, Enlistment.Value)); + + igniteTx.Commit(); + + igniteTx.Dispose(); + + Enlistment.Value = null; + } + + enlistment.Done(); + } + + /** */ + void IEnlistmentNotification.Rollback(Enlistment enlistment) + { + var igniteTx = _transactions.Tx; + + if (igniteTx != null && Enlistment.Value != null) + { + igniteTx.Rollback(); + + igniteTx.Dispose(); + + Enlistment.Value = null; + } + + enlistment.Done(); + } + + /** */ + void IEnlistmentNotification.InDoubt(Enlistment enlistment) + { + enlistment.Done(); + } + + /// + /// Converts the isolation level from .NET-specific to Ignite-specific. + /// + private static TransactionIsolation ConvertTransactionIsolation(IsolationLevel isolation) + { + switch (isolation) + { + case IsolationLevel.Serializable: + return TransactionIsolation.Serializable; + case IsolationLevel.RepeatableRead: + return TransactionIsolation.RepeatableRead; + case IsolationLevel.ReadCommitted: + case IsolationLevel.ReadUncommitted: + case IsolationLevel.Snapshot: + case IsolationLevel.Chaos: + return TransactionIsolation.ReadCommitted; + default: + throw new ArgumentOutOfRangeException("isolation", isolation, + "Unsupported transaction isolation level: " + isolation); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs index 595300c..f700bfd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs @@ -142,5 +142,13 @@ namespace Apache.Ignite.Core.Impl.Transactions { return _tx.RemoveMeta(name); } + + /// + /// Executes prepare step of the two phase commit. + /// + public void Prepare() + { + _tx.Prepare(); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs index d32cd3d..0b04a68 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs @@ -112,6 +112,19 @@ namespace Apache.Ignite.Core.Impl.Transactions } /// + /// Executes prepare step of the two phase commit. + /// + public void Prepare() + { + lock (this) + { + ThrowIfClosed(); + + _txs.TxPrepare(this); + } + } + + /// /// Commits this tx and closes it. /// public void Commit() http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs index 5fa5db8..ff5d6a2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs @@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Transactions using System.Threading.Tasks; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Unmanaged; using Apache.Ignite.Core.Transactions; @@ -63,6 +64,8 @@ namespace Apache.Ignite.Core.Impl.Transactions /** */ private const int OpResetMetrics = 11; + /** */ + private const int OpPrepare = 12; /** */ private readonly TransactionConcurrency _dfltConcurrency; @@ -177,6 +180,14 @@ namespace Apache.Ignite.Core.Impl.Transactions } /// + /// Executes prepare step of the two phase commit. + /// + internal void TxPrepare(TransactionImpl tx) + { + DoOutInOp(OpPrepare, tx.Id); + } + + /// /// Commit transaction. /// /// Transaction.