ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [09/16] ignite git commit: Internal cache API cleanup.
Date Fri, 17 Mar 2017 14:50:06 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index f5687a0..307c348 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -17,21 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.util.Collection;
-import java.util.Map;
-import javax.cache.Cache;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -59,141 +46,11 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
     public void userRollback() throws IgniteCheckedException;
 
     /**
-     * @param cacheCtx Cache context.
-     * @param keys Keys to get.
-     * @param deserializeBinary Deserialize binary flag.
-     * @param skipVals Skip values flag.
-     * @param keepCacheObjects Keep cache objects
-     * @param skipStore Skip store flag.
-     * @return Future for this get.
-     */
-    public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        Collection<KeyCacheObject> keys,
-        boolean deserializeBinary,
-        boolean skipVals,
-        boolean keepCacheObjects,
-        boolean skipStore,
-        boolean needVer);
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param map Map to put.
-     * @param retval Flag indicating whether a value should be returned.
-     * @return Future for put operation.
-     */
-    public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        Map<? extends K, ? extends V> map,
-        boolean retval);
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param key Key.
-     * @param val Value.
-     * @param retval Return value flag.
-     * @param filter Filter.
-     * @return Future for put operation.
-     */
-    public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        K key,
-        V val,
-        boolean retval,
-        CacheEntryPredicate filter);
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param key Key.
-     * @param entryProcessor Entry processor.
-     * @param invokeArgs Optional arguments for entry processor.
-     * @return Operation future.
-     */
-    public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        K key,
-        EntryProcessor<K, V, Object> entryProcessor,
-        Object... invokeArgs);
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param map Entry processors map.
-     * @param invokeArgs Optional arguments for entry processor.
-     * @return Operation future.
-     */
-    public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
-        Object... invokeArgs);
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param keys Keys to remove.
-     * @param retval Flag indicating whether a value should be returned.
-     * @param filter Filter.
-     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
-     * @return Future for asynchronous remove.
-     */
-    public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
-        GridCacheContext cacheCtx,
-        @Nullable AffinityTopologyVersion entryTopVer,
-        Collection<? extends K> keys,
-        boolean retval,
-        CacheEntryPredicate filter,
-        boolean singleRmv);
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param drMap DR map to put.
-     * @return Future for DR put operation.
-     */
-    public IgniteInternalFuture<?> putAllDrAsync(
-        GridCacheContext cacheCtx,
-        Map<KeyCacheObject, GridCacheDrInfo> drMap);
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param drMap DR map.
-     * @return Future for asynchronous remove.
-     */
-    public IgniteInternalFuture<?> removeAllDrAsync(
-        GridCacheContext cacheCtx,
-        Map<KeyCacheObject, GridCacheVersion> drMap);
-
-    /**
      * Finishes transaction (either commit or rollback).
      *
      * @param commit {@code True} if commit, {@code false} if rollback.
      * @return {@code True} if state has been changed.
      * @throws IgniteCheckedException If finish failed.
      */
-    public boolean finish(boolean commit) throws IgniteCheckedException;
-
-    /**
-     * @param cacheCtx  Cache context.
-     * @param readThrough Read through flag.
-     * @param async if {@code True}, then loading will happen in a separate thread.
-     * @param keys Keys.
-     * @param skipVals Skip values flag.
-     * @param needVer If {@code true} version is required for loaded values.
-     * @param c Closure to be applied for loaded values.
-     * @param expiryPlc Expiry policy.
-     * @return Future with {@code True} value if loading took place.
-     */
-    public IgniteInternalFuture<Void> loadMissing(
-        GridCacheContext cacheCtx,
-        AffinityTopologyVersion topVer,
-        boolean readThrough,
-        boolean async,
-        Collection<KeyCacheObject> keys,
-        boolean skipVals,
-        boolean needVer,
-        boolean keepBinary,
-        final ExpiryPolicy expiryPlc,
-        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
+    public boolean localFinish(boolean commit) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index df3bad2..d1334ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -56,7 +56,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
-import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
@@ -82,7 +81,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -127,7 +125,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /** Slow tx warn timeout (initialized to 0). */
     private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0);
 
-    /** Tx salvage timeout (default 3s). */
+    /** Tx salvage timeout. */
     private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);
 
     /** One phase commit deferred ack request timeout. */
@@ -138,9 +136,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
         Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256);
 
-    /** Version in which deadlock detection introduced. */
-    public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19");
-
     /** Deadlock detection maximum iterations. */
     static int DEADLOCK_MAX_ITERS =
         IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000);
@@ -184,7 +179,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             PER_SEGMENT_Q);
 
     /** Pending one phase commit ack requests sender. */
-    private GridDeferredAckMessageSender deferredAckMessageSender;
+    private GridDeferredAckMessageSender deferredAckMsgSnd;
 
     /** Transaction finish synchronizer. */
     private GridCacheTxFinishSync txFinishSync;
@@ -206,54 +201,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private TxDeadlockDetection txDeadlockDetection;
 
     /** {@inheritDoc} */
-    @Override protected void onKernalStart0(boolean reconnect) {
-        if (reconnect)
-            return;
-
-        cctx.gridEvents().addLocalEventListener(
-            new GridLocalEventListener() {
-                @Override public void onEvent(Event evt) {
-                    assert evt instanceof DiscoveryEvent;
-                    assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
-                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
-                    UUID nodeId = discoEvt.eventNode().id();
-
-                    cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
-
-                    if (txFinishSync != null)
-                        txFinishSync.onNodeLeft(nodeId);
-
-                    for (TxDeadlockFuture fut : deadlockDetectFuts.values())
-                        fut.onNodeLeft(nodeId);
-
-                    for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
-                        Object obj = entry.getValue();
-
-                        if (obj instanceof GridCacheReturnCompletableWrapper &&
-                            nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
-                            removeTxReturn(entry.getKey());
-                    }
-                }
-            },
-            EVT_NODE_FAILED, EVT_NODE_LEFT);
-
-        this.txDeadlockDetection = new TxDeadlockDetection(cctx);
-
-        cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
-
-        for (IgniteInternalTx tx : idMap.values()) {
-            if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) {
-                if (log.isDebugEnabled())
-                    log.debug("Remaining transaction from left node: " + tx);
-
-                salvageTx(tx, true, USER_FINISH);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridIO().removeMessageListener(TOPIC_TX);
     }
@@ -264,7 +211,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         txHnd = new IgniteTxHandler(cctx);
 
-        deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+        deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
             @Override public int getTimeout() {
                 return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
             }
@@ -293,6 +240,40 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 }
             }
         };
+
+        cctx.gridEvents().addLocalEventListener(
+            new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    assert evt instanceof DiscoveryEvent;
+                    assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                    UUID nodeId = discoEvt.eventNode().id();
+
+                    // Wait some time in case there are some unprocessed messages from failed node.
+                    cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
+
+                    if (txFinishSync != null)
+                        txFinishSync.onNodeLeft(nodeId);
+
+                    for (TxDeadlockFuture fut : deadlockDetectFuts.values())
+                        fut.onNodeLeft(nodeId);
+
+                    for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) {
+                        Object obj = entry.getValue();
+
+                        if (obj instanceof GridCacheReturnCompletableWrapper &&
+                            nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId()))
+                            removeTxReturn(entry.getKey());
+                    }
+                }
+            },
+            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        this.txDeadlockDetection = new TxDeadlockDetection(cctx);
+
+        cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener());
     }
 
     /** {@inheritDoc} */
@@ -320,85 +301,35 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * Invalidates transaction.
      *
      * @param tx Transaction.
-     * @return {@code True} if transaction was salvaged by this call.
      */
-    public boolean salvageTx(IgniteInternalTx tx) {
-        return salvageTx(tx, false, USER_FINISH);
+    public void salvageTx(IgniteInternalTx tx) {
+        salvageTx(tx, USER_FINISH);
     }
 
     /**
      * Invalidates transaction.
      *
      * @param tx Transaction.
-     * @param warn {@code True} if warning should be logged.
      * @param status Finalization status.
-     * @return {@code True} if transaction was salvaged by this call.
      */
-    private boolean salvageTx(IgniteInternalTx tx, boolean warn, IgniteInternalTx.FinalizationStatus status) {
+    private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) {
         assert tx != null;
 
         TransactionState state = tx.state();
 
-        if (state == ACTIVE || state == PREPARING || state == PREPARED) {
-            try {
-                if (!tx.markFinalizing(status)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
-
-                    return false;
-                }
-
-                tx.systemInvalidate(true);
-
-                tx.prepare();
-
-                if (tx.state() == PREPARING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
-                            "by another thread: " + tx);
-
-                    return false;
-                }
-
-                if (tx instanceof IgniteTxRemoteEx) {
-                    IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
-
-                    rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(),
-                        Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList());
-                }
-
-                tx.commit();
-
-                if (warn) {
-                    // This print out cannot print any peer-deployed entity either
-                    // directly or indirectly.
-                    U.warn(log, "Invalidated transaction because originating node either " +
-                        "crashed or left grid: " + CU.txString(tx));
-                }
-            }
-            catch (IgniteCheckedException ignore) {
+        if (state == ACTIVE || state == PREPARING || state == PREPARED || state == MARKED_ROLLBACK) {
+            if (!tx.markFinalizing(status)) {
                 if (log.isDebugEnabled())
-                    log.debug("Optimistic failure while invalidating transaction (will rollback): " +
-                        tx.xidVersion());
+                    log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx);
 
-                try {
-                    tx.rollback();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
-                }
-            }
-        }
-        else if (state == MARKED_ROLLBACK) {
-            try {
-                tx.rollback();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e);
+                return;
             }
-        }
 
-        return true;
+            tx.salvageTx();
+
+            if (log.isDebugEnabled())
+                log.debug("Invalidated transaction because originating node left grid: " + CU.txString(tx));
+        }
     }
 
     /**
@@ -442,7 +373,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return {@code True} if transaction has been committed or rolled back,
      *      {@code false} otherwise.
      */
-    public boolean isCompleted(IgniteInternalTx tx) {
+    private boolean isCompleted(IgniteInternalTx tx) {
         boolean completed = completedVersHashMap.containsKey(tx.xidVersion());
 
         // Need check that for tx with timeout rollback message was not received before lock.
@@ -461,7 +392,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param txSize Expected transaction size.
      * @return New transaction.
      */
-    public IgniteTxLocalAdapter newTx(
+    public GridNearTxLocal newTx(
         boolean implicit,
         boolean implicitSingle,
         @Nullable GridCacheContext sysCacheCtx,
@@ -672,13 +603,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param cctx Cache context.
      * @return Transaction for current thread.
      */
-    @SuppressWarnings({"unchecked"})
-    public <T> T threadLocalTx(GridCacheContext cctx) {
+    public GridNearTxLocal threadLocalTx(GridCacheContext cctx) {
         IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
 
-        return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit() ? (T)tx : null;
+        if (tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit()) {
+            assert tx instanceof GridNearTxLocal : tx;
+
+            return (GridNearTxLocal)tx;
+        }
+
+        return null;
     }
 
     /**
@@ -747,48 +684,53 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return Local transaction.
      */
     @Nullable public IgniteInternalTx localTxx() {
-        IgniteInternalTx tx = txx();
+        IgniteInternalTx tx = tx();
 
         return tx != null && tx.local() ? tx : null;
     }
 
     /**
-     * @return Transaction for current thread.
-     */
-    @SuppressWarnings({"unchecked"})
-    public IgniteInternalTx txx() {
-        return tx();
-    }
-
-    /**
      * @return User transaction for current thread.
      */
-    @Nullable public IgniteInternalTx userTx() {
+    @Nullable public GridNearTxLocal userTx() {
         IgniteInternalTx tx = txContext();
 
-        if (tx != null && tx.user() && tx.state() == ACTIVE)
-            return tx;
+        if (activeUserTx(tx))
+            return (GridNearTxLocal)tx;
 
         tx = tx(null, Thread.currentThread().getId());
 
-        return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+        if (activeUserTx(tx))
+            return (GridNearTxLocal)tx;
+
+        return null;
     }
 
     /**
+     * @param cctx Cache context.
      * @return User transaction for current thread.
      */
-    @Nullable public IgniteInternalTx userTx(GridCacheContext cctx) {
+    @Nullable GridNearTxLocal userTx(GridCacheContext cctx) {
         IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId());
 
-        return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null;
+        if (activeUserTx(tx))
+            return (GridNearTxLocal)tx;
+
+        return null;
     }
 
     /**
-     * @return User transaction.
+     * @param tx Transaction.
+     * @return {@code True} if given transaction is explicitly started user transaction.
      */
-    @SuppressWarnings({"unchecked"})
-    @Nullable public <T extends IgniteTxLocalEx> T userTxx() {
-        return (T)userTx();
+    private boolean activeUserTx(@Nullable IgniteInternalTx tx) {
+        if (tx != null && tx.user() && tx.state() == ACTIVE) {
+            assert tx instanceof GridNearTxLocal : tx;
+
+            return true;
+        }
+
+        return false;
     }
 
     /**
@@ -1241,7 +1183,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 unlockMultiple(tx, tx.readEntries());
 
             // 6. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 7. Remove obsolete entries from cache.
             removeObsolete(tx);
@@ -1314,7 +1256,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 unlockMultiple(tx, tx.readEntries());
 
             // 4. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 5. Remove obsolete entries.
             removeObsolete(tx);
@@ -1364,7 +1306,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         if (txIdMap.remove(tx.xidVersion(), tx)) {
             // 1. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 2. Evict near entries.
             if (!tx.readMap().isEmpty()) {
@@ -1400,7 +1342,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      *
      * @param tx Tx to uncommit.
      */
-    public void uncommitTx(IgniteInternalTx tx) {
+    void uncommitTx(IgniteInternalTx tx) {
         assert tx != null;
 
         if (log.isDebugEnabled())
@@ -1417,15 +1359,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 unlockMultiple(tx, tx.readEntries());
 
             // 3. Notify evictions.
-            notifyEvitions(tx);
+            notifyEvictions(tx);
 
             // 4. Remove from per-thread storage.
             clearThreadMap(tx);
 
             // 5. Unregister explicit locks.
-            if (!tx.alternateVersions().isEmpty())
+            if (!tx.alternateVersions().isEmpty()) {
                 for (GridCacheVersion ver : tx.alternateVersions())
                     idMap.remove(ver);
+            }
 
             // 6. Remove Near-2-DHT mappings.
             if (tx instanceof GridCacheMappedVersion)
@@ -1481,7 +1424,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     /**
      * @param tx Transaction to notify evictions for.
      */
-    private void notifyEvitions(IgniteInternalTx tx) {
+    private void notifyEvictions(IgniteInternalTx tx) {
         if (tx.internal())
             return;
 
@@ -1981,10 +1924,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             return;
         }
 
-        if (tx instanceof GridDistributedTxRemoteAdapter) {
+        if (tx instanceof IgniteTxRemoteEx) {
             IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx;
 
-            rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(),
+            rmtTx.doneRemote(tx.xidVersion(),
+                Collections.<GridCacheVersion>emptyList(),
+                Collections.<GridCacheVersion>emptyList(),
                 Collections.<GridCacheVersion>emptyList());
         }
 
@@ -2058,43 +2003,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             return;
         }
 
-        if (supportsDeadlockDetection(node)) {
-            TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
+        TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys);
 
-            try {
-                if (!cctx.localNodeId().equals(nodeId))
-                    req.prepareMarshal(cctx);
-
-                cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
-            }
-            catch (IgniteCheckedException e) {
-                if (e instanceof ClusterTopologyCheckedException) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to finish deadlock detection, node left: " + nodeId);
-                }
-                else
-                    U.warn(log, "Failed to finish deadlock detection: " + e, e);
+        try {
+            if (!cctx.localNodeId().equals(nodeId))
+                req.prepareMarshal(cctx);
 
-                fut.onDone();
-            }
+            cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL);
         }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node);
+        catch (IgniteCheckedException e) {
+            if (e instanceof ClusterTopologyCheckedException) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to finish deadlock detection, node left: " + nodeId);
+            }
+            else
+                U.warn(log, "Failed to finish deadlock detection: " + e, e);
 
             fut.onDone();
         }
     }
 
     /**
-     * @param node Node.
-     * @return {@code True} if node supports deadlock detection protocol.
-     */
-    private boolean supportsDeadlockDetection(ClusterNode node) {
-        return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0;
-    }
-
-    /**
      * @param tx Tx.
      * @param txKeys Tx keys.
      * @return {@code True} if key is involved into tx.
@@ -2265,7 +2194,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @param ver Version to ack.
      */
     public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) {
-        deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver);
+        deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver);
     }
 
     /**
@@ -2314,9 +2243,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                         ", failedNodeId=" + evtNodeId + ']');
 
                 for (final IgniteInternalTx tx : txs()) {
-                    if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) {
+                    if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) {
                         // Invalidate transactions.
-                        salvageTx(tx, false, RECOVERY_FINISH);
+                        salvageTx(tx, RECOVERY_FINISH);
                     }
                     else {
                         // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 8ceca3f..87cc7cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
@@ -25,6 +26,16 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  */
 public interface IgniteTxRemoteEx extends IgniteInternalTx {
     /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void commitRemoteTx() throws IgniteCheckedException;
+
+    /**
+     *
+     */
+    public void rollbackRemoteTx();
+
+    /**
      * @param baseVer Base version.
      * @param committedVers Committed version.
      * @param rolledbackVers Rolled back version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 1c2ccbe..3c27bad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -88,7 +88,7 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
-    @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+    @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index c121b1b..822e44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -104,7 +104,7 @@ public interface IgniteTxState {
      * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with
      *      store enabled.
      */
-    public boolean storeUsed(GridCacheSharedContext cctx);
+    public boolean storeWriteThrough(GridCacheSharedContext cctx);
 
     /**
      * @param cctx Context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 76751de..399eea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -289,14 +289,14 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+    @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) {
         if (!activeCacheIds.isEmpty()) {
             for (int i = 0; i < activeCacheIds.size(); i++) {
                 int cacheId = (int)activeCacheIds.get(i);
 
                 CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
-                if (store.configured())
+                if (store.configured() && store.isWriteThrough())
                     return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 6134b9f..8ffec00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -51,7 +52,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
 
     /** Wrapped transaction. */
     @GridToStringInclude
-    private IgniteInternalTx tx;
+    private GridNearTxLocal tx;
 
     /** Gateway. */
     @GridToStringExclude
@@ -75,7 +76,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
      * @param cctx Shared context.
      * @param async Async flag.
      */
-    public TransactionProxyImpl(IgniteInternalTx tx, GridCacheSharedContext<K, V> cctx, boolean async) {
+    public TransactionProxyImpl(GridNearTxLocal tx, GridCacheSharedContext<K, V> cctx, boolean async) {
         assert tx != null;
         assert cctx != null;
 
@@ -87,7 +88,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
     /**
      * @return Transaction.
      */
-    public IgniteInternalTx tx() {
+    public GridNearTxLocal tx() {
         return tx;
     }
 
@@ -316,7 +317,9 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
     private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
         IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() {
             @Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException {
-                return fut.get().proxy();
+                fut.get();
+
+                return TransactionProxyImpl.this;
             }
         });
 
@@ -330,7 +333,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        tx = (IgniteInternalTx)in.readObject();
+        tx = (GridNearTxLocal)in.readObject();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 96644a3..0420182 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -67,7 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -95,8 +95,8 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_STAMPED;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
-import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -342,7 +342,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class);
 
                     // Check that sequence hasn't been created in other thread yet.
@@ -471,7 +471,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class);
 
                     // Check that atomic long hasn't been created in other thread yet.
@@ -551,7 +551,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 if (!create)
                     return c.applyx();
 
-                try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                     IgniteCheckedException err =
                         utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
 
@@ -623,7 +623,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         retryTopologySafe(new IgniteOutClosureX<Void>() {
             @Override public Void applyx() throws IgniteCheckedException {
-                try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                     T2<Boolean, IgniteCheckedException> res =
                         utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get();
 
@@ -682,7 +682,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue val = cast(dsView.get(key),
                         GridCacheAtomicReferenceValue.class);
 
@@ -786,7 +786,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicStampedValue val = cast(dsView.get(key),
                         GridCacheAtomicStampedValue.class);
 
@@ -1033,7 +1033,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         return retryTopologySafe(new IgniteOutClosureX<T>() {
             @Override public T applyx() throws IgniteCheckedException {
-                try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                     T2<String, IgniteCheckedException> res =
                         utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get();
 
@@ -1133,7 +1133,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
 
                     // Check that count down hasn't been created in other thread yet.
@@ -1198,7 +1198,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     // Check correctness type of removable object.
                     GridCacheCountDownLatchValue val =
                             cast(dsView.get(key), GridCacheCountDownLatchValue.class);
@@ -1254,7 +1254,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
 
                     // Check that semaphore hasn't been created in other thread yet.
@@ -1319,7 +1319,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     // Check correctness type of removable object.
                     GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
 
@@ -1371,7 +1371,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
 
                     // Check that reentrant lock hasn't been created in other thread yet.
@@ -1438,7 +1438,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
                 dsCacheCtx.gate().enter();
 
-                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                     // Check correctness type of removable object.
                     GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
 
@@ -1474,7 +1474,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         return CU.outTx(
             new Callable<Boolean>() {
                 @Override public Boolean call() throws Exception {
-                    try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
                         // Check correctness type of removable object.
                         R val = cast(dsView.get(key), cls);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index dfd2122..640b72d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -90,7 +90,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     /** Callable for {@link #incrementAndGet()}. */
     private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
         @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                 GridCacheAtomicLongValue val = atomicView.get(key);
 
                 if (val == null)
@@ -117,7 +117,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     /** Callable for {@link #getAndIncrement()}. */
     private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
         @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                 GridCacheAtomicLongValue val = atomicView.get(key);
 
                 if (val == null)
@@ -144,7 +144,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     /** Callable for {@link #decrementAndGet()}. */
     private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
         @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                 GridCacheAtomicLongValue val = atomicView.get(key);
 
                 if (val == null)
@@ -171,7 +171,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     /** Callable for {@link #getAndDecrement()}. */
     private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
         @Override public Long call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+            try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                 GridCacheAtomicLongValue val = atomicView.get(key);
 
                 if (val == null)
@@ -430,7 +430,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     private Callable<Long> internalAddAndGet(final long l) {
         return retryTopologySafe(new Callable<Long>() {
             @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicLongValue val = atomicView.get(key);
 
                     if (val == null)
@@ -464,7 +464,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     private Callable<Long> internalGetAndAdd(final long l) {
         return retryTopologySafe(new Callable<Long>() {
             @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicLongValue val = atomicView.get(key);
 
                     if (val == null)
@@ -498,7 +498,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     private Callable<Long> internalGetAndSet(final long l) {
         return new Callable<Long>() {
             @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicLongValue val = atomicView.get(key);
 
                     if (val == null)
@@ -534,7 +534,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
         return new Callable<Long>() {
             @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicLongValue val = atomicView.get(key);
 
                     if (val == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 448dd8b..6911b3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -213,7 +213,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
     private Callable<Boolean> internalSet(final T val) {
         return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
 
                     if (ref == null)
@@ -247,7 +247,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
     private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) {
         return retryTopologySafe(new Callable<T>() {
             @Override public T call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
 
                     if (ref == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 754d8f5..87aae8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -486,7 +486,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     private Callable<Long> internalUpdate(final long l, final boolean updated) {
         return retryTopologySafe(new Callable<Long>() {
             @Override public Long call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicSequenceValue seq = seqView.get(key);
 
                     checkRemoved();

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 6ac303c..14f80e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -267,7 +267,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
     private Callable<Boolean> internalSet(final T val, final S stamp) {
         return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
 
                     if (stmp == null)
@@ -305,7 +305,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
         final IgniteClosure<S, S> newStampClos) {
         return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
-                try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
+                try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
 
                     if (stmp == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 723fb55..45c3677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -32,7 +32,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -282,7 +282,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                 internalLatch = CU.outTx(
                     retryTopologySafe(new Callable<CountDownLatch>() {
                         @Override public CountDownLatch call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+                            try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheCountDownLatchValue val = latchView.get(key);
 
                                 if (val == null) {
@@ -407,7 +407,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
 
         /** {@inheritDoc} */
         @Override public Integer call() throws Exception {
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+            try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
                 GridCacheCountDownLatchValue latchVal = latchView.get(key);
 
                 if (latchVal == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index 1cf78fa..5f0cb44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -49,7 +49,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -520,8 +520,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
                         @Override public Boolean call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
-
+                            try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheLockState val = lockView.get(key);
 
                                 if (val == null)
@@ -614,7 +613,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
                         @Override public Boolean call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                            try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheLockState val = lockView.get(key);
 
                                 if (val == null)
@@ -711,7 +710,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
                         @Override public Boolean call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                            try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheLockState val = lockView.get(key);
 
                                 if (val == null)
@@ -1089,7 +1088,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable
                 sync = CU.outTx(
                     retryTopologySafe(new Callable<Sync>() {
                         @Override public Sync call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                            try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheLockState val = lockView.get(key);
 
                                 if (val == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index a11c79d..a1c0515 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -285,7 +285,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                 return CU.outTx(
                     retryTopologySafe(new Callable<Boolean>() {
                         @Override public Boolean call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+                            try (GridNearTxLocal tx = CU.txStartInternal(ctx,
                                 semView,
                                 PESSIMISTIC, REPEATABLE_READ)
                             ) {
@@ -359,7 +359,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                     retryTopologySafe(new Callable<Boolean>() {
                         @Override public Boolean call() throws Exception {
                             try (
-                                IgniteInternalTx tx = CU.txStartInternal(ctx,
+                                GridNearTxLocal tx = CU.txStartInternal(ctx,
                                     semView,
                                     PESSIMISTIC, REPEATABLE_READ)
                             ) {
@@ -454,7 +454,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                 sync = CU.outTx(
                     retryTopologySafe(new Callable<Sync>() {
                         @Override public Sync call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+                            try (GridNearTxLocal tx = CU.txStartInternal(ctx,
                                 semView, PESSIMISTIC, REPEATABLE_READ)) {
                                 GridCacheSemaphoreState val = semView.get(key);
 
@@ -465,7 +465,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                                     return null;
                                 }
 
-                                final int count = val.getCount();
+                                final int cnt = val.getCount();
 
                                 Map<UUID, Integer> waiters = val.getWaiters();
 
@@ -473,7 +473,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
                                 tx.commit();
 
-                                return new Sync(count, waiters, failoverSafe);
+                                return new Sync(cnt, waiters, failoverSafe);
                             }
                         }
                     }),
@@ -676,7 +676,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                 retryTopologySafe(new Callable<Integer>() {
                     @Override public Integer call() throws Exception {
                         try (
-                            IgniteInternalTx tx = CU.txStartInternal(ctx,
+                            GridNearTxLocal tx = CU.txStartInternal(ctx,
                                 semView, PESSIMISTIC, REPEATABLE_READ)
                         ) {
                             GridCacheSemaphoreState val = semView.get(key);
@@ -684,11 +684,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
                             if (val == null)
                                 throw new IgniteException("Failed to find semaphore with given name: " + name);
 
-                            int count = val.getCount();
+                            int cnt = val.getCount();
 
                             tx.rollback();
 
-                            return count;
+                            return cnt;
                         }
                     }
                 }),

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 7b80765..846eb69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -25,7 +25,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -57,7 +57,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                 @Override public Boolean call() throws Exception {
                     boolean retVal;
 
-                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                    try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                         Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get();
 
                         if (idx != null) {
@@ -97,7 +97,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                     T retVal;
 
                     while (true) {
-                        try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                        try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                             Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
 
                             if (idx != null) {
@@ -143,7 +143,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
                 @Override public Boolean call() throws Exception {
                     boolean retVal;
 
-                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                    try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                         Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
 
                         if (idx != null) {
@@ -188,7 +188,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
         try {
             retryTopologySafe(new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                    try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                         Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
 
                         if (idx != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 74fc175..acd0a1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -17,7 +17,31 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
+import java.io.DataInput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
@@ -41,7 +65,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
 import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
 import org.apache.ignite.internal.processors.task.GridInternal;
@@ -58,31 +82,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import java.io.DataInput;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
@@ -654,7 +653,7 @@ public class IgfsDataManager extends IgfsManager {
                         // Need to check if block is partially written.
                         // If so, must update it in pessimistic transaction.
                         if (block.length != fileInfo.blockSize()) {
-                            try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                            try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                                 Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key));
 
                                 byte[] val = vals.get(colocatedKey);
@@ -1062,7 +1061,7 @@ public class IgfsDataManager extends IgfsManager {
         IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null,
             colocatedKey.evictExclude(), colocatedKey.blockId());
 
-        try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+        try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
             // Lock keys.
             Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key));
 


Mime
View raw message