ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [07/16] ignite git commit: IGNITE-3430 .NET: Run Ignite transactions via standard TransactionScope API
Date Mon, 30 Jan 2017 11:01:05 GMT
IGNITE-3430 .NET: Run Ignite transactions via standard TransactionScope API

This closes #1407


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f5375dee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f5375dee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f5375dee

Branch: refs/heads/ignite-4436-2
Commit: f5375deead7fd40787cfb2e7130ee7c2d58820af
Parents: 9d64a28
Author: Pavel Tupitsyn <ptupitsyn@apache.org>
Authored: Thu Jan 26 17:46:36 2017 +0300
Committer: Pavel Tupitsyn <ptupitsyn@apache.org>
Committed: Thu Jan 26 17:46:36 2017 +0300

----------------------------------------------------------------------
 .../transactions/PlatformTransactions.java      |   9 +
 .../Apache.Ignite.Core.Tests.csproj             |   1 +
 .../Cache/CacheAbstractTest.cs                  |  21 ++
 .../Cache/CacheAbstractTransactionalTest.cs     | 324 +++++++++++++++++++
 .../Apache.Ignite.Core.csproj                   |   2 +
 .../dotnet/Apache.Ignite.Core/Cache/ICache.cs   |   4 +-
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  |  90 +++++-
 .../Transactions/CacheTransactionManager.cs     | 160 +++++++++
 .../Impl/Transactions/Transaction.cs            |   8 +
 .../Impl/Transactions/TransactionImpl.cs        |  13 +
 .../Impl/Transactions/TransactionsImpl.cs       |  11 +
 11 files changed, 631 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 3cee2b1..21f71fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
@@ -75,6 +76,9 @@ public class PlatformTransactions extends PlatformAbstractTarget {
     public static final int OP_RESET_METRICS = 11;
 
     /** */
+    public static final int OP_PREPARE = 12;
+
+    /** */
     private final IgniteTransactions txs;
 
     /** Map with currently active transactions. */
@@ -155,6 +159,11 @@ public class PlatformTransactions extends PlatformAbstractTarget {
     /** {@inheritDoc} */
     @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
         switch (type) {
+            case OP_PREPARE:
+                ((TransactionProxyImpl)tx(val)).tx().prepare();
+
+                return TRUE;
+
             case OP_COMMIT:
                 tx(val).commit();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 08352b3..2764848 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -63,6 +63,7 @@
     <Reference Include="System.Core" />
     <Reference Include="System.Runtime.Serialization" />
     <Reference Include="System.ServiceProcess" />
+    <Reference Include="System.Transactions" />
     <Reference Include="System.XML" />
     <Reference Include="System.Xml.Linq" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
index 1239794..1bc0f02 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
@@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Tests.Cache
     using System.Text;
     using System.Threading;
     using System.Threading.Tasks;
+    using System.Transactions;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Expiry;
@@ -2646,6 +2647,26 @@ namespace Apache.Ignite.Core.Tests.Cache
             Assert.AreEqual(5, cache[1]);
         }
 
+        /// <summary>
+        /// Tests that operations in TransactionScope work correctly in any cache mode (tx or non-tx).
+        /// </summary>
+        [Test]
+        public void TestTransactionScope()
+        {
+            var cache = Cache();
+
+            cache[1] = 1;
+
+            using (var ts = new TransactionScope())
+            {
+                cache[1] = 2;
+
+                ts.Complete();
+            }
+
+            Assert.AreEqual(2, cache[1]);
+        }
+
         private void TestKeepBinaryFlag(bool async)
         {
             var cache0 = async ? Cache().WrapAsync() : Cache();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
index 5dcc560..37a22ae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
@@ -18,10 +18,13 @@
 namespace Apache.Ignite.Core.Tests.Cache
 {
     using System;
+    using System.Collections.Generic;
     using System.Linq;
     using System.Threading;
     using System.Threading.Tasks;
+    using System.Transactions;
     using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Configuration;
     using Apache.Ignite.Core.Transactions;
     using NUnit.Framework;
 
@@ -561,5 +564,326 @@ namespace Apache.Ignite.Core.Tests.Cache
             var deadlockEx = aex.InnerExceptions.OfType<TransactionDeadlockException>().First();
             Assert.IsTrue(deadlockEx.Message.Trim().StartsWith("Deadlock detected:"), deadlockEx.Message);
         }
+
+        /// <summary>
+        /// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/>.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeSingleCache()
+        {
+            var cache = Cache();
+
+            cache[1] = 1;
+            cache[2] = 2;
+
+            // Commit.
+            using (var ts = new TransactionScope())
+            {
+                cache[1] = 10;
+                cache[2] = 20;
+
+                Assert.IsNotNull(cache.Ignite.GetTransactions().Tx);
+
+                ts.Complete();
+            }
+
+            Assert.AreEqual(10, cache[1]);
+            Assert.AreEqual(20, cache[2]);
+
+            // Rollback.
+            using (new TransactionScope())
+            {
+                cache[1] = 100;
+                cache[2] = 200;
+            }
+
+            Assert.AreEqual(10, cache[1]);
+            Assert.AreEqual(20, cache[2]);
+        }
+
+        /// <summary>
+        /// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/>
+        /// with multiple participating caches.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeMultiCache()
+        {
+            var cache1 = Cache();
+
+            var cache2 = GetIgnite(0).GetOrCreateCache<int, int>(new CacheConfiguration(cache1.Name + "_")
+            {
+                AtomicityMode = CacheAtomicityMode.Transactional
+            });
+
+            cache1[1] = 1;
+            cache2[1] = 2;
+
+            // Commit.
+            using (var ts = new TransactionScope())
+            {
+                cache1[1] = 10;
+                cache2[1] = 20;
+
+                ts.Complete();
+            }
+
+            Assert.AreEqual(10, cache1[1]);
+            Assert.AreEqual(20, cache2[1]);
+
+            // Rollback.
+            using (new TransactionScope())
+            {
+                cache1[1] = 100;
+                cache2[1] = 200;
+            }
+
+            Assert.AreEqual(10, cache1[1]);
+            Assert.AreEqual(20, cache2[1]);
+        }
+
+        /// <summary>
+        /// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/>
+        /// when Ignite tx is started manually.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeWithManualIgniteTx()
+        {
+            var cache = Cache();
+            var transactions = cache.Ignite.GetTransactions();
+
+            cache[1] = 1;
+
+            // When Ignite tx is started manually, it won't be enlisted in TransactionScope.
+            using (var tx = transactions.TxStart())            
+            {
+                using (new TransactionScope())
+                {
+                    cache[1] = 2;
+                }  // Revert transaction scope.
+
+                tx.Commit();  // Commit manual tx.
+            }
+
+            Assert.AreEqual(2, cache[1]);
+        }
+
+        /// <summary>
+        /// Test Ignite transaction with <see cref="TransactionScopeOption.Suppress"/> option.
+        /// </summary>
+        [Test]
+        public void TestSuppressedTransactionScope()
+        {
+            var cache = Cache();
+
+            cache[1] = 1;
+
+            using (new TransactionScope(TransactionScopeOption.Suppress))
+            {
+                cache[1] = 2;
+            }
+
+            // Even though transaction is not completed, the value is updated, because tx is suppressed.
+            Assert.AreEqual(2, cache[1]);
+        }
+
+        /// <summary>
+        /// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/> with nested scopes.
+        /// </summary>
+        [Test]
+        public void TestNestedTransactionScope()
+        {
+            var cache = Cache();
+
+            cache[1] = 1;
+
+            foreach (var option in new[] {TransactionScopeOption.Required, TransactionScopeOption.RequiresNew})
+            {
+                // Commit.
+                using (var ts1 = new TransactionScope())
+                {
+                    using (var ts2 = new TransactionScope(option))
+                    {
+                        cache[1] = 2;
+                        ts2.Complete();
+                    }
+
+                    cache[1] = 3;
+                    ts1.Complete();
+                }
+
+                Assert.AreEqual(3, cache[1]);
+
+                // Rollback.
+                using (new TransactionScope())
+                {
+                    using (new TransactionScope(option))
+                        cache[1] = 4;
+
+                    cache[1] = 5;
+                }
+                
+                // In case with Required option there is a single tx
+                // that gets aborted, second put executes outside the tx.
+                Assert.AreEqual(option == TransactionScopeOption.Required ? 5 : 3, cache[1], option.ToString());
+            }
+        }
+
+        /// <summary>
+        /// Test that ambient <see cref="TransactionScope"/> options propagate to Ignite transaction.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeOptions()
+        {
+            var cache = Cache();
+            var transactions = cache.Ignite.GetTransactions();
+
+            var modes = new[]
+            {
+                Tuple.Create(IsolationLevel.Serializable, TransactionIsolation.Serializable),
+                Tuple.Create(IsolationLevel.RepeatableRead, TransactionIsolation.RepeatableRead),
+                Tuple.Create(IsolationLevel.ReadCommitted, TransactionIsolation.ReadCommitted),
+                Tuple.Create(IsolationLevel.ReadUncommitted, TransactionIsolation.ReadCommitted),
+                Tuple.Create(IsolationLevel.Snapshot, TransactionIsolation.ReadCommitted),
+                Tuple.Create(IsolationLevel.Chaos, TransactionIsolation.ReadCommitted),
+            };
+
+            foreach (var mode in modes)
+            {
+                using (new TransactionScope(TransactionScopeOption.Required, new TransactionOptions
+                {
+                    IsolationLevel = mode.Item1
+                }))
+                {
+                    cache[1] = 1;
+
+                    var tx = transactions.Tx;
+                    Assert.AreEqual(mode.Item2, tx.Isolation);
+                    Assert.AreEqual(transactions.DefaultTransactionConcurrency, tx.Concurrency);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests all transactional operations with <see cref="TransactionScope"/>.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeAllOperations()
+        {
+            CheckTxOp((cache, key) => cache.Put(key, -5));
+            CheckTxOp((cache, key) => cache.PutAsync(key, -5).Wait());
+
+            CheckTxOp((cache, key) => cache.PutAll(new Dictionary<int, int> {{key, -7}}));
+            CheckTxOp((cache, key) => cache.PutAllAsync(new Dictionary<int, int> {{key, -7}}).Wait());
+
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.PutIfAbsent(key, -10);
+            });
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.PutIfAbsentAsync(key, -10);
+            });
+
+            CheckTxOp((cache, key) => cache.GetAndPut(key, -9));
+            CheckTxOp((cache, key) => cache.GetAndPutAsync(key, -9).Wait());
+
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.GetAndPutIfAbsent(key, -10);
+            });
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.GetAndPutIfAbsentAsync(key, -10).Wait();
+            });
+
+            CheckTxOp((cache, key) => cache.GetAndRemove(key));
+            CheckTxOp((cache, key) => cache.GetAndRemoveAsync(key));
+
+            CheckTxOp((cache, key) => cache.GetAndReplace(key, -11));
+            CheckTxOp((cache, key) => cache.GetAndReplaceAsync(key, -11));
+
+            CheckTxOp((cache, key) => cache.Invoke(key, new AddProcessor(), 1));
+            CheckTxOp((cache, key) => cache.InvokeAsync(key, new AddProcessor(), 1));
+
+            CheckTxOp((cache, key) => cache.InvokeAll(new[] {key}, new AddProcessor(), 1));
+            CheckTxOp((cache, key) => cache.InvokeAllAsync(new[] {key}, new AddProcessor(), 1));
+
+            CheckTxOp((cache, key) => cache.Remove(key));
+            CheckTxOp((cache, key) => cache.RemoveAsync(key));
+
+            CheckTxOp((cache, key) => cache.RemoveAll(new[] { key }));
+            CheckTxOp((cache, key) => cache.RemoveAllAsync(new[] { key }).Wait());
+
+            CheckTxOp((cache, key) => cache.Replace(key, 100));
+            CheckTxOp((cache, key) => cache.ReplaceAsync(key, 100));
+
+            CheckTxOp((cache, key) => cache.Replace(key, cache[key], 100));
+            CheckTxOp((cache, key) => cache.ReplaceAsync(key, cache[key], 100));
+        }
+
+        /// <summary>
+        /// Checks that cache operation behaves transactionally.
+        /// </summary>
+        private void CheckTxOp(Action<ICache<int, int>, int> act)
+        {
+            var cache = Cache();
+
+            cache[1] = 1;
+            cache[2] = 2;
+
+            // Rollback.
+            using (new TransactionScope())
+            {
+                act(cache, 1);
+
+                Assert.IsNotNull(cache.Ignite.GetTransactions().Tx, "Transaction has not started.");
+            }
+
+            Assert.AreEqual(1, cache[1]);
+            Assert.AreEqual(2, cache[2]);
+
+            using (new TransactionScope())
+            {
+                act(cache, 1);
+                act(cache, 2);
+            }
+
+            Assert.AreEqual(1, cache[1]);
+            Assert.AreEqual(2, cache[2]);
+
+            // Commit.
+            using (var ts = new TransactionScope())
+            {
+                act(cache, 1);
+                ts.Complete();
+            }
+
+            Assert.IsTrue(!cache.ContainsKey(1) || cache[1] != 1);
+            Assert.AreEqual(2, cache[2]);
+
+            using (var ts = new TransactionScope())
+            {
+                act(cache, 1);
+                act(cache, 2);
+                ts.Complete();
+            }
+
+            Assert.IsTrue(!cache.ContainsKey(1) || cache[1] != 1);
+            Assert.IsTrue(!cache.ContainsKey(2) || cache[2] != 2);
+        }
+
+        [Serializable]
+        private class AddProcessor : ICacheEntryProcessor<int, int, int, int>
+        {
+            public int Process(IMutableCacheEntry<int, int> entry, int arg)
+            {
+                entry.Value += arg;
+                return arg;
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index d58c872..673449e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -87,6 +87,7 @@
     <Reference Include="System" />
     <Reference Include="System.configuration" />
     <Reference Include="System.Core" />
+    <Reference Include="System.Transactions" />
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
@@ -189,6 +190,7 @@
     <Compile Include="Impl\Binary\SerializableSerializer.cs" />
     <Compile Include="Impl\Binary\BinaryWriterExtensions.cs" />
     <Compile Include="Impl\Cache\Affinity\AffinityFunctionBase.cs" />
+    <Compile Include="Impl\Transactions\CacheTransactionManager.cs" />
     <Compile Include="Impl\Cache\Expiry\ExpiryPolicyFactory.cs" />
     <Compile Include="Impl\Cache\Expiry\ExpiryPolicySerializer.cs" />
     <Compile Include="Impl\Cache\ICacheLockInternal.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
index 2a0ec86..50938e1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
@@ -656,14 +656,14 @@ namespace Apache.Ignite.Core.Cache
         /// <summary>
         /// Removes all mappings from cache.
         /// If write-through is enabled, the value will be removed from store.
-        /// This method is transactional and will enlist the entry into ongoing transaction if there is one.
+        /// This method is not transactional.
         /// </summary>
         void RemoveAll();
 
         /// <summary>
         /// Removes all mappings from cache.
         /// If write-through is enabled, the value will be removed from store.
-        /// This method is transactional and will enlist the entry into ongoing transaction if there is one.
+        /// This method is not transactional.
         /// </summary>
         Task RemoveAllAsync();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index b8dc6cb..a387e1b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Impl.Cache
     using System;
     using System.Collections;
     using System.Collections.Generic;
+    using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
@@ -36,6 +37,7 @@ namespace Apache.Ignite.Core.Impl.Cache
     using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
     using Apache.Ignite.Core.Impl.Cluster;
     using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Transactions;
     using Apache.Ignite.Core.Impl.Unmanaged;
 
     /// <summary>
@@ -56,6 +58,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** Flag: no-retries.*/
         private readonly bool _flagNoRetries;
 
+        /** Transaction manager. */
+        private readonly CacheTransactionManager _txManager;
+
         /// <summary>
         /// Constructor.
         /// </summary>
@@ -68,10 +73,16 @@ namespace Apache.Ignite.Core.Impl.Cache
         public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh,
             bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries) : base(target, marsh)
         {
+            Debug.Assert(grid != null);
+
             _ignite = grid;
             _flagSkipStore = flagSkipStore;
             _flagKeepBinary = flagKeepBinary;
             _flagNoRetries = flagNoRetries;
+
+            _txManager = GetConfiguration().AtomicityMode == CacheAtomicityMode.Transactional
+                ? new CacheTransactionManager(grid.GetTransactions())
+                : null;
         }
 
         /** <inheritDoc /> */
@@ -416,9 +427,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         public void Put(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             DoOutOp(CacheOp.Put, key, val);
         }
 
@@ -428,6 +440,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOpAsync(CacheOp.PutAsync, key, val);
         }
 
@@ -437,6 +451,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutInOpNullable(CacheOp.GetAndPut, key, val);
         }
 
@@ -446,6 +462,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOpAsync(CacheOp.GetAndPutAsync, w =>
             {
                 w.WriteObject(key);
@@ -457,9 +475,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         public CacheResult<TV> GetAndReplace(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutInOpNullable(CacheOp.GetAndReplace, key, val);
         }
 
@@ -469,6 +488,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOpAsync(CacheOp.GetAndReplaceAsync, w =>
             {
                 w.WriteObject(key);
@@ -481,6 +502,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
+            StartTx();
+
             return DoOutInOpNullable(CacheOp.GetAndRemove, key);
         }
 
@@ -489,6 +512,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
+            StartTx();
+
             return DoOutOpAsync(CacheOp.GetAndRemoveAsync, w => w.WriteObject(key), r => GetCacheResult(r));
         }
 
@@ -496,9 +521,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         public bool PutIfAbsent(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOp(CacheOp.PutIfAbsent, key, val);
         }
 
@@ -508,6 +534,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOpAsync<TK, TV, bool>(CacheOp.PutIfAbsentAsync, key, val);
         }
 
@@ -515,9 +543,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         public CacheResult<TV> GetAndPutIfAbsent(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val);
         }
 
@@ -527,6 +556,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOpAsync(CacheOp.GetAndPutIfAbsentAsync, w =>
             {
                 w.WriteObject(key);
@@ -538,9 +569,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         public bool Replace(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOp(CacheOp.Replace2, key, val);
         }
 
@@ -550,6 +582,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOpAsync<TK, TV, bool>(CacheOp.Replace2Async, key, val);
         }
 
@@ -557,11 +591,11 @@ namespace Apache.Ignite.Core.Impl.Cache
         public bool Replace(TK key, TV oldVal, TV newVal)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(oldVal, "oldVal");
-
             IgniteArgumentCheck.NotNull(newVal, "newVal");
 
+            StartTx();
+
             return DoOutOp(CacheOp.Replace3, key, oldVal, newVal);
         }
 
@@ -572,6 +606,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(oldVal, "oldVal");
             IgniteArgumentCheck.NotNull(newVal, "newVal");
 
+            StartTx();
+
             return DoOutOpAsync<bool>(CacheOp.Replace3Async, w =>
             {
                 w.WriteObject(key);
@@ -585,6 +621,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(vals, "vals");
 
+            StartTx();
+
             DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals));
         }
 
@@ -593,6 +631,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(vals, "vals");
 
+            StartTx();
+
             return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals));
         }
 
@@ -669,6 +709,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
+            StartTx();
+
             return DoOutOp(CacheOp.RemoveObj, key);
         }
 
@@ -677,6 +719,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
+            StartTx();
+
             return DoOutOpAsync<TK, bool>(CacheOp.RemoveObjAsync, key);
         }
 
@@ -686,6 +730,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOp(CacheOp.RemoveBool, key, val);
         }
 
@@ -695,6 +741,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
+            StartTx();
+
             return DoOutOpAsync<TK, TV, bool>(CacheOp.RemoveBoolAsync, key, val);
         }
 
@@ -703,6 +751,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
 
+            StartTx();
+
             DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys));
         }
 
@@ -711,18 +761,24 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
 
+            StartTx();
+
             return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys));
         }
 
         /** <inheritDoc /> */
         public void RemoveAll()
         {
+            StartTx();
+
             DoOutInOp((int) CacheOp.RemoveAll2);
         }
 
         /** <inheritDoc /> */
         public Task RemoveAllAsync()
         {
+            StartTx();
+
             return DoOutOpAsync(CacheOp.RemoveAll2Async);
         }
 
@@ -773,9 +829,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         public TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(processor, "processor");
 
+            StartTx();
+
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
@@ -795,6 +852,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(processor, "processor");
 
+            StartTx();
+
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
@@ -822,9 +881,10 @@ namespace Apache.Ignite.Core.Impl.Cache
             ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
-
             IgniteArgumentCheck.NotNull(processor, "processor");
 
+            StartTx();
+
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
@@ -842,9 +902,10 @@ namespace Apache.Ignite.Core.Impl.Cache
             ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
-
             IgniteArgumentCheck.NotNull(processor, "processor");
 
+            StartTx();
+
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
@@ -1308,5 +1369,14 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             DoOutInOp((int) CacheOp.CloseLock, id);
         }
+
+        /// <summary>
+        /// Asks the transaction manager to start a transaction; does nothing when no manager is set.
+        /// </summary>
+        private void StartTx()
+        {
+            if (_txManager == null)
+            {
+                return;
+            }
+
+            _txManager.StartTx();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
new file mode 100644
index 0000000..f5a1617
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Transactions
+{
+    using System;
+    using System.Diagnostics;
+    using System.Threading;
+    using System.Transactions;
+    using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Cache transaction enlistment manager,
+    /// allows using Ignite transactions via standard <see cref="TransactionScope"/>.
+    /// <para />
+    /// The active enlistment is kept in a static <see cref="ThreadLocal{T}"/>, so it is tracked
+    /// per thread and shared by every manager instance used on that thread.
+    /// </summary>
+    internal class CacheTransactionManager : IEnlistmentNotification
+    {
+        /** Transactions facade used to look up and start the Ignite transaction. */
+        private readonly ITransactions _transactions;
+
+        /** Enlistment created by the current thread; null when the thread is not enlisted. */
+        private static readonly ThreadLocal<Enlistment> Enlistment = new ThreadLocal<Enlistment>();
+
+        /// <summary>
+        /// Initializes a new instance of <see cref="CacheTransactionManager"/> class.
+        /// </summary>
+        /// <param name="transactions">Transactions.</param>
+        public CacheTransactionManager(ITransactions transactions)
+        {
+            Debug.Assert(transactions != null);
+
+            _transactions = transactions;
+        }
+
+        /// <summary>
+        /// If ambient transaction is present, starts an Ignite transaction and enlists it.
+        /// Does nothing when an Ignite transaction already exists, when this thread is already
+        /// enlisted, or when there is no active ambient transaction.
+        /// </summary>
+        public void StartTx()
+        {
+            if (_transactions.Tx != null)
+            {
+                // Ignite transaction is already present.
+                // We have either enlisted it already, or it has been started manually
+                // and should not be enlisted.
+                // Java enlists existing Ignite tx in this case (see CacheJtaManager.java),
+                // but we do not.
+                return;
+            }
+
+            if (Enlistment.Value != null)
+            {
+                // We are already enlisted.
+                // .NET transaction mechanism allows nested transactions,
+                // and they can be processed differently depending on TransactionScopeOption.
+                // Ignite, however, allows only one active transaction per thread.
+                // Therefore we enlist only once on the first transaction that we encounter.
+                return;
+            }
+
+            var ambientTx = System.Transactions.Transaction.Current;
+
+            if (ambientTx == null || ambientTx.TransactionInformation.Status != TransactionStatus.Active)
+            {
+                return;
+            }
+
+            var igniteTx = _transactions.TxStart(_transactions.DefaultTransactionConcurrency,
+                ConvertTransactionIsolation(ambientTx.IsolationLevel));
+
+            try
+            {
+                Enlistment.Value = ambientTx.EnlistVolatile(this, EnlistmentOptions.None);
+            }
+            catch
+            {
+                // Enlistment failed, so no notification will ever complete the Ignite tx
+                // that was just started. Close it here instead of leaving it on the thread.
+                igniteTx.Dispose();
+
+                throw;
+            }
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)
+        {
+            var igniteTx = _transactions.Tx;
+
+            if (igniteTx != null && Enlistment.Value != null)
+            {
+                // NOTE(review): an exception thrown here leaves this method before Prepared()
+                // is called; ForceRollback is not used. Confirm this is the intended contract.
+                ((Transaction) igniteTx).Prepare();
+            }
+
+            preparingEnlistment.Prepared();
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.Commit(Enlistment enlistment)
+        {
+            var igniteTx = _transactions.Tx;
+
+            if (igniteTx != null && Enlistment.Value != null)
+            {
+                Debug.Assert(ReferenceEquals(enlistment, Enlistment.Value));
+
+                try
+                {
+                    igniteTx.Commit();
+                }
+                finally
+                {
+                    // Release even when Commit throws, otherwise the stale thread-local
+                    // enlistment makes StartTx skip every later scope on this thread.
+                    ReleaseTx(igniteTx);
+                }
+            }
+
+            enlistment.Done();
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.Rollback(Enlistment enlistment)
+        {
+            var igniteTx = _transactions.Tx;
+
+            if (igniteTx != null && Enlistment.Value != null)
+            {
+                try
+                {
+                    igniteTx.Rollback();
+                }
+                finally
+                {
+                    // Same reasoning as in Commit: never leave the thread marked as enlisted.
+                    ReleaseTx(igniteTx);
+                }
+            }
+
+            enlistment.Done();
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.InDoubt(Enlistment enlistment)
+        {
+            // NOTE(review): only acknowledges the notification. The Ignite tx and the
+            // thread-local enlistment are not touched here; confirm that is intended.
+            enlistment.Done();
+        }
+
+        /// <summary>
+        /// Disposes the Ignite transaction and clears the enlistment of the current thread.
+        /// The enlistment is cleared even if disposal throws.
+        /// </summary>
+        /// <param name="igniteTx">Ignite transaction to dispose.</param>
+        private static void ReleaseTx(ITransaction igniteTx)
+        {
+            try
+            {
+                igniteTx.Dispose();
+            }
+            finally
+            {
+                Enlistment.Value = null;
+            }
+        }
+
+        /// <summary>
+        /// Converts the isolation level from .NET-specific to Ignite-specific.
+        /// </summary>
+        /// <param name="isolation">.NET isolation level of the ambient transaction.</param>
+        /// <returns>
+        /// Serializable and RepeatableRead map to the Ignite level of the same name;
+        /// ReadCommitted, ReadUncommitted, Snapshot and Chaos all map to ReadCommitted.
+        /// </returns>
+        /// <exception cref="ArgumentOutOfRangeException">
+        /// The value is not one of the levels listed above.
+        /// </exception>
+        private static TransactionIsolation ConvertTransactionIsolation(IsolationLevel isolation)
+        {
+            switch (isolation)
+            {
+                case IsolationLevel.Serializable:
+                    return TransactionIsolation.Serializable;
+                case IsolationLevel.RepeatableRead:
+                    return TransactionIsolation.RepeatableRead;
+                case IsolationLevel.ReadCommitted:
+                case IsolationLevel.ReadUncommitted:
+                case IsolationLevel.Snapshot:
+                case IsolationLevel.Chaos:
+                    return TransactionIsolation.ReadCommitted;
+                default:
+                    throw new ArgumentOutOfRangeException("isolation", isolation,
+                        "Unsupported transaction isolation level: " + isolation);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
index 595300c..f700bfd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
@@ -142,5 +142,13 @@ namespace Apache.Ignite.Core.Impl.Transactions
         {
             return _tx.RemoveMeta<TV>(name);
         }
+
+        /// <summary>
+        /// Executes prepare step of the two phase commit by forwarding the call to the wrapped <c>_tx</c>.
+        /// </summary>
+        public void Prepare()
+        {
+            _tx.Prepare();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
index d32cd3d..0b04a68 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
@@ -112,6 +112,19 @@ namespace Apache.Ignite.Core.Impl.Transactions
         }
 
         /// <summary>
+        /// Executes prepare step of the two phase commit: runs <c>ThrowIfClosed</c>, then <c>_txs.TxPrepare</c>.
+        /// </summary>
+        public void Prepare()
+        {
+            lock (this) // The closed-check and the prepare call run under this instance's monitor.
+            {
+                ThrowIfClosed();
+
+                _txs.TxPrepare(this);
+            }
+        }
+
+        /// <summary>
         /// Commits this tx and closes it.
         /// </summary>
         public void Commit()

http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
index 5fa5db8..ff5d6a2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Transactions
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Unmanaged;
     using Apache.Ignite.Core.Transactions;
 
@@ -63,6 +64,8 @@ namespace Apache.Ignite.Core.Impl.Transactions
         /** */
         private const int OpResetMetrics = 11;
 
+        /** */
+        private const int OpPrepare = 12;
 
         /** */
         private readonly TransactionConcurrency _dfltConcurrency;
@@ -177,6 +180,14 @@ namespace Apache.Ignite.Core.Impl.Transactions
         }
 
         /// <summary>
+        /// Executes prepare step of the two phase commit: calls <c>DoOutInOp</c> with <c>OpPrepare</c> and the id of <paramref name="tx"/>.
+        /// </summary>
+        internal void TxPrepare(TransactionImpl tx)
+        {
+            DoOutInOp(OpPrepare, tx.Id);
+        }
+
+        /// <summary>
         /// Commit transaction.
         /// </summary>
         /// <param name="tx">Transaction.</param>


Mime
View raw message