ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [23/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
Date Wed, 10 Jun 2015 16:27:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e71dd65..81184a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -80,6 +80,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     /** Flag indicating whether cache operation requires a previous value. */
     private boolean retVal;
 
+    /** {@code True} if first lock request for lock operation sent from client node. */
+    private boolean firstClientReq;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -98,6 +101,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
      * @param implicitTx Flag to indicate that transaction is implicit.
      * @param implicitSingleTx Implicit-transaction-with-one-key flag.
      * @param isRead Indicates whether implicit lock is for read or write operation.
+     * @param retVal Return value flag.
      * @param isolation Transaction isolation.
      * @param isInvalidate Invalidation flag.
      * @param timeout Lock timeout.
@@ -108,6 +112,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
      * @param taskNameHash Task name hash code.
      * @param accessTtl TTL for read operation.
      * @param skipStore Skip store flag.
+     * @param firstClientReq {@code True} if first lock request for lock operation sent from client node.
      */
     public GridNearLockRequest(
         int cacheId,
@@ -130,7 +135,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         @Nullable UUID subjId,
         int taskNameHash,
         long accessTtl,
-        boolean skipStore
+        boolean skipStore,
+        boolean firstClientReq
     ) {
         super(
             cacheId,
@@ -158,11 +164,19 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         this.taskNameHash = taskNameHash;
         this.accessTtl = accessTtl;
         this.retVal = retVal;
+        this.firstClientReq = firstClientReq;
 
         dhtVers = new GridCacheVersion[keyCnt];
     }
 
     /**
+     * @return {@code True} if first lock request for lock operation sent from client node.
+     */
+    public boolean firstClientRequest() {
+        return firstClientReq;
+    }
+
+    /**
      * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
@@ -368,60 +382,66 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeBoolean("hasTransforms", hasTransforms))
+                if (!writer.writeBoolean("firstClientReq", firstClientReq))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
+                if (!writer.writeBoolean("hasTransforms", hasTransforms))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeBoolean("implicitTx", implicitTx))
+                if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeBoolean("implicitTx", implicitTx))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeBoolean("retVal", retVal))
+                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeBoolean("syncCommit", syncCommit))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeBoolean("syncCommit", syncCommit))
                     return false;
 
                 writer.incrementState();
 
             case 33:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 34:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -468,7 +488,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 24:
-                hasTransforms = reader.readBoolean("hasTransforms");
+                firstClientReq = reader.readBoolean("firstClientReq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -476,7 +496,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 25:
-                implicitSingleTx = reader.readBoolean("implicitSingleTx");
+                hasTransforms = reader.readBoolean("hasTransforms");
 
                 if (!reader.isLastRead())
                     return false;
@@ -484,7 +504,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 26:
-                implicitTx = reader.readBoolean("implicitTx");
+                implicitSingleTx = reader.readBoolean("implicitSingleTx");
 
                 if (!reader.isLastRead())
                     return false;
@@ -492,7 +512,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 27:
-                miniId = reader.readIgniteUuid("miniId");
+                implicitTx = reader.readBoolean("implicitTx");
 
                 if (!reader.isLastRead())
                     return false;
@@ -500,7 +520,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 28:
-                onePhaseCommit = reader.readBoolean("onePhaseCommit");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -508,7 +528,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 29:
-                retVal = reader.readBoolean("retVal");
+                onePhaseCommit = reader.readBoolean("onePhaseCommit");
 
                 if (!reader.isLastRead())
                     return false;
@@ -516,7 +536,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 30:
-                subjId = reader.readUuid("subjId");
+                retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -524,7 +544,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 31:
-                syncCommit = reader.readBoolean("syncCommit");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -532,7 +552,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 32:
-                taskNameHash = reader.readInt("taskNameHash");
+                syncCommit = reader.readBoolean("syncCommit");
 
                 if (!reader.isLastRead())
                     return false;
@@ -540,6 +560,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 33:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 34:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -559,7 +587,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 34;
+        return 35;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index 20928de..f324198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -58,6 +59,9 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
     /** Filter evaluation results for fast-commit transactions. */
     private boolean[] filterRes;
 
+    /** {@code True} if client node should remap lock request. */
+    private AffinityTopologyVersion clientRemapVer;
+
     /**
      * Empty constructor (required by {@link Externalizable}).
      */
@@ -73,6 +77,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
      * @param filterRes {@code True} if need to allocate array for filter evaluation results.
      * @param cnt Count.
      * @param err Error.
+     * @param clientRemapVer {@code True} if client node should remap lock request.
      */
     public GridNearLockResponse(
         int cacheId,
@@ -81,13 +86,15 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
         IgniteUuid miniId,
         boolean filterRes,
         int cnt,
-        Throwable err
+        Throwable err,
+        AffinityTopologyVersion clientRemapVer
     ) {
         super(cacheId, lockVer, futId, cnt, err);
 
         assert miniId != null;
 
         this.miniId = miniId;
+        this.clientRemapVer = clientRemapVer;
 
         dhtVers = new GridCacheVersion[cnt];
         mappedVers = new GridCacheVersion[cnt];
@@ -97,6 +104,13 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
     }
 
     /**
+     * @return {@code True} if client node should remap lock request.
+     */
+    @Nullable public AffinityTopologyVersion clientRemapVersion() {
+        return clientRemapVer;
+    }
+
+    /**
      * Gets pending versions that are less than {@link #version()}.
      *
      * @return Pending versions.
@@ -192,30 +206,36 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
 
         switch (writer.state()) {
             case 11:
-                if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("clientRemapVer", clientRemapVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeBooleanArray("filterRes", filterRes))
+                if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG))
+                if (!writer.writeBooleanArray("filterRes", filterRes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
+                if (!writer.writeIgniteUuid("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
                 if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
                     return false;
 
@@ -238,7 +258,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
 
         switch (reader.state()) {
             case 11:
-                dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
+                clientRemapVer = reader.readMessage("clientRemapVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -246,7 +266,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 12:
-                filterRes = reader.readBooleanArray("filterRes");
+                dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -254,7 +274,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 13:
-                mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
+                filterRes = reader.readBooleanArray("filterRes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -262,7 +282,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 14:
-                miniId = reader.readIgniteUuid("miniId");
+                mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -270,6 +290,14 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 15:
+                miniId = reader.readIgniteUuid("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
                 pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -289,7 +317,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 17;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 4f74303..44b7997 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -221,18 +221,19 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         if (topVer != null) {
             tx.topologyVersion(topVer);
 
-            prepare0();
+            prepare0(false);
 
             return;
         }
 
-        prepareOnTopology();
+        prepareOnTopology(false, null);
     }
 
     /**
-     *
+     * @param remap Remap flag.
+     * @param c Optional closure to run after map.
      */
-    private void prepareOnTopology() {
+    private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
         GridDhtTopologyFuture topFut = topologyReadLock();
 
         try {
@@ -265,16 +266,22 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                     return;
                 }
 
-                tx.topologyVersion(topFut.topologyVersion());
+                if (remap)
+                    tx.onRemap(topFut.topologyVersion());
+                else
+                    tx.topologyVersion(topFut.topologyVersion());
+
+                prepare0(remap);
 
-                prepare0();
+                if (c != null)
+                    c.run();
             }
             else {
                 topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                         cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
                             @Override public void run() {
-                                prepareOnTopology();
+                                prepareOnTopology(remap, c);
                             }
                         });
                     }
@@ -346,10 +353,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
     /**
      * Initializes future.
+     *
+     * @param remap Remap flag.
      */
-    private void prepare0() {
+    private void prepare0(boolean remap) {
         try {
-            if (!tx.state(PREPARING)) {
+            boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+
+            if (!txStateCheck) {
                 if (tx.setRollbackOnly()) {
                     if (tx.timedOut())
                         onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
@@ -366,7 +377,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
             }
 
             // Make sure to add future before calling prepare.
-            cctx.mvcc().addFuture(this);
+            if (!remap)
+                cctx.mvcc().addFuture(this);
 
             prepare(
                 tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
@@ -502,7 +514,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
             tx.implicitSingle(),
             m.explicitLock(),
             tx.subjectId(),
-            tx.taskNameHash());
+            tx.taskNameHash(),
+            m.clientFirst());
 
         for (IgniteTxEntry txEntry : m.writes()) {
             if (txEntry.op() == TRANSFORM)
@@ -560,13 +573,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      * @param entry Transaction entry.
      * @param topVer Topology version.
      * @param cur Current mapping.
+     * @param waitLock Wait lock flag.
      * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key.
      * @return Mapping.
      */
     private GridDistributedTxMapping map(
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
-        GridDistributedTxMapping cur,
+        @Nullable GridDistributedTxMapping cur,
         boolean waitLock
     ) throws IgniteCheckedException {
         GridCacheContext cacheCtx = entry.context();
@@ -599,10 +613,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
 
         if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) {
+            boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+
             cur = new GridDistributedTxMapping(primary);
 
             // Initialize near flag right away.
             cur.near(cacheCtx.isNear());
+
+            cur.clientFirst(clientFirst);
         }
 
         cur.add(entry);
@@ -748,18 +766,47 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                     onError(nodeId, mappings, res.error());
                 }
                 else {
-                    onPrepareResponse(m, res);
+                    if (res.clientRemapVersion() != null) {
+                        assert cctx.kernalContext().clientNode();
+                        assert m.clientFirst();
+
+                        IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+
+                        if (affFut != null && !affFut.isDone()) {
+                            affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    remap();
+                                }
+                            });
+                        }
+                        else
+                            remap();
+                    }
+                    else {
+                        onPrepareResponse(m, res);
 
-                    // Proceed prepare before finishing mini future.
-                    if (mappings != null)
-                        proceedPrepare(mappings);
+                        // Proceed prepare before finishing mini future.
+                        if (mappings != null)
+                            proceedPrepare(mappings);
 
-                    // Finish this mini future.
-                    onDone(tx);
+                        // Finish this mini future.
+                        onDone(tx);
+                    }
                 }
             }
         }
 
+        /**
+         *
+         */
+        private void remap() {
+            prepareOnTopology(true, new Runnable() {
+                @Override public void run() {
+                    onDone(tx);
+                }
+            });
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index bce62c1..7006114 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -84,6 +84,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
         if (!isDone()) {
+            assert res.clientRemapVersion() == null : res;
+
             for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
                 MiniFuture f = (MiniFuture)fut;
 
@@ -187,7 +189,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 tx.implicitSingle(),
                 m.explicitLock(),
                 tx.subjectId(),
-                tx.taskNameHash());
+                tx.taskNameHash(),
+                false);
 
             for (IgniteTxEntry txEntry : m.writes()) {
                 if (txEntry.op() == TRANSFORM)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index df7a65f..696acfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -403,7 +403,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         assert nodeId != null;
         assert res != null;
 
-        GridNearLockFuture<K, V> fut = (GridNearLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
+        GridNearLockFuture fut = (GridNearLockFuture)ctx.mvcc().<Boolean>future(res.version(),
             res.futureId());
 
         if (fut != null)
@@ -423,7 +423,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
     ) {
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-        GridNearLockFuture<K, V> fut = new GridNearLockFuture<>(ctx,
+        GridNearLockFuture fut = new GridNearLockFuture(ctx,
             keys,
             (GridNearTxLocal)tx,
             isRead,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index c38965d..fa8877a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -56,8 +56,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** DHT mappings. */
-    private ConcurrentMap<UUID, GridDistributedTxMapping> mappings =
-        new ConcurrentHashMap8<>();
+    private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
 
     /** Future. */
     @GridToStringExclude
@@ -65,13 +64,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** */
     @GridToStringExclude
-    private final AtomicReference<GridNearTxFinishFuture> commitFut =
-        new AtomicReference<>();
+    private final AtomicReference<GridNearTxFinishFuture> commitFut = new AtomicReference<>();
 
     /** */
     @GridToStringExclude
-    private final AtomicReference<GridNearTxFinishFuture> rollbackFut =
-        new AtomicReference<>();
+    private final AtomicReference<GridNearTxFinishFuture> rollbackFut = new AtomicReference<>();
 
     /** Entries to lock on next step of prepare stage. */
     private Collection<IgniteTxEntry> optimisticLockEntries = Collections.emptyList();
@@ -85,6 +82,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /** Info for entries accessed locally in optimistic transaction. */
     private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
 
+    /** */
+    private boolean hasRemoteLocks;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -97,6 +97,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
      * @param implicit Implicit flag.
      * @param implicitSingle Implicit with one key flag.
      * @param sys System flag.
+     * @param plc IO policy.
      * @param concurrency Concurrency.
      * @param isolation Isolation.
      * @param timeout Timeout.
@@ -1185,6 +1186,36 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void onRemap(AffinityTopologyVersion topVer) {
+        assert cctx.kernalContext().clientNode();
+
+        mapped.set(false);
+        nearLocallyMapped = false;
+        colocatedLocallyMapped = false;
+        txNodes = null;
+        onePhaseCommit = false;
+        nearMap.clear();
+        dhtMap.clear();
+        mappings.clear();
+
+        this.topVer.set(topVer);
+    }
+
+    /**
+     * @param hasRemoteLocks {@code True} if tx has remote locks acquired.
+     */
+    public void hasRemoteLocks(boolean hasRemoteLocks) {
+        this.hasRemoteLocks = hasRemoteLocks;
+    }
+
+    /**
+     * @return {@code True} if tx has remote locks acquired.
+     */
+    public boolean hasRemoteLocks() {
+        return hasRemoteLocks;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index a08637d..b602a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -75,6 +75,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** Task name hash. */
     private int taskNameHash;
 
+    /** {@code True} if first optimistic tx prepare request sent from client node. */
+    private boolean firstClientReq;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -92,8 +95,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
      * @param txNodes Transaction nodes mapping.
      * @param last {@code True} if this last prepare request for node.
      * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
+     * @param onePhaseCommit One phase commit flag.
+     * @param retVal Return value flag.
+     * @param implicitSingle Implicit single flag.
+     * @param explicitLock Explicit lock flag.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
+     * @param firstClientReq {@code True} if first optimistic tx prepare request sent from client node.
      */
     public GridNearTxPrepareRequest(
         IgniteUuid futId,
@@ -110,11 +118,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         boolean implicitSingle,
         boolean explicitLock,
         @Nullable UUID subjId,
-        int taskNameHash
+        int taskNameHash,
+        boolean firstClientReq
     ) {
         super(tx, reads, writes, txNodes, onePhaseCommit);
 
         assert futId != null;
+        assert !firstClientReq || tx.optimistic() : tx;
 
         this.futId = futId;
         this.topVer = topVer;
@@ -126,6 +136,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         this.explicitLock = explicitLock;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.firstClientReq = firstClientReq;
+    }
+
+    /**
+     * @return {@code True} if first optimistic tx prepare request sent from client node.
+     */
+    public boolean firstClientRequest() {
+        return firstClientReq;
     }
 
     /**
@@ -273,60 +291,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeBoolean("firstClientReq", firstClientReq))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("implicitSingle", implicitSingle))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeBoolean("last", last))
+                if (!writer.writeBoolean("implicitSingle", implicitSingle))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
+                if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeBoolean("near", near))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeBoolean("retVal", retVal))
+                if (!writer.writeBoolean("near", near))
                     return false;
 
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 33:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 34:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -357,7 +381,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 24:
-                futId = reader.readIgniteUuid("futId");
+                firstClientReq = reader.readBoolean("firstClientReq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -365,7 +389,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 25:
-                implicitSingle = reader.readBoolean("implicitSingle");
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -373,7 +397,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 26:
-                last = reader.readBoolean("last");
+                implicitSingle = reader.readBoolean("implicitSingle");
 
                 if (!reader.isLastRead())
                     return false;
@@ -381,7 +405,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 27:
-                lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
+                last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
                     return false;
@@ -389,7 +413,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 28:
-                miniId = reader.readIgniteUuid("miniId");
+                lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;
@@ -397,7 +421,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                near = reader.readBoolean("near");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -405,7 +429,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 30:
-                retVal = reader.readBoolean("retVal");
+                near = reader.readBoolean("near");
 
                 if (!reader.isLastRead())
                     return false;
@@ -413,7 +437,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
-                subjId = reader.readUuid("subjId");
+                retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -421,7 +445,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 32:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -429,6 +453,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 33:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 34:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -448,7 +480,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 34;
+        return 35;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index f8c07f7..0f0b2c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -83,6 +85,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @GridDirectCollection(IgniteTxKey.class)
     private Collection<IgniteTxKey> filterFailedKeys;
 
+    /** Not {@code null} if client node should remap transaction. */
+    private AffinityTopologyVersion clientRemapVer;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -95,9 +100,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
      * @param futId Future ID.
      * @param miniId Mini future ID.
      * @param dhtVer DHT version.
+     * @param writeVer Write version.
      * @param invalidParts Invalid partitions.
      * @param retVal Return value.
      * @param err Error.
+     * @param clientRemapVer Not {@code null} if client node should remap transaction.
      */
     public GridNearTxPrepareResponse(
         GridCacheVersion xid,
@@ -107,7 +114,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         GridCacheVersion writeVer,
         Collection<Integer> invalidParts,
         GridCacheReturn retVal,
-        Throwable err
+        Throwable err,
+        AffinityTopologyVersion clientRemapVer
     ) {
         super(xid, err);
 
@@ -121,6 +129,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         this.writeVer = writeVer;
         this.invalidParts = invalidParts;
         this.retVal = retVal;
+        this.clientRemapVer = clientRemapVer;
+    }
+
+    /**
+     * @return {@code True} if client node should remap transaction.
+     */
+    @Nullable public AffinityTopologyVersion clientRemapVersion() {
+        return clientRemapVer;
     }
 
     /**
@@ -330,60 +346,66 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
         switch (writer.state()) {
             case 10:
-                if (!writer.writeMessage("dhtVer", dhtVer))
+                if (!writer.writeMessage("clientRemapVer", clientRemapVer))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("dhtVer", dhtVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeCollection("filterFailedKeys", filterFailedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeMessage("retVal", retVal))
+                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 19:
+                if (!writer.writeMessage("retVal", retVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -406,7 +428,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
         switch (reader.state()) {
             case 10:
-                dhtVer = reader.readMessage("dhtVer");
+                clientRemapVer = reader.readMessage("clientRemapVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -414,7 +436,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 11:
-                filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
+                dhtVer = reader.readMessage("dhtVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -422,7 +444,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 12:
-                futId = reader.readIgniteUuid("futId");
+                filterFailedKeys = reader.readCollection("filterFailedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -430,7 +452,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 13:
-                invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -438,7 +460,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 14:
-                miniId = reader.readIgniteUuid("miniId");
+                invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -446,7 +468,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 15:
-                ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -454,7 +476,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 16:
-                ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
+                ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -462,7 +484,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 17:
-                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
+                ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -470,7 +492,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 18:
-                retVal = reader.readMessage("retVal");
+                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -478,6 +500,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 19:
+                retVal = reader.readMessage("retVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -497,7 +527,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 20;
+        return 21;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 6120e25..4adcff5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -41,7 +41,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private GridCachePreloader<K,V> preldr;
+    private GridCachePreloader preldr;
 
     /**
      * Empty constructor required by {@link Externalizable}.
@@ -56,7 +56,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     public GridLocalCache(GridCacheContext<K, V> ctx) {
         super(ctx, ctx.config().getStartSize());
 
-        preldr = new GridCachePreloaderAdapter<>(ctx);
+        preldr = new GridCachePreloaderAdapter(ctx);
     }
 
     /** {@inheritDoc} */
@@ -65,7 +65,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCachePreloader<K, V> preloader() {
+    @Override public GridCachePreloader preloader() {
         return preldr;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 819b0f0..bcbdec4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -53,7 +53,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();
 
     /** */
-    private GridCachePreloader<K,V> preldr;
+    private GridCachePreloader preldr;
 
     /**
      * Empty constructor required by {@link Externalizable}.
@@ -68,7 +68,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     public GridLocalAtomicCache(GridCacheContext<K, V> ctx) {
         super(ctx, ctx.config().getStartSize());
 
-        preldr = new GridCachePreloaderAdapter<>(ctx);
+        preldr = new GridCachePreloaderAdapter(ctx);
     }
 
     /** {@inheritDoc} */
@@ -94,7 +94,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCachePreloader<K, V> preloader() {
+    @Override public GridCachePreloader preloader() {
         return preldr;
     }
 
@@ -119,7 +119,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
     @Override public boolean put(K key, V val, CacheEntryPredicate[] filter) throws IgniteCheckedException {
         A.notNull(key, "key", val, "val");
 
-        return (Boolean)updateAllInternal(UPDATE,
+        boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+        long start = statsEnabled ? System.nanoTime() : 0L;
+
+        boolean res = (Boolean)updateAllInternal(UPDATE,
             Collections.singleton(key),
             Collections.singleton(val),
             null,
@@ -129,6 +133,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             filter,
             ctx.writeThrough(),
             ctx.readThrough());
+
+        if (statsEnabled)
+            metrics0().addPutTimeNanos(System.nanoTime() - start);
+
+        return res;
     }
 
     /** {@inheritDoc} */
@@ -268,6 +277,10 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public void putAll(Map<? extends K, ? extends V> m) throws IgniteCheckedException {
+        boolean statsEnabled = ctx.config().isStatisticsEnabled();
+
+        long start = statsEnabled ? System.nanoTime() : 0L;
+
         updateAllInternal(UPDATE,
             m.keySet(),
             m.values(),
@@ -278,6 +291,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
             CU.empty0(),
             ctx.writeThrough(),
             ctx.readThrough());
+
+        if (statsEnabled)
+            metrics0().addPutTimeNanos(System.nanoTime() - start);
     }
 
     /** {@inheritDoc} */
@@ -727,7 +743,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
         final ExpiryPolicy expiry = expiryPerCall();
 
-        return asyncOp(new Callable<Object>() {
+        IgniteInternalFuture fut = asyncOp(new Callable<Object>() {
             @Override public Object call() throws Exception {
                 return updateAllInternal(op,
                     keys,
@@ -741,6 +757,11 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                     readThrough);
             }
         });
+
+        if (ctx.config().isStatisticsEnabled())
+            fut.listen(new UpdatePutTimeStatClosure(metrics0(), System.nanoTime()));
+
+        return fut;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 4b1fc87..7f0a5ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -348,17 +348,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
      * @param duration Duration.
      */
     public void onExecuted(Object res, Throwable err, long startTime, long duration) {
-        boolean fail = err != null;
-
-        // Update own metrics.
-        metrics.onQueryExecute(duration, fail);
-
-        // Update metrics in query manager.
-        cctx.queries().onMetricsUpdate(duration, fail);
-
-        if (log.isDebugEnabled())
-            log.debug("Query execution finished [qry=" + this + ", startTime=" + startTime +
-                ", duration=" + duration + ", fail=" + fail + ", res=" + res + ']');
+        GridQueryProcessor.onExecuted(cctx, metrics, res, err, startTime, duration, log);
     }
 
     /** {@inheritDoc} */
@@ -376,10 +366,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         return execute(null, rmtTransform, args);
     }
 
+    /** {@inheritDoc} */
     @Override public QueryMetrics metrics() {
         return metrics.copy();
     }
 
+    /** {@inheritDoc} */
     @Override public void resetMetrics() {
         metrics = new GridCacheQueryMetricsAdapter();
     }
@@ -470,10 +462,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) {
         assert cctx != null;
 
-        return F.view(CU.allNodes(cctx), new P1<ClusterNode>() {
+        Collection<ClusterNode> affNodes = CU.affinityNodes(cctx);
+
+        if (prj == null)
+            return affNodes;
+
+        return F.view(affNodes, new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode n) {
-                return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
-                    (prj == null || prj.node(n.id()) != null);
+                return prj.node(n.id()) != null;
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
index 2999e7b..15eb368 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
@@ -43,6 +43,8 @@ public class GridCacheQueryErrorFuture<T> extends GridFinishedFuture<Collection<
 
     /** {@inheritDoc} */
     @Nullable @Override public T next() throws IgniteCheckedException {
+        get();
+
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 16a8028..32e9d63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -773,7 +773,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc);
 
-            private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator();
+            private Iterator<K> iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
 
             {
                 advance();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 759a949..6277c5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -250,8 +250,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeQuery(CacheEntryUpdatedListener locLsnr, CacheEntryEventSerializableFilter rmtFilter,
-        int bufSize, long timeInterval, boolean autoUnsubscribe, ClusterGroup grp) throws IgniteCheckedException {
+    public UUID executeQuery(CacheEntryUpdatedListener locLsnr,
+        CacheEntryEventSerializableFilter rmtFilter,
+        int bufSize,
+        long timeInterval,
+        boolean autoUnsubscribe,
+        ClusterGroup grp) throws IgniteCheckedException
+    {
         return executeQuery0(
             locLsnr,
             rmtFilter,
@@ -301,7 +306,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         try {
             cctx.kernalContext().continuous().stopRoutine(routineId).get();
         }
-        catch (IgniteCheckedException e) {
+        catch (IgniteCheckedException | IgniteException e) {
             if (log.isDebugEnabled())
                 log.debug("Failed to stop internal continuous query: " + e.getMessage());
         }
@@ -357,9 +362,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    private UUID executeQuery0(CacheEntryUpdatedListener locLsnr, final CacheEntryEventSerializableFilter rmtFilter,
-        int bufSize, long timeInterval, boolean autoUnsubscribe, boolean internal, boolean notifyExisting,
-        boolean oldValRequired, boolean sync, boolean ignoreExpired, ClusterGroup grp) throws IgniteCheckedException {
+    private UUID executeQuery0(CacheEntryUpdatedListener locLsnr,
+        final CacheEntryEventSerializableFilter rmtFilter,
+        int bufSize,
+        long timeInterval,
+        boolean autoUnsubscribe,
+        boolean internal,
+        boolean notifyExisting,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        ClusterGroup grp) throws IgniteCheckedException
+    {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
         if (grp == null)
@@ -745,7 +759,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            impl = (CacheEntryEventSerializableFilter)in.readObject();
+            impl = (CacheEntryEventFilter)in.readObject();
             types = in.readByte();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
index 5fde622..02fe679 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheOsStoreManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.store;
 
-import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
index d9f50ac..a14df6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
@@ -68,6 +68,11 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> {
     public boolean isWriteThrough();
 
     /**
+     * @return {@code True} is write-behind is enabled.
+     */
+    public boolean isWriteBehind();
+
+    /**
      * @return Whether DHT transaction can write to store from DHT.
      */
     public boolean isWriteToStoreFromDht();
@@ -160,7 +165,7 @@ public interface CacheStoreManager<K, V> extends GridCacheManager<K, V> {
      * @param commit Commit.
      * @throws IgniteCheckedException If failed.
      */
-    public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException;
+    public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException;
 
     /**
      * End session initiated by write-behind store.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index f9a966c..b4a146a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -59,11 +59,20 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     private ThreadLocal<SessionData> sesHolder;
 
     /** */
+    private ThreadLocalSession locSes;
+
+    /** */
     private boolean locStore;
 
     /** */
     private boolean writeThrough;
 
+    /** */
+    private Collection<CacheStoreSessionListener> sesLsnrs;
+
+    /** */
+    private boolean globalSesLsnrs;
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException {
@@ -84,14 +93,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             sesHolder0 = ((Map<CacheStore, ThreadLocal>)sesHolders).get(cfgStore);
 
             if (sesHolder0 == null) {
-                ThreadLocalSession locSes = new ThreadLocalSession();
+                sesHolder0 = new ThreadLocal<>();
 
-                if (ctx.resource().injectStoreSession(cfgStore, locSes)) {
-                    sesHolder0 = locSes.sesHolder;
+                locSes = new ThreadLocalSession(sesHolder0);
 
+                if (ctx.resource().injectStoreSession(cfgStore, locSes))
                     sesHolders.put(cfgStore, sesHolder0);
-                }
             }
+            else
+                locSes = new ThreadLocalSession(sesHolder0);
         }
 
         sesHolder = sesHolder0;
@@ -148,6 +158,24 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 throw new IgniteCheckedException("Failed to start cache store: " + e, e);
             }
         }
+
+        CacheConfiguration cfg = cctx.config();
+
+        if (cfgStore != null && !cfg.isWriteThrough() && !cfg.isReadThrough()) {
+            U.quietAndWarn(log,
+                "Persistence store is configured, but both read-through and write-through are disabled. This " +
+                "configuration makes sense if the store implements loadCache method only. If this is the " +
+                "case, ignore this warning. Otherwise, fix the configuration for cache: " + cfg.getName(),
+                "Persistence store is configured, but both read-through and write-through are disabled.");
+        }
+
+        sesLsnrs = CU.startStoreSessionListeners(cctx.kernalContext(), cfg.getCacheStoreSessionListenerFactories());
+
+        if (sesLsnrs == null) {
+            sesLsnrs = cctx.shared().storeSessionListeners();
+
+            globalSesLsnrs = true;
+        }
     }
 
     /** {@inheritDoc} */
@@ -164,6 +192,15 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 U.error(log(), "Failed to stop cache store.", e);
             }
         }
+
+        if (!globalSesLsnrs) {
+            try {
+                CU.stopStoreSessionListeners(cctx.kernalContext(), sesLsnrs);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to stop store session listeners for cache: " + cctx.name(), e);
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -215,14 +252,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             Object val = null;
 
             try {
                 val = singleThreadGate.load(storeKey);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -234,7 +271,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -264,8 +301,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isWriteBehind() {
+        return cctx.config().isWriteBehindEnabled();
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isWriteToStoreFromDht() {
-        return cctx.config().isWriteBehindEnabled() || locStore;
+        return isWriteBehind() || locStore;
     }
 
     /** {@inheritDoc} */
@@ -349,7 +391,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
@@ -380,7 +422,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 else
                     singleThreadGate.loadAll(keys0, c);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -392,7 +434,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -408,7 +450,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             sessionInit0(null);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.loadCache(new IgniteBiInClosure<Object, Object>() {
@@ -431,7 +473,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                     }
                 }, args);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (CacheLoaderException e) {
                 throw new IgniteCheckedException(e);
@@ -440,7 +482,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheLoaderException(e));
             }
             finally {
-                sessionEnd0(null, thewEx);
+                sessionEnd0(null, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -473,12 +515,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.write(new CacheEntryImpl<>(key, locStore ? F.t(val, ver) : val));
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -490,7 +532,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheWriterException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -522,12 +564,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
                 sessionInit0(tx);
 
-                boolean thewEx = true;
+                boolean threwEx = true;
 
                 try {
                     store.writeAll(entries);
 
-                    thewEx = false;
+                    threwEx = false;
                 }
                 catch (ClassCastException e) {
                     handleClassCastException(e);
@@ -548,7 +590,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                     throw new IgniteCheckedException(e);
                 }
                 finally {
-                    sessionEnd0(tx, thewEx);
+                    sessionEnd0(tx, threwEx);
                 }
 
                 if (log.isDebugEnabled())
@@ -576,12 +618,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.delete(key);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -593,7 +635,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 throw new IgniteCheckedException(new CacheWriterException(e));
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -606,8 +648,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     }
 
     /** {@inheritDoc} */
-    @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys)
-        throws IgniteCheckedException {
+    @Override public boolean removeAll(@Nullable IgniteInternalTx tx, Collection keys) throws IgniteCheckedException {
         if (F.isEmpty(keys))
             return true;
 
@@ -625,12 +666,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             sessionInit0(tx);
 
-            boolean thewEx = true;
+            boolean threwEx = true;
 
             try {
                 store.deleteAll(keys0);
 
-                thewEx = false;
+                threwEx = false;
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
@@ -645,7 +686,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
                 throw new IgniteCheckedException(e);
             }
             finally {
-                sessionEnd0(tx, thewEx);
+                sessionEnd0(tx, threwEx);
             }
 
             if (log.isDebugEnabled())
@@ -669,16 +710,27 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     }
 
     /** {@inheritDoc} */
-    @Override public void sessionEnd(IgniteInternalTx tx, boolean commit) throws IgniteCheckedException {
+    @Override public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last) throws IgniteCheckedException {
         assert store != null;
 
         sessionInit0(tx);
 
         try {
-            store.sessionEnd(commit);
+            if (sesLsnrs != null) {
+                for (CacheStoreSessionListener lsnr : sesLsnrs)
+                    lsnr.onSessionEnd(locSes, commit);
+            }
+
+            if (!sesHolder.get().ended(store))
+                store.sessionEnd(commit);
+        }
+        catch (Throwable e) {
+            last = true;
+
+            throw e;
         }
         finally {
-            if (sesHolder != null) {
+            if (last && sesHolder != null) {
                 sesHolder.set(null);
 
                 tx.removeMeta(SES_ATTR);
@@ -715,10 +767,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
      * @param tx Current transaction.
      */
     private void sessionInit0(@Nullable IgniteInternalTx tx) {
-        if (sesHolder == null)
-            return;
-
-        assert sesHolder.get() == null;
+        assert sesHolder != null;
 
         SessionData ses;
 
@@ -738,6 +787,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             ses = new SessionData(null, cctx.name());
 
         sesHolder.set(ses);
+
+        if (sesLsnrs != null && !ses.started(this)) {
+            for (CacheStoreSessionListener lsnr : sesLsnrs)
+                lsnr.onSessionStart(locSes);
+        }
     }
 
     /**
@@ -745,8 +799,16 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
      */
     private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
         try {
-            if (tx == null)
-                store.sessionEnd(threwEx);
+            if (tx == null) {
+                if (sesLsnrs != null) {
+                    for (CacheStoreSessionListener lsnr : sesLsnrs)
+                        lsnr.onSessionEnd(locSes, !threwEx);
+                }
+
+                assert !sesHolder.get().ended(store);
+
+                store.sessionEnd(!threwEx);
+            }
         }
         catch (Exception e) {
             if (!threwEx)
@@ -788,6 +850,16 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         @GridToStringInclude
         private Map<Object, Object> props;
 
+        /** */
+        private Object attachment;
+
+        /** */
+        private final Set<CacheStoreManager> started =
+            new GridSetWrapper<>(new IdentityHashMap<CacheStoreManager, Object>());
+
+        /** */
+        private final Set<CacheStore> ended = new GridSetWrapper<>(new IdentityHashMap<CacheStore, Object>());
+
         /**
          * @param tx Current transaction.
          * @param cacheName Cache name.
@@ -815,6 +887,24 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         }
 
         /**
+         * @param attachment Attachment.
+         */
+        private Object attach(Object attachment) {
+            Object prev = this.attachment;
+
+            this.attachment = attachment;
+
+            return prev;
+        }
+
+        /**
+         * @return Attachment.
+         */
+        private Object attachment() {
+            return attachment;
+        }
+
+        /**
          * @return Cache name.
          */
         private String cacheName() {
@@ -828,6 +918,21 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
             this.cacheName = cacheName;
         }
 
+        /**
+         * @return If session is started.
+         */
+        private boolean started(CacheStoreManager mgr) {
+            return !started.add(mgr);
+        }
+
+        /**
+         * @param store Cache store.
+         * @return Whether session already ended on this store instance.
+         */
+        private boolean ended(CacheStore store) {
+            return !ended.add(store);
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(SessionData.class, this, "tx", CU.txString(tx));
@@ -839,7 +944,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
      */
     private static class ThreadLocalSession implements CacheStoreSession {
         /** */
-        private final ThreadLocal<SessionData> sesHolder = new ThreadLocal<>();
+        private final ThreadLocal<SessionData> sesHolder;
+
+        /**
+         * @param sesHolder Session holder.
+         */
+        private ThreadLocalSession(ThreadLocal<SessionData> sesHolder) {
+            this.sesHolder = sesHolder;
+        }
 
         /** {@inheritDoc} */
         @Nullable @Override public Transaction transaction() {
@@ -854,6 +966,20 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
         }
 
         /** {@inheritDoc} */
+        @Override public Object attach(@Nullable Object attachment) {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? ses0.attach(attachment) : null;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public <T> T attachment() {
+            SessionData ses0 = sesHolder.get();
+
+            return ses0 != null ? (T)ses0.attachment() : null;
+        }
+
+        /** {@inheritDoc} */
         @Override public <K1, V1> Map<K1, V1> properties() {
             SessionData ses0 = sesHolder.get();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 5f877ec..cb86e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -706,4 +706,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
      * @return Public API proxy.
      */
     public TransactionProxy proxy();
+
+    /**
+     * @param topVer New topology version.
+     */
+    public void onRemap(AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb8825e..9e8950f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -184,7 +184,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     private AtomicReference<GridFutureAdapter<IgniteInternalTx>> finFut = new AtomicReference<>();
 
     /** Topology version. */
-    private AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
+    protected AtomicReference<AffinityTopologyVersion> topVer = new AtomicReference<>(AffinityTopologyVersion.NONE);
 
     /** Mutex. */
     private final Lock lock = new ReentrantLock();
@@ -405,7 +405,21 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Override public boolean storeUsed() {
-        return storeEnabled() && store() != null;
+        if (!storeEnabled())
+            return false;
+
+        Collection<Integer> cacheIds = activeCacheIds();
+
+        if (!cacheIds.isEmpty()) {
+            for (int cacheId : cacheIds) {
+                CacheStoreManager store = cctx.cacheContext(cacheId).store();
+
+                if (store.configured())
+                    return true;
+            }
+        }
+
+        return false;
     }
 
     /**
@@ -413,13 +427,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
      *
      * @return Store manager.
      */
-    protected CacheStoreManager store() {
-        if (!activeCacheIds().isEmpty()) {
-            int cacheId = F.first(activeCacheIds());
+    protected Collection<CacheStoreManager> stores() {
+        Collection<Integer> cacheIds = activeCacheIds();
+
+        if (!cacheIds.isEmpty()) {
+            Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
 
-            CacheStoreManager store = cctx.cacheContext(cacheId).store();
+            for (int cacheId : cacheIds) {
+                CacheStoreManager store = cctx.cacheContext(cacheId).store();
 
-            return store.configured() ? store : null;
+                if (store.configured())
+                    stores.add(store);
+            }
+
+            return stores;
         }
 
         return null;
@@ -493,13 +514,17 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public void onRemap(AffinityTopologyVersion topVer) {
+        assert false : this;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean hasTransforms() {
         return transform;
     }
 
     /** {@inheritDoc} */
-    @Override
-    public boolean markPreparing() {
+    @Override public boolean markPreparing() {
         return preparing.compareAndSet(false, true);
     }
 
@@ -1716,6 +1741,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
+        @Override public void onRemap(AffinityTopologyVersion topVer) {
+            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean empty() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }



Mime
View raw message