ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [09/23] ignite git commit: Optimization for single key cache 'get' operation.
Date Fri, 20 Nov 2015 05:49:19 GMT
Optimization for single key cache 'get' operation.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f103067
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f103067
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f103067

Branch: refs/heads/ignite-sql-opt
Commit: 1f1030670a6e7f9fbad1d939301c884f29b7885a
Parents: ba1d563
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Nov 19 17:29:04 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Nov 19 17:29:04 2015 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  12 +
 .../processors/cache/GridCacheAdapter.java      | 103 ++-
 .../processors/cache/GridCacheAtomicFuture.java |   6 +
 .../processors/cache/GridCacheFuture.java       |  13 -
 .../processors/cache/GridCacheIoManager.java    |  50 +-
 .../processors/cache/GridCacheMessage.java      |  20 +-
 .../processors/cache/GridCacheMvccFuture.java   |   7 +
 .../processors/cache/GridCacheMvccManager.java  | 108 +--
 .../distributed/GridCacheTxRecoveryFuture.java  |  13 +-
 .../dht/CacheDistributedGetFutureAdapter.java   |  27 +-
 .../cache/distributed/dht/CacheGetFuture.java   |  32 +
 .../distributed/dht/GridDhtCacheAdapter.java    | 141 ++++
 .../distributed/dht/GridDhtLockFuture.java      |  16 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   9 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  24 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  17 +-
 .../dht/GridPartitionedGetFuture.java           |  69 +-
 .../dht/GridPartitionedSingleGetFuture.java     | 697 +++++++++++++++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 127 +++-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  84 +--
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  11 +
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   5 -
 .../dht/colocated/GridDhtColocatedCache.java    | 160 ++++-
 .../colocated/GridDhtColocatedLockFuture.java   |  26 +-
 .../distributed/near/CacheVersionedValue.java   |   2 +-
 .../distributed/near/GridNearCacheAdapter.java  |   4 +-
 .../distributed/near/GridNearGetFuture.java     |  57 +-
 .../distributed/near/GridNearGetRequest.java    |   1 -
 .../distributed/near/GridNearGetResponse.java   |   2 -
 .../distributed/near/GridNearLockFuture.java    |  16 +-
 ...arOptimisticSerializableTxPrepareFuture.java |  17 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  19 +-
 .../GridNearPessimisticTxPrepareFuture.java     |  19 +-
 .../near/GridNearSingleGetRequest.java          | 396 +++++++++++
 .../near/GridNearSingleGetResponse.java         | 321 +++++++++
 .../near/GridNearTransactionalCache.java        |   2 +-
 .../near/GridNearTxFinishFuture.java            |  24 +-
 .../cache/distributed/near/GridNearTxLocal.java | 149 ++--
 .../near/GridNearTxPrepareFutureAdapter.java    |   6 +-
 .../processors/cache/local/GridLocalCache.java  |   4 +-
 .../cache/local/GridLocalLockFuture.java        |   5 -
 .../cache/transactions/IgniteTxHandler.java     |  19 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 .../cache/transactions/IgniteTxManager.java     |   2 +-
 .../IgniteClientReconnectCacheTest.java         |  11 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  75 ++
 .../GridCacheConcurrentTxMultiNodeTest.java     |  15 -
 .../cache/GridCachePartitionedGetSelfTest.java  |   3 +-
 .../IgniteCacheAbstractStopBusySelfTest.java    |  27 +-
 .../IgniteCacheP2pUnmarshallingErrorTest.java   | 184 +++--
 .../CacheGetFutureHangsSelfTest.java            |   6 +
 .../GridCacheAbstractNodeRestartSelfTest.java   |   2 +
 .../IgniteCacheSingleGetMessageTest.java        | 357 ++++++++++
 .../GridCacheReplicatedMetricsSelfTest.java     |   9 -
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 56 files changed, 2908 insertions(+), 632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 2503eda..3548aac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -83,6 +83,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
@@ -696,6 +698,16 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 116:
+                msg = new GridNearSingleGetRequest();
+
+                break;
+
+            case 117:
+                msg = new GridNearSingleGetResponse();
+
+                break;
+
             // [-3..114] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cbb7486..562a0eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -599,11 +599,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> containsKeyAsync(K key) {
+    @Override public final IgniteInternalFuture<Boolean> containsKeyAsync(K key) {
         A.notNull(key, "key");
 
-        return getAllAsync(
-            Collections.singletonList(key),
+        return (IgniteInternalFuture)getAsync(
+            key,
             /*force primary*/false,
             /*skip tx*/false,
             /*subj id*/null,
@@ -611,15 +611,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             /*deserialize portable*/false,
             /*skip values*/true,
             /*can remap*/true
-        ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-            @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
-                Map<K, V> map = fut.get();
-
-                assert map.isEmpty() || map.size() == 1 : map.size();
-
-                return map.isEmpty() ? false : map.values().iterator().next() != null;
-            }
-        });
+        );
     }
 
     /** {@inheritDoc} */
@@ -1473,6 +1465,52 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * @param key Key.
+     * @param forcePrimary Force primary.
+     * @param skipTx Skip tx.
+     * @param subjId Subj Id.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable.
+     * @param skipVals Skip values.
+     * @param canRemap Can remap flag.
+     * @return Future for the get operation.
+     */
+    protected IgniteInternalFuture<V> getAsync(
+        final K key,
+        boolean forcePrimary,
+        boolean skipTx,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        final boolean skipVals,
+        boolean canRemap
+    ) {
+        return getAllAsync(Collections.singletonList(key),
+            forcePrimary,
+            skipTx,
+            subjId,
+            taskName,
+            deserializePortable,
+            skipVals,
+            canRemap).chain(
+            new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
+                @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+                    Map<K, V> map = e.get();
+
+                    assert map.isEmpty() || map.size() == 1 : map.size();
+
+                    if (skipVals) {
+                        Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map);
+
+                        return (V)(val);
+                    }
+
+                    return map.get(key);
+                }
+            });
+    }
+
+    /**
      * @param keys Keys.
      * @param forcePrimary Force primary.
      * @param skipTx Skip tx.
@@ -1524,7 +1562,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Future for the get operation.
      * @see GridCacheAdapter#getAllAsync(Collection)
      */
-    public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
+    public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
         boolean readThrough,
         boolean checkTx,
         @Nullable final UUID subjId,
@@ -1605,11 +1643,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                 final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();
 
+                final boolean needEntry = storeEnabled || ctx.isSwapOrOffheapEnabled();
+
                 Map<KeyCacheObject, GridCacheVersion> misses = null;
 
                 for (KeyCacheObject key : keys) {
                     while (true) {
-                        GridCacheEntryEx entry = entryEx(key);
+                        GridCacheEntryEx entry = needEntry ? entryEx(key) : peekEx(key);
+
+                        if (entry == null) {
+                            if (!skipVals && ctx.config().isStatisticsEnabled())
+                                ctx.cache().metrics0().onRead(false);
+
+                            break;
+                        }
 
                         try {
                             T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null,
@@ -4389,11 +4436,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     @Nullable public V get(K key, boolean deserializePortable)
         throws IgniteCheckedException {
-        Map<K, V> map = getAllAsync(F.asList(key), deserializePortable).get();
-
-        assert map.isEmpty() || map.size() == 1 : map.size();
-
-        return map.get(key);
+        return getAsync(key, deserializePortable).get();
     }
 
     /**
@@ -4409,16 +4452,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             return new GridFinishedFuture<>(e);
         }
 
-        return getAllAsync(Collections.singletonList(key), deserializePortable).chain(
-            new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
-                    @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
-                    Map<K, V> map = e.get();
-
-                    assert map.isEmpty() || map.size() == 1 : map.size();
+        String taskName = ctx.kernalContext().job().currentTaskName();
 
-                    return map.get(key);
-                }
-            });
+        return getAsync(key,
+            !ctx.config().isReadFromBackup(),
+            /*skip tx*/false,
+            null,
+            taskName,
+            deserializePortable,
+            false,
+            /*can remap*/true);
     }
 
     /**
@@ -4445,10 +4488,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         return getAllAsync(keys,
             !ctx.config().isReadFromBackup(),
             /*skip tx*/false,
-            null,
+            /*subject id*/null,
             taskName,
             deserializePortable,
-            false,
+            /*skip vals*/false,
             /*can remap*/true);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index be35c5c..359909e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -20,12 +20,18 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.Collection;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
  * Update future for atomic cache.
  */
 public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
     /**
+     * @return Future version.
+     */
+    public GridCacheVersion version();
+
+    /**
      * Gets future that will be completed when it is safe when update is finished on the given version of topology.
      *
      * @param topVer Topology version to finish.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
index caa3d3f..8bf8d40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
@@ -17,11 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.Collection;
 import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.lang.IgniteUuid;
 
 /**
@@ -34,16 +31,6 @@ public interface GridCacheFuture<R> extends IgniteInternalFuture<R> {
     public IgniteUuid futureId();
 
     /**
-     * @return Future version.
-     */
-    public GridCacheVersion version();
-
-    /**
-     * @return Involved nodes.
-     */
-    public Collection<? extends ClusterNode> nodes();
-
-    /**
      * Callback for when node left.
      *
      * @param nodeId Left node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2334780..9afbca8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -34,22 +34,24 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
@@ -437,7 +439,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             case 50: {
                 GridNearGetResponse res = (GridNearGetResponse)msg;
 
-                GridCacheFuture fut = ctx.mvcc().future(res.version(), res.futureId());
+                CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
 
                 if (fut == null) {
                     if (log.isDebugEnabled())
@@ -448,10 +450,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(res.classError());
 
-                if (fut instanceof GridNearGetFuture)
-                    ((GridNearGetFuture)fut).onResult(nodeId, res);
-                else
-                    ((GridPartitionedGetFuture)fut).onResult(nodeId, res);
+                fut.onResult(nodeId, res);
             }
 
             break;
@@ -521,6 +520,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             break;
 
+            case 116: {
+                GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
+
+                GridNearSingleGetResponse res = new GridNearSingleGetResponse(
+                    ctx.cacheId(),
+                    req.futureId(),
+                    req.topologyVersion(),
+                    null,
+                    false,
+                    req.deployInfo() != null);
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
+            case 117: {
+                GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
+
+                GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());
+
+                if (fut == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+                    return;
+                }
+
+                res.error(res.classError());
+
+                fut.onResult(nodeId, res);
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
                     + msg + "]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index bdd2118..61136bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -499,15 +499,21 @@ public abstract class GridCacheMessage implements Message {
 
         int size = col.size();
 
-        for (int i = 0 ; i < size; i++) {
-            CacheObject obj = col.get(i);
+        for (int i = 0 ; i < size; i++)
+            prepareMarshalCacheObject(col.get(i), ctx);
+    }
 
-            if (obj != null) {
-                obj.prepareMarshal(ctx.cacheObjectContext());
+    /**
+     * @param obj Object.
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected final void prepareMarshalCacheObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
+        if (obj != null) {
+            obj.prepareMarshal(ctx.cacheObjectContext());
 
-                if (addDepInfo)
-                    prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
-            }
+            if (addDepInfo)
+                prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
index 67c1330..080a6f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccFuture.java
@@ -17,11 +17,18 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
 /**
  * Distributed future aware of MVCC locking.
  */
 public interface GridCacheMvccFuture<T> extends GridCacheFuture<T> {
     /**
+     * @return Future version.
+     */
+    public GridCacheVersion version();
+
+    /**
      * @param entry Entry which received new owner.
      * @param owner Owner.
      * @return {@code True} if future cares about this entry.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 8562f37..9104acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -101,12 +99,15 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
 
     /** Active futures mapped by version ID. */
     @GridToStringExclude
-    private final ConcurrentMap<GridCacheVersion, Collection<GridCacheFuture<?>>> futs = newMap();
+    private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
 
     /** Pending atomic futures. */
     private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>();
+
     /** Near to DHT version mapping. */
     private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
 
@@ -137,12 +138,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     prev + ']');
 
             if (owner != null && (owner.local() || owner.nearLocal())) {
-                Collection<? extends GridCacheFuture> futCol = futs.get(owner.version());
+                Collection<GridCacheMvccFuture<?>> futCol = mvccFuts.get(owner.version());
 
                 if (futCol != null) {
                     synchronized (futCol) {
-                        for (GridCacheFuture fut : futCol) {
-                            if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
+                        for (GridCacheMvccFuture<?> fut : futCol) {
+                            if (!fut.isDone()) {
                                 GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
 
                                 // Since this method is called outside of entry synchronization,
@@ -206,18 +207,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             for (GridCacheFuture<?> fut : activeFutures())
                 fut.onNodeLeft(discoEvt.eventNode().id());
 
-            for (IgniteInternalFuture<?> fut : atomicFuts.values()) {
-                if (fut instanceof GridCacheFuture) {
-                    GridCacheFuture cacheFut = (GridCacheFuture)fut;
+            for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) {
+                cacheFut.onNodeLeft(discoEvt.eventNode().id());
 
-                    cacheFut.onNodeLeft(discoEvt.eventNode().id());
+                if (cacheFut.isCancelled() || cacheFut.isDone()) {
+                    GridCacheVersion futVer = cacheFut.version();
 
-                    if (cacheFut.isCancelled() || cacheFut.isDone()) {
-                        GridCacheVersion futVer = cacheFut.version();
-
-                        if (futVer != null)
-                            atomicFuts.remove(futVer, fut);
-                    }
+                    if (futVer != null)
+                        atomicFuts.remove(futVer, cacheFut);
                 }
             }
         }
@@ -261,12 +258,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public Collection<GridCacheFuture<?>> activeFutures() {
         ArrayList<GridCacheFuture<?>> col = new ArrayList<>();
 
-        for (Collection<GridCacheFuture<?>> verFuts : futs.values()) {
-            synchronized (verFuts) {
-                col.addAll(verFuts);
+        for (Collection<GridCacheMvccFuture<?>> futs : mvccFuts.values()) {
+            synchronized (futs) {
+                col.addAll(futs);
             }
         }
 
+        col.addAll(futs.values());
+
         return col;
     }
 
@@ -420,13 +419,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param fut Future.
+     * @param futId Future ID.
+     */
+    public void addFuture(final GridCacheFuture<?> fut, final IgniteUuid futId) {
+        GridCacheFuture<?> old = futs.put(futId, fut);
+
+        assert old == null : old;
+
+        onFutureAdded(fut);
+    }
+
+    /**
      * Adds future.
      *
      * @param fut Future.
      * @return {@code True} if added.
      */
     @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
-    public boolean addFuture(final GridCacheFuture<?> fut) {
+    public boolean addFuture(final GridCacheMvccFuture<?> fut) {
         if (fut.isDone()) {
             fut.markNotTrackable();
 
@@ -437,10 +448,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             return true;
 
         while (true) {
-            Collection<GridCacheFuture<?>> old = futs.get(fut.version());
+            Collection<GridCacheMvccFuture<?>> old = mvccFuts.get(fut.version());
 
             if (old == null) {
-                Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) {
+                Collection<GridCacheMvccFuture<?>> col = new HashSet<GridCacheMvccFuture<?>>(U.capacity(4), 0.75f) {
                     {
                         // Make sure that we add future to queue before
                         // adding queue to the map of futures.
@@ -456,7 +467,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     }
                 };
 
-                old = futs.putIfAbsent(fut.version(), col);
+                old = mvccFuts.putIfAbsent(fut.version(), col);
             }
 
             if (old != null) {
@@ -471,7 +482,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
 
                 // Future is being removed, so we force-remove here and try again.
                 if (empty) {
-                    if (futs.remove(fut.version(), old)) {
+                    if (mvccFuts.remove(fut.version(), old)) {
                         if (log.isDebugEnabled())
                             log.debug("Removed future list from futures map for lock version: " + fut.version());
                     }
@@ -501,16 +512,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             break;
         }
 
-        // Close window in case of node is gone before the future got added to
-        // the map of futures.
-        for (ClusterNode n : fut.nodes()) {
-            if (cctx.discovery().node(n.id()) == null)
-                fut.onNodeLeft(n.id());
-        }
-
         // Just in case if future was completed before it was added.
         if (fut.isDone())
-            removeFuture(fut);
+            removeMvccFuture(fut);
         else
             onFutureAdded(fut);
 
@@ -537,15 +541,22 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param futId ID under which the future to remove is registered in {@code futs}.
+     */
+    public void removeFuture(IgniteUuid futId) {
+        futs.remove(futId);
+    }
+
+    /**
      * @param fut Future to remove.
      * @return {@code True} if removed.
      */
     @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"})
-    public boolean removeFuture(GridCacheFuture<?> fut) {
+    public boolean removeMvccFuture(GridCacheMvccFuture<?> fut) {
         if (!fut.trackable())
             return true;
 
-        Collection<GridCacheFuture<?>> cur = futs.get(fut.version());
+        Collection<GridCacheMvccFuture<?>> cur = mvccFuts.get(fut.version());
 
         if (cur == null)
             return false;
@@ -565,7 +576,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         else if (log.isDebugEnabled())
             log.debug("Attempted to remove a non-registered future (has it been already removed?): " + fut);
 
-        if (empty && futs.remove(fut.version(), cur))
+        if (empty && mvccFuts.remove(fut.version(), cur))
             if (log.isDebugEnabled())
                 log.debug("Removed future list from futures map for lock version: " + fut.version());
 
@@ -580,12 +591,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @return Future.
      */
     @SuppressWarnings({"unchecked"})
-    @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
-        Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
+    @Nullable public GridCacheMvccFuture<?> mvccFuture(GridCacheVersion ver, IgniteUuid futId) {
+        Collection<GridCacheMvccFuture<?>> futs = this.mvccFuts.get(ver);
 
         if (futs != null) {
             synchronized (futs) {
-                for (GridCacheFuture<?> fut : futs) {
+                for (GridCacheMvccFuture<?> fut : futs) {
                     if (fut.futureId().equals(futId)) {
                         if (log.isDebugEnabled())
                             log.debug("Found future in futures map: " + fut);
@@ -603,22 +614,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Gets all futures for given lock version, possibly empty collection.
-     *
-     * @param ver Version.
-     * @return All futures for given lock version.
+     * @param futId Future ID.
+     * @return Found future.
      */
-    @SuppressWarnings("unchecked")
-    public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) {
-        Collection c = futs.get(ver);
-
-        if (c == null)
-            return Collections.<IgniteInternalFuture<T>>emptyList();
-        else {
-            synchronized (c) {
-                return new ArrayList<>((Collection<IgniteInternalFuture<T>>)c);
-            }
-        }
+    @Nullable public GridCacheFuture future(IgniteUuid futId) {
+        return futs.get(futId);
     }
 
     /**
@@ -913,7 +913,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
         X.println(">>>   rmvLocksSize: " + rmvLocks.sizex());
         X.println(">>>   lockedSize: " + locked.size());
-        X.println(">>>   futsSize: " + futs.size());
+        X.println(">>>   futsSize: " + (mvccFuts.size() + futs.size()));
         X.println(">>>   near2dhtSize: " + near2dht.size());
         X.println(">>>   finishFutsSize: " + finishFuts.sizex());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 01c4867..1648de0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -389,16 +388,6 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return tx.xidVersion();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return nodes.values();
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         for (IgniteInternalFuture<?> fut : futures())
             if (isMini(fut)) {
@@ -424,7 +413,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeFuture(futId);
 
             if (err == null) {
                 assert res != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 721ba4e..245ffc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -18,9 +18,12 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -40,7 +43,7 @@ import static org.apache.ignite.IgniteSystemProperties.getInteger;
  *
  */
 public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>> {
+    implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
     /** Default max remap count value. */
     public static final int DFLT_MAX_REMAP_CNT = 3;
 
@@ -155,4 +158,26 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
 
         map.put(key, new T2<>(skipVals ? true : val, ver));
     }
+
+    /**
+     * Selects the node to send get request to: first alive affinity node if {@code canRemap} is off, else primary.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Affinity node to get key from, {@code null} if {@code canRemap} is off and no affinity node is alive.
+     */
+    protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : affNodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
new file mode 100644
index 0000000..ebe2cff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+
+/**
+ * Cache get future that can be handed a {@link GridNearGetResponse} along with the ID of a node.
+ */
+public interface CacheGetFuture {
+    /**
+     * @param nodeId ID of the node the response came from.
+     * @param res Near get response.
+     */
+    public void onResult(UUID nodeId, GridNearGetResponse res);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index bdd1140..8537357 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -57,9 +57,12 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -76,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -110,6 +114,46 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * @param nodeId Sender node ID.
+     * @param res Near get response; ignored if no future is registered under its future ID.
+     */
+    protected final void processNearGetResponse(UUID nodeId, GridNearGetResponse res) {
+        if (log.isDebugEnabled())
+            log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
+
+        CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param res Near single get response; ignored if no future is registered under its future ID.
+     */
+    protected void processNearSingleGetResponse(UUID nodeId, GridNearSingleGetResponse res) {
+        if (log.isDebugEnabled())
+            log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
+
+        GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
      * @param ctx Context.
      */
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
@@ -669,6 +713,103 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param nodeId Node ID.
      * @param req Get request.
      */
+    protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
+        assert ctx.affinityNode();
+
+        long ttl = req.accessTtl();
+
+        final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
+
+        LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+        map.put(req.key(), req.addReader());
+
+        IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut =
+            getDhtAsync(nodeId,
+                req.messageId(),
+                map,
+                req.readThrough(),
+                req.topologyVersion(),
+                req.subjectId(),
+                req.taskNameHash(),
+                expiryPlc,
+                req.skipValues());
+
+        fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
+            @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
+                GridNearSingleGetResponse res;
+
+                GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
+                    (GridDhtFuture<Collection<GridCacheEntryInfo>>)f;
+
+                try {
+                    Collection<GridCacheEntryInfo> entries = fut.get();
+
+                    if (F.isEmpty(fut.invalidPartitions())) {
+                        GridCacheEntryInfo info = F.first(entries);
+
+                        Message res0 = null;
+
+                        if (info != null) {
+                            if (req.needEntryInfo()) {
+                                info.key(null);
+
+                                res0 = info;
+                            } else if (req.needVersion())
+                                res0 = new CacheVersionedValue(info.value(), info.version());
+                            else
+                                res0 = info.value();
+                        }
+
+                        res = new GridNearSingleGetResponse(ctx.cacheId(),
+                            req.futureId(),
+                            req.topologyVersion(),
+                            res0,
+                            false,
+                            req.addDeploymentInfo());
+
+                        if (info != null && req.skipValues())
+                            res.setContainsValue();
+                    }
+                    else {
+                        res = new GridNearSingleGetResponse(ctx.cacheId(),
+                            req.futureId(),
+                            ctx.shared().exchange().readyAffinityVersion(),
+                            null,
+                            true,
+                            req.addDeploymentInfo());
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed processing get request: " + req, e);
+
+                    res = new GridNearSingleGetResponse(ctx.cacheId(),
+                        req.futureId(),
+                        req.topologyVersion(),
+                        null,
+                        false,
+                        req.addDeploymentInfo());
+
+                    res.error(e);
+                }
+
+                try {
+                    ctx.io().send(nodeId, res, ctx.ioPolicy());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
+                        ",req=" + req + ", res=" + res + ']', e);
+                }
+
+                sendTtlUpdateRequest(expiryPlc);
+            }
+        });
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Get request.
+     */
     protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
         assert ctx.affinityNode();
         assert !req.reload() : req;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 7284fd4..a7978c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -262,20 +262,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             log.debug("Added invalid partition to future [invalidPart=" + invalidPart + ", fut=" + this + ']');
     }
 
-    /**
-     * @return Participating nodes.
-     */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                if (isMini(f))
-                    return ((MiniFuture)f).node();
-
-                return cctx.discovery().localNode();
-            }
-        });
-    }
-
     /** {@inheritDoc} */
     @Override public GridCacheVersion version() {
         return lockVer;
@@ -756,7 +742,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                 log.debug("Completing future: " + this);
 
             // Clean up.
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeMvccFuture(this);
 
             if (timeoutObj != null)
                 cctx.time().removeTimeoutObject(timeoutObj);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 35f63e3..2468cf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
@@ -126,6 +127,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+            @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
+                processNearSingleGetRequest(nodeId, req);
+            }
+        });
+
         ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
             @Override public void apply(UUID nodeId, GridNearLockRequest req) {
                 processNearLockRequest(nodeId, req);
@@ -566,7 +573,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse res) {
         assert nodeId != null;
         assert res != null;
-        GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>future(res.version(), res.futureId());
+        GridDhtLockFuture fut = (GridDhtLockFuture)ctx.mvcc().<Boolean>mvccFuture(res.version(), res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 992bd66..bb370a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -66,7 +66,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
     private GridCacheSharedContext<K, V> cctx;
 
     /** Future ID. */
-    private IgniteUuid futId;
+    private final IgniteUuid futId;
 
     /** Transaction. */
     @GridToStringExclude
@@ -115,26 +115,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return tx.xidVersion();
-    }
-
-    /**
-     * @return Involved nodes.
-     */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
-
-                    return cctx.discovery().localNode();
-                }
-            });
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         for (IgniteInternalFuture<?> fut : futures())
             if (isMini(fut)) {
@@ -228,7 +208,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 this.tx.sendFinishReply(commit, error());
 
                 // Don't forget to clean up.
-                cctx.mvcc().removeFuture(this);
+                cctx.mvcc().removeFuture(futId);
 
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 2bed931..f344d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -490,7 +490,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
         final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true);
 
-        cctx.mvcc().addFuture(fut);
+        cctx.mvcc().addFuture(fut, fut.futureId());
 
         GridDhtTxPrepareFuture prep = prepFut.get();
 
@@ -580,7 +580,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
 
         final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*rollback*/false);
 
-        cctx.mvcc().addFuture(fut);
+        cctx.mvcc().addFuture(fut, fut.futureId());
 
         if (prepFut == null) {
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d081c0c..4cb5d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -243,21 +243,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         return tx.xidVersion();
     }
 
-    /**
-     * @return Involved nodes.
-     */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
-
-                    return cctx.discovery().localNode();
-                }
-            });
-    }
-
     /** {@inheritDoc} */
     @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
         if (log.isDebugEnabled())
@@ -823,7 +808,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
         if (super.onDone(res, err.get())) {
             // Don't forget to clean up.
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeMvccFuture(this);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index febe9ba..c3d9836 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -57,10 +56,11 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.*;
+
 /**
  * Colocated get future.
  */
@@ -71,15 +71,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
+    /** Dummy version sent to older nodes for backward compatibility. */
+    private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0, 0);
+
     /** Logger. */
     private static IgniteLogger log;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
-    /** Version. */
-    private GridCacheVersion ver;
-
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -126,8 +126,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
         this.topVer = topVer;
 
-        ver = cctx.versions().next();
-
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
     }
@@ -160,25 +158,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return ver;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<Map<K, V>>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteInternalFuture<Map<K, V>> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
-
-                    return cctx.discovery().localNode();
-                }
-            });
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         boolean found = false;
 
@@ -219,7 +198,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         if (super.onDone(res, err)) {
             // Don't forget to clean up.
             if (trackable)
-                cctx.mvcc().removeFuture(this);
+                cctx.mvcc().removeFuture(futId);
 
             cache().sendTtlUpdateRequest(expiryPlc);
 
@@ -274,9 +253,11 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             add(new GridFinishedFuture<>(locVals));
 
         if (hasRmtNodes) {
-            trackable = true;
+            if (!trackable) {
+                trackable = true;
 
-            cctx.mvcc().addFuture(this);
+                cctx.mvcc().addFuture(this, futId);
+            }
         }
 
         // Create mini futures.
@@ -343,7 +324,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     cctx.cacheId(),
                     futId,
                     fut.futureId(),
-                    ver,
+                    n.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0 ? null : DUMMY_VER,
                     mappedKeys,
                     readThrough,
                     topVer,
@@ -390,7 +371,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         boolean remote = false;
 
         // Allow to get cached value from the local node.
-        boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
+                cctx.affinity().primary(cctx.localNode(), key, topVer);
 
         while (true) {
             GridCacheEntryEx entry;
@@ -521,28 +503,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     }
 
     /**
-     * Finds affinity node to send get request to.
-     *
-     * @param key Key to get.
-     * @param topVer Topology version.
-     * @return Affinity node from which the key will be requested.
-     */
-    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
-        if (!canRemap) {
-            List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer);
-
-            for (ClusterNode node : nodes) {
-                if (cctx.discovery().alive(node))
-                    return node;
-            }
-
-            return null;
-        }
-        else
-            return cctx.affinity().primary(key, topVer);
-    }
-
-    /**
      * @param infos Entry infos.
      * @return Result map.
      */
@@ -557,7 +517,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
                 if (needVer)
                     versionedResult(map, info.key(), info.value(), info.version());
-                else
+                else {
                     cctx.addResult(map,
                         info.key(),
                         info.value(),
@@ -565,6 +525,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                         keepCacheObjects,
                         deserializePortable,
                         false);
+                }
             }
 
             return map;


Mime
View raw message