ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [14/16] ignite git commit: Internal cache API cleanup.
Date Fri, 17 Mar 2017 14:50:11 GMT
Internal cache API cleanup.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/decb0c7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/decb0c7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/decb0c7a

Branch: refs/heads/ignite-4565-ddl
Commit: decb0c7aa62f9354b25ee0a09ca19b424a688e8b
Parents: 82f016f
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Mar 16 18:25:36 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Mar 16 18:25:36 2017 +0300

----------------------------------------------------------------------
 .../ClientAbstractMultiNodeSelfTest.java        |   13 +-
 .../ignite/internal/IgniteTransactionsEx.java   |    8 +-
 .../processors/cache/GridCacheAdapter.java      |   98 +-
 .../processors/cache/GridCacheProxyImpl.java    |    6 +-
 .../cache/GridCacheSharedContext.java           |   11 +-
 .../processors/cache/GridCacheUtils.java        |    6 +-
 .../processors/cache/IgniteInternalCache.java   |    5 +-
 .../distributed/GridCacheCommittedTxInfo.java   |  117 -
 .../GridDistributedCacheAdapter.java            |    2 +-
 .../GridDistributedTxRemoteAdapter.java         |   59 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   57 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  126 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   28 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   65 +-
 .../dht/colocated/GridDhtColocatedCache.java    |    8 +-
 .../colocated/GridDhtColocatedLockFuture.java   |    7 +-
 .../distributed/near/GridNearLockFuture.java    |    4 +-
 .../distributed/near/GridNearLockRequest.java   |  200 +-
 .../near/GridNearTransactionalCache.java        |    6 +-
 .../near/GridNearTxFinishFuture.java            |    4 +-
 .../cache/distributed/near/GridNearTxLocal.java | 2732 ++++++++++++++++-
 .../near/GridNearTxPrepareFutureAdapter.java    |    5 +-
 .../near/GridNearTxPrepareRequest.java          |    2 +-
 .../distributed/near/GridNearTxRemote.java      |    4 +-
 .../store/GridCacheStoreManagerAdapter.java     |  142 +-
 .../cache/transactions/IgniteInternalTx.java    |   80 +-
 .../transactions/IgniteTransactionsImpl.java    |   12 +-
 .../cache/transactions/IgniteTxAdapter.java     |  165 +-
 .../cache/transactions/IgniteTxEntry.java       |    4 +-
 .../cache/transactions/IgniteTxHandler.java     |   67 +-
 .../IgniteTxImplicitSingleStateImpl.java        |    4 +-
 .../transactions/IgniteTxLocalAdapter.java      | 2801 ++----------------
 .../cache/transactions/IgniteTxLocalEx.java     |  145 +-
 .../cache/transactions/IgniteTxManager.java     |  293 +-
 .../cache/transactions/IgniteTxRemoteEx.java    |   11 +
 .../IgniteTxRemoteStateAdapter.java             |    2 +-
 .../cache/transactions/IgniteTxState.java       |    2 +-
 .../cache/transactions/IgniteTxStateImpl.java   |    4 +-
 .../transactions/TransactionProxyImpl.java      |   13 +-
 .../datastructures/DataStructuresProcessor.java |   32 +-
 .../datastructures/GridCacheAtomicLongImpl.java |   18 +-
 .../GridCacheAtomicReferenceImpl.java           |    6 +-
 .../GridCacheAtomicSequenceImpl.java            |    4 +-
 .../GridCacheAtomicStampedImpl.java             |    6 +-
 .../GridCacheCountDownLatchImpl.java            |    6 +-
 .../datastructures/GridCacheLockImpl.java       |   11 +-
 .../datastructures/GridCacheSemaphoreImpl.java  |   18 +-
 .../GridTransactionalCacheQueueImpl.java        |   10 +-
 .../processors/igfs/IgfsDataManager.java        |   55 +-
 .../processors/igfs/IgfsMetaManager.java        |   73 +-
 .../service/GridServiceProcessor.java           |    6 +-
 .../internal/TestRecordingCommunicationSpi.java |   29 +
 .../processors/cache/GridCacheTestStore.java    |    2 -
 .../cache/IgniteTxConfigCacheSelfTest.java      |    4 +-
 .../IgniteCacheSystemTransactionsSelfTest.java  |    7 +-
 .../IgniteTxCachePrimarySyncTest.java           |    5 +
 ...xOriginatingNodeFailureAbstractSelfTest.java |    6 +-
 ...cOriginatingNodeFailureAbstractSelfTest.java |    4 +-
 ...ePrimaryNodeFailureRecoveryAbstractTest.java |    9 +-
 .../dht/IgniteCacheTxRecoveryRollbackTest.java  |  501 ++++
 .../GridCachePartitionedTxSalvageSelfTest.java  |    7 +-
 .../TxOptimisticDeadlockDetectionTest.java      |   30 +-
 ...lockMessageSystemPoolStarvationSelfTest.java |    6 +-
 .../IgniteCacheRestartTestSuite2.java           |    3 +-
 .../IgniteCacheTxRecoverySelfTestSuite.java     |    3 +
 .../HibernateReadWriteAccessStrategy.java       |   12 +-
 .../processors/cache/jta/CacheJtaManager.java   |    3 +-
 .../processors/cache/jta/CacheJtaResource.java  |    8 +-
 68 files changed, 4148 insertions(+), 4054 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
index 7fb2385..2fba49a 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
@@ -480,11 +481,11 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract
     @SuppressWarnings("unchecked")
     private static class TestCommunicationSpi extends TcpCommunicationSpi {
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             checkSyncFlags((GridIoMessage)msg);
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**
@@ -512,13 +513,13 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract
             IgniteInternalTx t = tm.tx(v);
 
             if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x1"))))
-                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x2"))))
-                assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x3"))))
-                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode());
             else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x4"))))
-                assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
+                assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
index 9772dcc..4133ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 
@@ -35,7 +35,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions {
      * @param txSize Number of entries participating in transaction (may be approximate).
      * @return New transaction.
      */
-    public IgniteInternalTx txStartEx(GridCacheContext ctx,
+    public GridNearTxLocal txStartEx(GridCacheContext ctx,
         TransactionConcurrency concurrency,
         TransactionIsolation isolation,
         long timeout,
@@ -47,5 +47,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions {
      * @param isolation Isolation.
      * @return New transaction.
      */
-    public IgniteInternalTx txStartEx(GridCacheContext ctx, TransactionConcurrency concurrency, TransactionIsolation isolation);
+    public GridNearTxLocal txStartEx(GridCacheContext ctx,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 71be718..3bfd1f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -86,18 +86,16 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
 import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
@@ -1876,7 +1874,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
 
-        IgniteTxLocalAdapter tx = null;
+        GridNearTxLocal tx = null;
 
         if (checkTx) {
             try {
@@ -2132,7 +2130,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         }
         else {
             return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
-                @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                @Override public IgniteInternalFuture<Map<K1, V1>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                     return tx.getAllAsync(ctx,
                         readyTopVer,
                         keys,
@@ -2187,7 +2185,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     protected V getAndPut0(final K key, final V val, @Nullable final CacheEntryPredicate filter)
         throws IgniteCheckedException {
         return syncOp(new SyncOp<V>(true) {
-            @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+            @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException {
                 return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value();
             }
 
@@ -2237,7 +2235,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         @Nullable final CacheEntryPredicate filter)
     {
         return asyncOp(new AsyncOp<V>() {
-            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 return tx.putAsync(ctx, readyTopVer, key, val, true, filter)
                     .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
@@ -2293,7 +2291,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     protected boolean put0(final K key, final V val, final CacheEntryPredicate filter)
         throws IgniteCheckedException {
         Boolean res = syncOp(new SyncOp<Boolean>(true) {
-            @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+            @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException {
                 return tx.putAsync(ctx, null, key, val, false, filter).get().success();
             }
 
@@ -2316,7 +2314,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
         syncOp(new SyncInOp(drMap.size() == 1) {
-            @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+            @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
                 tx.putAllDrAsync(ctx, drMap).get();
             }
 
@@ -2335,7 +2333,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
         return asyncOp(new AsyncOp(drMap.keySet()) {
-            @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 return tx.putAllDrAsync(ctx, drMap);
             }
 
@@ -2380,7 +2378,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
-            @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
+            @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
                 throws IgniteCheckedException {
                 assert topVer == null || tx.implicit();
 
@@ -2418,7 +2416,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKeys(keys);
 
         return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
-            @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+            @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
                 throws IgniteCheckedException {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
                     new C1<K, EntryProcessor<K, V, Object>>() {
@@ -2448,7 +2446,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() {
-            @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
                     Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
 
@@ -2491,7 +2489,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKeys(keys);
 
         IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) {
-            @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx,
+            @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx,
                 AffinityTopologyVersion readyTopVer) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
                     @Override public EntryProcessor apply(K k) {
@@ -2532,7 +2530,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKeys(map.keySet());
 
         IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) {
-            @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 return tx.invokeAsync(ctx,
                     readyTopVer,
                     (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map,
@@ -2568,7 +2566,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKeys(map.keySet());
 
         return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
-            @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+            @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
                 throws IgniteCheckedException {
                 IgniteInternalFuture<GridCacheReturn> fut =
                     tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
@@ -2616,7 +2614,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
         @Nullable final CacheEntryPredicate filter) {
         return asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 return tx.putAsync(ctx,
                     readyTopVer,
                     key,
@@ -2721,7 +2719,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     protected void putAll0(final Map<? extends K, ? extends V> m) throws IgniteCheckedException {
         syncOp(new SyncInOp(m.size() == 1) {
-            @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+            @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
                 tx.putAllAsync(ctx, null, m, false).get();
             }
 
@@ -2748,7 +2746,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     protected IgniteInternalFuture<?> putAllAsync0(final Map<? extends K, ? extends V> m) {
         return asyncOp(new AsyncOp(m.keySet()) {
-            @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 return tx.putAllAsync(ctx,
                     readyTopVer,
                     m,
@@ -2789,7 +2787,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         final boolean keepBinary = ctx.keepBinary();
 
         return syncOp(new SyncOp<V>(true) {
-            @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+            @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException {
                 K key0 = keepBinary ? (K) ctx.toCacheKeyObject(key) : key;
 
                 V ret = tx.removeAllAsync(ctx,
@@ -2839,7 +2837,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     protected IgniteInternalFuture<V> getAndRemoveAsync0(final K key) {
         return asyncOp(new AsyncOp<V>() {
-            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 // TODO should we invoke interceptor here?
                 return tx.removeAllAsync(ctx,
                     readyTopVer,
@@ -2897,7 +2895,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     protected void removeAll0(final Collection<? extends K> keys) throws IgniteCheckedException {
         syncOp(new SyncInOp(keys.size() == 1) {
-            @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+            @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
                 tx.removeAllAsync(ctx,
                     null,
                     keys,
@@ -2938,7 +2936,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     protected IgniteInternalFuture<Object> removeAllAsync0(final Collection<? extends K> keys) {
         return asyncOp(new AsyncOp(keys) {
-            @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 return tx.removeAllAsync(ctx,
                     readyTopVer,
                     keys,
@@ -2990,7 +2988,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     protected boolean remove0(final K key, final CacheEntryPredicate filter) throws IgniteCheckedException {
         Boolean res = syncOp(new SyncOp<Boolean>(true) {
-            @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+            @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException {
                 return tx.removeAllAsync(ctx,
                     null,
                     Collections.singletonList(key),
@@ -3046,7 +3044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable final CacheEntryPredicate filter) {
         return asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+            @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                 return tx.removeAllAsync(ctx,
                     readyTopVer,
                     Collections.singletonList(key),
@@ -3071,8 +3069,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
         syncOp(new SyncInOp(false) {
-            @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                tx.removeAllDrAsync(ctx, (Map)drMap).get();
+            @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
+                tx.removeAllDrAsync(ctx, drMap).get();
             }
 
             @Override public String toString() {
@@ -3090,8 +3088,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
         return asyncOp(new AsyncOp(drMap.keySet()) {
-            @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
-                return tx.removeAllDrAsync(ctx, (Map)drMap);
+            @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
+                return tx.removeAllDrAsync(ctx, drMap);
             }
 
             @Override public String toString() {
@@ -3160,10 +3158,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Transaction tx() {
-        IgniteTxAdapter tx = ctx.tm().threadLocalTx(ctx);
-
-        return tx == null ? null : new TransactionProxyImpl<>(tx, ctx.shared(), false);
+    @Nullable @Override public GridNearTxLocal tx() {
+        return ctx.tm().threadLocalTx(ctx);
     }
 
     /** {@inheritDoc} */
@@ -3291,7 +3287,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+    @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
         IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions();
 
         return txs.txStartEx(ctx, concurrency, isolation);
@@ -4142,7 +4138,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Transaction commit future.
      */
     @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final IgniteInternalTx tx) {
+    IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) {
         FutureHolder holder = lastFut.get();
 
         holder.lock();
@@ -4154,7 +4150,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
                     new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
                         @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) {
-                            return tx.commitAsync();
+                            return tx.commitNearTxLocalAsync();
                         }
                     });
 
@@ -4163,7 +4159,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 return f;
             }
 
-            IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync();
+            IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
 
             saveFuture(holder, f);
 
@@ -4208,7 +4204,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         awaitLastFut();
 
-        IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
 
         if (tx == null || tx.implicit()) {
             TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config());
@@ -4304,7 +4300,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (log.isDebugEnabled())
             log.debug("Performing async op: " + op);
 
-        IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
@@ -4348,7 +4344,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     @SuppressWarnings("unchecked")
     protected <T> IgniteInternalFuture<T> asyncOp(
-        IgniteTxLocalAdapter tx,
+        GridNearTxLocal tx,
         final AsyncOp<T> op,
         final CacheOperationContext opCtx
     ) {
@@ -4364,7 +4360,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         try {
             IgniteInternalFuture fut = holder.future();
 
-            final IgniteTxLocalAdapter tx0 = tx;
+            final GridNearTxLocal tx0 = tx;
 
             if (fut != null && !fut.isDone()) {
                 IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
@@ -4383,7 +4379,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                         throw e;
                                     }
                                     catch (IgniteCheckedException e1) {
-                                        tx0.rollbackAsync();
+                                        tx0.rollbackNearTxLocalAsync();
 
                                         throw e1;
                                     }
@@ -4409,7 +4405,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         throw e;
                     }
                     catch (IgniteCheckedException e1) {
-                        tx0.rollbackAsync();
+                        tx0.rollbackNearTxLocalAsync();
 
                         throw e1;
                     }
@@ -4925,7 +4921,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         private int retries;
 
         /** */
-        private IgniteTxLocalAdapter tx;
+        private GridNearTxLocal tx;
 
         /** */
         private CacheOperationContext opCtx;
@@ -5173,7 +5169,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
          * @return Operation return value.
          * @throws IgniteCheckedException If failed.
          */
-        @Nullable public abstract T op(IgniteTxLocalAdapter tx) throws IgniteCheckedException;
+        @Nullable public abstract T op(GridNearTxLocal tx) throws IgniteCheckedException;
     }
 
     /**
@@ -5188,7 +5184,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public final Object op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+        @Nullable @Override public final Object op(GridNearTxLocal tx) throws IgniteCheckedException {
             inOp(tx);
 
             return null;
@@ -5198,7 +5194,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
          * @param tx Transaction.
          * @throws IgniteCheckedException If failed.
          */
-        public abstract void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException;
+        public abstract void inOp(GridNearTxLocal tx) throws IgniteCheckedException;
     }
 
     /**
@@ -5234,14 +5230,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
          * @param readyTopVer Ready topology version.
          * @return Operation future.
          */
-        public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer);
+        public abstract IgniteInternalFuture<T> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer);
 
         /**
          * @param tx Transaction.
          * @param opCtx Operation context.
          * @return Operation future.
          */
-        public IgniteInternalFuture<T> op(final IgniteTxLocalAdapter tx, CacheOperationContext opCtx) {
+        public IgniteInternalFuture<T> op(final GridNearTxLocal tx, CacheOperationContext opCtx) {
             AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
 
             if (txTopVer != null)
@@ -5267,7 +5263,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
          */
         private IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut,
             final AffinityTopologyVersion topVer,
-            final IgniteTxLocalAdapter tx,
+            final GridNearTxLocal tx,
             final CacheOperationContext opCtx) {
             final GridFutureAdapter fut0 = new GridFutureAdapter();
 
@@ -5304,7 +5300,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
          * @param opCtx Operation context.
          * @return Future.
          */
-        private IgniteInternalFuture<T> runOp(IgniteTxLocalAdapter tx,
+        private IgniteInternalFuture<T> runOp(GridNearTxLocal tx,
             AffinityTopologyVersion topVer,
             CacheOperationContext opCtx) {
             ctx.operationContextPerCall(opCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 00898ec..787a767 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -40,8 +40,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -939,7 +939,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+    @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
@@ -977,7 +977,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Override public Transaction tx() {
+    @Override public GridNearTxLocal tx() {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 0f79100..39a3baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -731,7 +732,7 @@ public class GridCacheSharedContext<K, V> {
      * @param tx Transaction to close.
      * @throws IgniteCheckedException If failed.
      */
-    public void endTx(IgniteInternalTx tx) throws IgniteCheckedException {
+    public void endTx(GridNearTxLocal tx) throws IgniteCheckedException {
         tx.txState().awaitLastFut(this);
 
         tx.close();
@@ -742,13 +743,13 @@ public class GridCacheSharedContext<K, V> {
      * @return Commit future.
      */
     @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx) {
+    public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(GridNearTxLocal tx) {
         GridCacheContext ctx = tx.txState().singleCacheContext(this);
 
         if (ctx == null) {
             tx.txState().awaitLastFut(this);
 
-            return tx.commitAsync();
+            return tx.commitNearTxLocalAsync();
         }
         else
             return ctx.cache().commitTxAsync(tx);
@@ -759,10 +760,10 @@ public class GridCacheSharedContext<K, V> {
      * @throws IgniteCheckedException If failed.
      * @return Rollback future.
      */
-    public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException {
+    public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException {
         tx.txState().awaitLastFut(this);
 
-        return tx.rollbackAsync();
+        return tx.rollbackNearTxLocalAsync();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 3e68b70..7131612 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -97,7 +98,6 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -891,7 +891,7 @@ public class GridCacheUtils {
      * @param isolation Isolation.
      * @return New transaction.
      */
-    public static IgniteInternalTx txStartInternal(GridCacheContext ctx, IgniteInternalCache prj,
+    public static GridNearTxLocal txStartInternal(GridCacheContext ctx, IgniteInternalCache prj,
         TransactionConcurrency concurrency, TransactionIsolation isolation) {
         assert ctx != null;
         assert prj != null;
@@ -1257,7 +1257,7 @@ public class GridCacheUtils {
     public static <K, V> void inTx(IgniteInternalCache<K, V> cache, TransactionConcurrency concurrency,
         TransactionIsolation isolation, IgniteInClosureX<IgniteInternalCache<K ,V>> clo) throws IgniteCheckedException {
 
-        try (IgniteInternalTx tx = cache.txStartEx(concurrency, isolation);) {
+        try (GridNearTxLocal tx = cache.txStartEx(concurrency, isolation);) {
             clo.applyx(cache);
 
             tx.commit();

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 0ac98fb..5471335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -952,7 +953,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
      * @param isolation Isolation.
      * @return New transaction.
      */
-    public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation);
+    public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation);
 
     /**
      * Starts transaction with specified isolation, concurrency, timeout, invalidation flag,
@@ -976,7 +977,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
      * @return Transaction started by this thread or {@code null} if this thread
      *      does not have a transaction.
      */
-    @Nullable public Transaction tx();
+    @Nullable public GridNearTxLocal tx();
 
     /**
      * Evicts entry associated with given key from cache. Note, that entry will be evicted

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
deleted file mode 100644
index 875ada0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Committed transaction information. Contains recovery writes that will be used to set commit values
- * in case if originating node crashes.
- */
-@Deprecated
-public class GridCacheCommittedTxInfo implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Originating transaction ID. */
-    private GridCacheVersion originatingTxId;
-
-    /** Originating node ID. */
-    private UUID originatingNodeId;
-
-    /** Recovery writes, i.e. values that have never been sent to remote nodes. */
-    @GridToStringInclude
-    private Collection<IgniteTxEntry> recoveryWrites;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridCacheCommittedTxInfo() {
-        // No-op.
-    }
-
-    /**
-     * @param tx Committed cache transaction.
-     */
-    public GridCacheCommittedTxInfo(IgniteInternalTx tx) {
-        assert !tx.local() || !tx.replicated();
-
-        originatingTxId = tx.nearXidVersion();
-        originatingNodeId = tx.eventNodeId();
-    }
-
-    /**
-     * @return Originating transaction ID (the transaction ID for replicated cache and near transaction ID
-     *      for partitioned cache).
-     */
-    public GridCacheVersion originatingTxId() {
-        return originatingTxId;
-    }
-
-    /**
-     * @return Originating node ID (the local transaction node ID for replicated cache and near node ID
-     *      for partitioned cache).
-     */
-    public UUID originatingNodeId() {
-        return originatingNodeId;
-    }
-
-    /**
-     * @return Collection of recovery writes.
-     */
-    public Collection<IgniteTxEntry> recoveryWrites() {
-        return recoveryWrites == null ? Collections.<IgniteTxEntry>emptyList() : recoveryWrites;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        originatingTxId.writeExternal(out);
-
-        U.writeUuid(out, originatingNodeId);
-
-        U.writeCollection(out, recoveryWrites);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        originatingTxId = new GridCacheVersion();
-
-        originatingTxId.readExternal(in);
-
-        originatingNodeId = U.readUuid(in);
-
-        recoveryWrites = U.readCollection(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheCommittedTxInfo.class, this, "recoveryWrites", recoveryWrites);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 7e4deff..00bc6d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -111,7 +111,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout) {
-        IgniteTxLocalEx tx = ctx.tm().userTxx();
+        IgniteTxLocalEx tx = ctx.tm().userTx();
 
         // Return value flag is true because we choose to bring values for explicit locks.
         return lockAllAsync(ctx.cacheKeysView(keys),

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 68c0e57..b31a7be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -83,7 +83,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 /**
  * Transaction created by system implicitly on remote nodes.
  */
-public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
+public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     implements IgniteTxRemoteEx {
     /** */
     private static final long serialVersionUID = 0L;
@@ -180,11 +180,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<UUID> masterNodeIds() {
-        return Collections.singleton(nodeId);
-    }
-
-    /** {@inheritDoc} */
     @Override public UUID originatingNodeId() {
         return nodeId;
     }
@@ -347,12 +342,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
-        assert false;
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public Set<IgniteTxKey> readSet() {
         return txState.readSet();
     }
@@ -378,11 +367,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /**
-     * Prepare phase.
-     *
-     * @throws IgniteCheckedException If prepare failed.
+     * @throws IgniteCheckedException If failed.
      */
-    @Override public void prepare() throws IgniteCheckedException {
+    public final void prepareRemoteTx() throws IgniteCheckedException {
         // If another thread is doing prepare or rollback.
         if (!state(PREPARING)) {
             // In optimistic mode prepare may be called multiple times.
@@ -729,7 +716,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public void commit() throws IgniteCheckedException {
+    @Override public final void commitRemoteTx() throws IgniteCheckedException {
         if (optimistic())
             state(PREPARED);
 
@@ -748,7 +735,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
             if (!isSystemInvalidate())
                 throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
 
-            rollback();
+            rollbackRemoteTx();
         }
 
         commitIfLocked();
@@ -766,7 +753,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
         try {
-            commit();
+            commitRemoteTx();
 
             return new GridFinishedFuture<IgniteInternalTx>(this);
         }
@@ -776,8 +763,36 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"CatchGenericClass"})
-    @Override public void rollback() {
+    @Override public final IgniteInternalFuture<?> salvageTx() {
+        try {
+            systemInvalidate(true);
+
+            prepareRemoteTx();
+
+            if (state() == PREPARING) {
+                if (log.isDebugEnabled())
+                    log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
+                        "by another thread: " + this);
+
+                return null;
+            }
+
+            doneRemote(xidVersion(),
+                Collections.<GridCacheVersion>emptyList(),
+                Collections.<GridCacheVersion>emptyList(),
+                Collections.<GridCacheVersion>emptyList());
+
+            commitRemoteTx();
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to invalidate transaction: " + xidVersion(), e);
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void rollbackRemoteTx() {
         try {
             // Note that we don't evict near entries here -
             // they will be deleted by their corresponding transactions.
@@ -796,7 +811,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
-        rollback();
+        rollbackRemoteTx();
 
         return new GridFinishedFuture<IgniteInternalTx>(this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index dea4072..1e09eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -178,7 +178,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @throws GridDistributedLockCancelledException If lock has been cancelled.
      */
     @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable GridDhtTxRemote startRemoteTx(UUID nodeId,
+    @Nullable private GridDhtTxRemote startRemoteTx(UUID nodeId,
         GridDhtLockRequest req,
         GridDhtLockResponse res)
         throws IgniteCheckedException, GridDistributedLockCancelledException {
@@ -307,7 +307,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                 if (tx.state() == COMMITTING)
                                     tx.forceCommit();
                                 else
-                                    tx.rollback();
+                                    tx.rollbackRemoteTx();
                             }
 
                             return null;
@@ -362,7 +362,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             if (log.isDebugEnabled())
                 log.debug("Rolling back remote DHT transaction because it is empty [req=" + req + ", res=" + res + ']');
 
-            tx.rollback();
+            tx.rollbackRemoteTx();
 
             tx = null;
         }
@@ -374,7 +374,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) {
+    private void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) {
         if (txLockMsgLog.isDebugEnabled()) {
             txLockMsgLog.debug("Received dht lock request [txId=" + req.nearXidVersion() +
                 ", dhtTxId=" + req.version() +
@@ -452,7 +452,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
+    private void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
         assert nodeId != null;
         assert req != null;
         assert !nodeId.equals(locNodeId);
@@ -561,10 +561,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
         if (fail) {
             if (dhtTx != null)
-                dhtTx.rollback();
+                dhtTx.rollbackRemoteTx();
 
             if (nearTx != null) // Even though this should never happen, we leave this check for consistency.
-                nearTx.rollback();
+                nearTx.rollbackRemoteTx();
 
             List<KeyCacheObject> keys = req.keys();
 
@@ -602,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) {
+    private void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) {
         clearLocks(nodeId, req);
 
         if (isNearEnabled(cacheCfg))
@@ -961,8 +961,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             req.futureId(),
                             req.miniId(),
                             req.threadId(),
-                            req.implicitTx(),
-                            req.implicitSingleTx(),
+                            /*implicitTx*/false,
+                            /*implicitSingleTx*/false,
                             ctx.systemTx(),
                             false,
                             ctx.ioPolicy(),
@@ -989,7 +989,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             U.warn(log, msg);
 
                             if (tx != null)
-                                tx.rollback();
+                                tx.rollbackDhtLocal();
 
                             return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
                         }
@@ -1038,31 +1038,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                 t.xidVersion(),
                                 e);
 
-                            if (resp.error() == null && t.onePhaseCommit()) {
-                                assert t.implicit();
-
-                                return t.commitAsync().chain(
-                                    new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
-                                        @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) {
-                                            try {
-                                                // Check for error.
-                                                f.get();
-                                            }
-                                            catch (IgniteCheckedException e1) {
-                                                resp.error(e1);
-                                            }
-
-                                            sendLockReply(nearNode, t, req, resp);
-
-                                            return resp;
-                                        }
-                                    });
-                            }
-                            else {
-                                sendLockReply(nearNode, t, req, resp);
+                            assert !t.implicit() : t;
+                            assert !t.onePhaseCommit() : t;
 
-                                return new GridFinishedFuture<>(resp);
-                            }
+                            sendLockReply(nearNode, t, req, resp);
+
+                            return new GridFinishedFuture<>(resp);
                         }
                     }
                 );
@@ -1105,7 +1086,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
             if (tx != null) {
                 try {
-                    tx.rollback();
+                    tx.rollbackDhtLocal();
                 }
                 catch (IgniteCheckedException ex) {
                     U.error(log, "Failed to rollback the transaction: " + tx, ex);
@@ -1309,7 +1290,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
      */
     private void sendLockReply(
         ClusterNode nearNode,
-        @Nullable IgniteInternalTx tx,
+        @Nullable GridDhtTxLocal tx,
         GridNearLockRequest req,
         GridNearLockResponse res
     ) {
@@ -1347,7 +1328,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 ", res=" + res + ']', e);
 
             if (tx != null)
-                tx.rollbackAsync();
+                tx.rollbackDhtLocalAsync();
 
             // Convert to closure exception as this method is only called form closures.
             throw new GridClosureException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index bff69bc..b1c7e5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -292,82 +292,22 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> prepareAsync() {
-        if (optimistic()) {
-            assert isSystemInvalidate();
-
-            return prepareAsync(
-                null,
-                null,
-                Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
-                0,
-                nearMiniId,
-                null,
-                true);
-        }
-
-        long timeout = remainingTime();
+    @Override public IgniteInternalFuture<?> salvageTx() {
+        systemInvalidate(true);
 
-        // For pessimistic mode we don't distribute prepare request.
-        GridDhtTxPrepareFuture fut = prepFut;
+        state(PREPARED);
 
-        if (fut == null) {
-            // Future must be created before any exception can be thrown.
-            if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture(
-                cctx,
-                this,
-                timeout,
-                nearMiniId,
-                Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
-                true,
-                needReturnValue()))) {
-                if (timeout == -1)
-                    prepFut.onError(timeoutException());
+        if (state() == PREPARING) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
+                    "by another thread: " + this);
 
-                return prepFut;
-            }
-        }
-        else
-            // Prepare was called explicitly.
-            return fut;
-
-        if (!state(PREPARING)) {
-            if (setRollbackOnly()) {
-                if (timeout == -1)
-                    fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +
-                        this));
-                else
-                    fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() +
-                        ", tx=" + this + ']'));
-            }
-            else
-                fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" +
-                    state() + ", tx=" + this + ']'));
-
-            return fut;
+            return null;
         }
 
-        try {
-            userPrepare();
-
-            if (!state(PREPARED)) {
-                setRollbackOnly();
-
-                fut.onError(new IgniteCheckedException("Invalid transaction state for commit [state=" + state() +
-                    ", tx=" + this + ']'));
-
-                return fut;
-            }
+        setRollbackOnly();
 
-            fut.complete();
-
-            return fut;
-        }
-        catch (IgniteCheckedException e) {
-            fut.onError(e);
-
-            return fut;
-        }
+        return rollbackDhtLocalAsync();
     }
 
     /**
@@ -382,7 +322,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @param last {@code True} if this is last prepare request.
      * @return Future that will be completed when locks are acquired.
      */
-    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
+    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
         @Nullable Collection<IgniteTxEntry> reads,
         @Nullable Collection<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
@@ -478,7 +418,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e));
 
             try {
-                rollback();
+                rollbackDhtLocal();
             }
             catch (IgniteTxOptimisticCheckedException e1) {
                 if (log.isDebugEnabled())
@@ -523,7 +463,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             if (prepFut != null)
                 prepFut.get(); // Check for errors.
 
-            boolean finished = finish(commit);
+            boolean finished = localFinish(commit);
 
             if (!finished)
                 err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit +
@@ -544,16 +484,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             fut.finish(commit);
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+    /**
+     * @return Commit future.
+     */
+    public IgniteInternalFuture<IgniteInternalTx> commitDhtLocalAsync() {
         if (log.isDebugEnabled())
             log.debug("Committing dht local tx: " + this);
 
-        // In optimistic mode prepare was called explicitly.
-        if (pessimistic())
-            prepareAsync();
-
         final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true);
 
         cctx.mvcc().addFuture(fut, fut.futureId());
@@ -581,15 +518,29 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+        return commitDhtLocalAsync();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) {
         assert optimistic();
 
         PREP_FUT_UPD.compareAndSet(this, fut, null);
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void rollbackDhtLocal() throws IgniteCheckedException {
+        rollbackDhtLocalAsync().get();
+    }
+
+    /**
+     * @return Rollback future.
+     */
+    public IgniteInternalFuture<IgniteInternalTx> rollbackDhtLocalAsync() {
         final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false);
 
         cctx.mvcc().addFuture(fut, fut.futureId());
@@ -612,8 +563,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+        return rollbackDhtLocalAsync();
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean finish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
         assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate()
             || onePhaseCommit() || state() == PREPARED :
             "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
@@ -621,7 +577,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
         assert nearMiniId != 0;
 
-        return super.finish(commit);
+        return super.localFinish(commit);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 67e1993..0329386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -29,7 +29,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -161,7 +160,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /**
      * @param node Node.
      */
-    public void addLockTransactionNode(ClusterNode node) {
+    void addLockTransactionNode(ClusterNode node) {
         assert node != null;
         assert !node.isLocal();
 
@@ -185,7 +184,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      *
      * @return Has near cache flag.
      */
-    public boolean nearOnOriginatingNode() {
+    boolean nearOnOriginatingNode() {
         return nearOnOriginatingNode;
     }
 
@@ -206,7 +205,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /**
      * @return Nodes where transactions were started on lock step.
      */
-    @Nullable public Set<ClusterNode> lockTransactionNodes() {
+    @Nullable Set<ClusterNode> lockTransactionNodes() {
         return lockTxNodes;
     }
 
@@ -349,14 +348,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /**
      * @param mappings Mappings to add.
      */
-    void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
+    private void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
         addMapping(mappings, dhtMap);
     }
 
     /**
      * @param mappings Mappings to add.
      */
-    void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
+    private void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
         addMapping(mappings, nearMap);
     }
 
@@ -654,7 +653,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                 needRetVal,
                 createTtl,
                 accessTtl,
-                null,
                 skipStore,
                 keepBinary);
         }
@@ -673,7 +671,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param needRetVal Return value flag.
      * @param createTtl TTL for create operation.
      * @param accessTtl TTL for read operation.
-     * @param filter Entry write filter.
      * @param skipStore Skip store flag.
      * @return Future for lock acquisition.
      */
@@ -685,7 +682,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         final boolean needRetVal,
         final long createTtl,
         final long accessTtl,
-        @Nullable final CacheEntryPredicate[] filter,
         boolean skipStore,
         boolean keepBinary) {
         if (log.isDebugEnabled())
@@ -729,7 +725,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                         /*retval*/false,
                         /*read*/read,
                         accessTtl,
-                        filter == null ? CU.empty0() : filter,
+                        CU.empty0(),
                         /*computeInvoke*/false);
 
                     return ret;
@@ -740,7 +736,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
-    @Override public boolean finish(boolean commit) throws IgniteCheckedException {
+    @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]");
 
@@ -858,16 +854,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void rollback() throws IgniteCheckedException {
-        try {
-            rollbackAsync().get();
-        }
-        finally {
-            cctx.tm().resetContext();
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
             "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 56884ff..93ea30d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -718,18 +718,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo =
                         new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
                             @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
-                                try {
-                                    if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1))
-                                        sendPrepareResponse(res);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + tx.nearNodeId() +
-                                        ", res=" + res,
-                                        ", tx=" + tx,
-                                        e);
-                                }
+                                if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1))
+                                    sendPrepareResponse(res);
                             }
                         };
 
@@ -761,18 +751,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
             }
             else {
-                try {
-                    if (REPLIED_UPD.compareAndSet(this, 0, 1))
-                        sendPrepareResponse(res);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
-                        ", dhtTxId=" + tx.xidVersion() +
-                        ", node=" + tx.nearNodeId() +
-                        ", res=" + res,
-                        ", tx=" + tx,
-                        e);
-                }
+                if (REPLIED_UPD.compareAndSet(this, 0, 1))
+                    sendPrepareResponse(res);
             }
 
             return true;
@@ -784,14 +764,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 try {
                     sendPrepareResponse(res);
                 }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
-                        ", dhtTxId=" + tx.xidVersion() +
-                        ", node=" + tx.nearNodeId() +
-                        ", res=" + res,
-                        ", tx=" + tx,
-                        e);
-                }
                 finally {
                     // Will call super.onDone().
                     onComplete(res);
@@ -819,9 +791,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /**
      * @param res Response.
-     * @throws IgniteCheckedException If failed to send response.
      */
-    private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
+    private void sendPrepareResponse(GridNearTxPrepareResponse res) {
         if (!tx.nearNodeId().equals(cctx.localNodeId())) {
             Throwable err = this.err;
 
@@ -837,13 +808,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 return;
             }
 
-            cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
+            try {
+                cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
 
-            if (msgLog.isDebugEnabled()) {
-                msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() +
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + tx.nearNodeId() +
+                        ", res=" + res + ']');
+                }
+            }
+            catch (ClusterTopologyCheckedException e) {
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("Failed to send prepare response, node left [txId=" + tx.nearXidVersion() + "," +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + tx.nearNodeId() +
+                        ", res=" + res + ']');
+                }
+            }
+            catch (IgniteCheckedException e) {
+                U.error(msgLog, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
                     ", dhtTxId=" + tx.xidVersion() +
                     ", node=" + tx.nearNodeId() +
-                    ", res=" + res + ']');
+                    ", res=" + res,
+                    ", tx=" + tx + ']',
+                    e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index e1e0ec2..03bbfe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -207,13 +207,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (keyCheck)
             validateCacheKey(key);
 
-        IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<V>() {
-                @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                     IgniteInternalFuture<Map<Object, Object>>  fut = tx.getAllAsync(ctx,
                         readyTopVer,
                         Collections.singleton(ctx.toCacheKeyObject(key)),
@@ -289,13 +289,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (keyCheck)
             validateCacheKeys(keys);
 
-        IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
-                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                     return tx.getAllAsync(ctx,
                         readyTopVer,
                         ctx.cacheKeysView(keys),

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 0ce380d..79c15fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -917,6 +917,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                                         first = false;
                                     }
 
+                                    assert !implicitTx() && !implicitSingleTx() : tx;
+
                                     req = new GridNearLockRequest(
                                         cctx.cacheId(),
                                         topVer,
@@ -925,8 +927,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                                         futId,
                                         lockVer,
                                         inTx(),
-                                        implicitTx(),
-                                        implicitSingleTx(),
                                         read,
                                         retval,
                                         isolation(),
@@ -982,9 +982,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                 }
             }
 
-            if (inTx() && req != null)
-                req.hasTransforms(tx.hasTransforms());
-
             if (!distributedKeys.isEmpty()) {
                 mapping.distributedKeys(distributedKeys);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ffc84d8..1948df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1045,6 +1045,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                                 first = false;
                                             }
 
+                                            assert !implicitTx() && !implicitSingleTx() : tx;
+
                                             req = new GridNearLockRequest(
                                                 cctx.cacheId(),
                                                 topVer,
@@ -1053,8 +1055,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                                 futId,
                                                 lockVer,
                                                 inTx(),
-                                                implicitTx(),
-                                                implicitSingleTx(),
                                                 read,
                                                 retval,
                                                 isolation(),


Mime
View raw message