ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: 'Single get' operation optimization.
Date Tue, 17 Nov 2015 14:14:18 GMT
'Single get' operation optimization.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b9ecdc6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b9ecdc6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b9ecdc6

Branch: refs/heads/ignite-single-op-get
Commit: 1b9ecdc657893a513feb7563d85ee219f4b40a80
Parents: 00056fe
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Nov 17 16:31:57 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Nov 17 17:11:43 2015 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  12 +
 .../processors/cache/GridCacheAdapter.java      |  70 +-
 .../processors/cache/GridCacheIoManager.java    |  13 +-
 .../processors/cache/GridCacheMessage.java      |  20 +-
 .../processors/cache/GridCacheMvccManager.java  |  30 +
 .../dht/CacheDistributedGetFutureAdapter.java   |  27 +-
 .../cache/distributed/dht/CacheGetFuture.java   |  32 +
 .../distributed/dht/GridDhtCacheAdapter.java    |  98 +++
 .../dht/GridPartitionedGetFuture.java           |  42 +-
 .../dht/GridPartitionedSingleGetFuture.java     | 679 +++++++++++++++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 129 +++-
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../distributed/near/CacheVersionedValue.java   |   2 +-
 .../distributed/near/GridNearCacheAdapter.java  |   4 +-
 .../distributed/near/GridNearGetFuture.java     |  22 -
 .../distributed/near/GridNearGetRequest.java    |   1 -
 .../distributed/near/GridNearGetResponse.java   |   2 -
 .../near/GridNearSingleGetRequest.java          | 396 +++++++++++
 .../near/GridNearSingleGetResponse.java         | 304 +++++++++
 19 files changed, 1790 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ae8c753..e7b61a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -83,6 +83,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
@@ -690,6 +692,16 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 115:
+                msg = new GridNearSingleGetRequest();
+
+                break;
+
+            case 116:
+                msg = new GridNearSingleGetResponse();
+
+                break;
+
             // [-3..114] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index cbb7486..14b45a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1473,6 +1473,46 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * @param key Key.
+     * @param forcePrimary Force primary.
+     * @param skipTx Skip tx.
+     * @param subjId Subj Id.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable.
+     * @param skipVals Skip values.
+     * @param canRemap Can remap flag.
+     * @return Future for the get operation.
+     */
+    protected IgniteInternalFuture<V> getAsync(
+        final K key,
+        boolean forcePrimary,
+        boolean skipTx,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        boolean skipVals,
+        boolean canRemap
+    ) {
+        return getAllAsync(Collections.singletonList(key),
+            forcePrimary,
+            skipTx,
+            subjId,
+            taskName,
+            deserializePortable,
+            skipVals,
+            canRemap).chain(
+            new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
+                @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+                    Map<K, V> map = e.get();
+
+                    assert map.isEmpty() || map.size() == 1 : map.size();
+
+                    return map.get(key);
+                }
+            });
+    }
+
+    /**
      * @param keys Keys.
      * @param forcePrimary Force primary.
      * @param skipTx Skip tx.
@@ -1524,7 +1564,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Future for the get operation.
      * @see GridCacheAdapter#getAllAsync(Collection)
      */
-    public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
+    public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys,
         boolean readThrough,
         boolean checkTx,
         @Nullable final UUID subjId,
@@ -4389,11 +4429,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     @Nullable public V get(K key, boolean deserializePortable)
         throws IgniteCheckedException {
-        Map<K, V> map = getAllAsync(F.asList(key), deserializePortable).get();
-
-        assert map.isEmpty() || map.size() == 1 : map.size();
-
-        return map.get(key);
+        return getAsync(key, deserializePortable).get();
     }
 
     /**
@@ -4409,16 +4445,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             return new GridFinishedFuture<>(e);
         }
 
-        return getAllAsync(Collections.singletonList(key), deserializePortable).chain(
-            new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
-                    @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
-                    Map<K, V> map = e.get();
-
-                    assert map.isEmpty() || map.size() == 1 : map.size();
+        String taskName = ctx.kernalContext().job().currentTaskName();
 
-                    return map.get(key);
-                }
-            });
+        return getAsync(key,
+            !ctx.config().isReadFromBackup(),
+            /*skip tx*/false,
+            null,
+            taskName,
+            deserializePortable,
+            false,
+            /*can remap*/true);
     }
 
     /**
@@ -4445,10 +4481,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         return getAllAsync(keys,
             !ctx.config().isReadFromBackup(),
             /*skip tx*/false,
-            null,
+            /*subject id*/null,
             taskName,
             deserializePortable,
-            false,
+            /*skip vals*/false,
             /*can remap*/true);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2334780..a88445d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -34,18 +34,17 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
@@ -437,7 +436,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             case 50: {
                 GridNearGetResponse res = (GridNearGetResponse)msg;
 
-                GridCacheFuture fut = ctx.mvcc().future(res.version(), res.futureId());
+                GridCacheFuture fut = ctx.mvcc().future(res.futureId());
+
+                if (fut == null)
+                    fut = ctx.mvcc().future(res.version(), res.futureId());
 
                 if (fut == null) {
                     if (log.isDebugEnabled())
@@ -448,10 +450,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 res.error(res.classError());
 
-                if (fut instanceof GridNearGetFuture)
-                    ((GridNearGetFuture)fut).onResult(nodeId, res);
-                else
-                    ((GridPartitionedGetFuture)fut).onResult(nodeId, res);
+                ((CacheGetFuture)fut).onResult(nodeId, res);
             }
 
             break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index bdd2118..61136bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -499,15 +499,21 @@ public abstract class GridCacheMessage implements Message {
 
         int size = col.size();
 
-        for (int i = 0 ; i < size; i++) {
-            CacheObject obj = col.get(i);
+        for (int i = 0 ; i < size; i++)
+            prepareMarshalCacheObject(col.get(i), ctx);
+    }
 
-            if (obj != null) {
-                obj.prepareMarshal(ctx.cacheObjectContext());
+    /**
+     * @param obj Object.
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected final void prepareMarshalCacheObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException {
+        if (obj != null) {
+            obj.prepareMarshal(ctx.cacheObjectContext());
 
-                if (addDepInfo)
-                    prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
-            }
+            if (addDepInfo)
+                prepareObject(obj.value(ctx.cacheObjectContext(), false), ctx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 2c14209..c289725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -111,6 +111,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs2 = new ConcurrentHashMap8<>();
+
     /** Near to DHT version mapping. */
     private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
 
@@ -271,6 +274,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             }
         }
 
+        col.addAll(futs2.values());
+
         return col;
     }
 
@@ -424,6 +429,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param fut Future.
+     * @param futId Future ID.
+     */
+    public void addFuture(final GridCacheFuture<?> fut, final IgniteUuid futId) {
+        GridCacheFuture<?> old = futs2.put(futId, fut);
+
+        assert old == null : old;
+    }
+
+    /**
      * Adds future.
      *
      * @param fut Future.
@@ -541,6 +556,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param futId Future ID.
+     */
+    public void removeFuture(IgniteUuid futId) {
+        futs2.remove(futId);
+    }
+
+    /**
      * @param fut Future to remove.
      * @return {@code True} if removed.
      */
@@ -607,6 +629,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param futId Future ID.
+     * @return Found future.
+     */
+    @Nullable public GridCacheFuture future(IgniteUuid futId) {
+        return futs2.get(futId);
+    }
+
+    /**
      * Gets all futures for given lock version, possibly empty collection.
      *
      * @param ver Version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 721ba4e..245ffc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -18,9 +18,12 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -40,7 +43,7 @@ import static org.apache.ignite.IgniteSystemProperties.getInteger;
  *
  */
 public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>> {
+    implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
     /** Default max remap count value. */
     public static final int DFLT_MAX_REMAP_CNT = 3;
 
@@ -155,4 +158,26 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
 
         map.put(key, new T2<>(skipVals ? true : val, ver));
     }
+
+    /**
+     * Affinity node to send get request to.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Affinity node to get key from.
+     */
+    protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : affNodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
new file mode 100644
index 0000000..ebe2cff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheGetFuture.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+
+/**
+ *
+ */
+public interface CacheGetFuture {
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    public void onResult(UUID nodeId, GridNearGetResponse res);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index bdd1140..85a6519 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -57,9 +57,12 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -76,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -669,6 +673,100 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param nodeId Node ID.
      * @param req Get request.
      */
+    protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) {
+        assert ctx.affinityNode();
+
+        long ttl = req.accessTtl();
+
+        final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
+
+        LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+        map.put(req.key(), req.addReader());
+
+        IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut =
+            getDhtAsync(nodeId,
+                req.messageId(),
+                map,
+                req.readThrough(),
+                req.topologyVersion(),
+                req.subjectId(),
+                req.taskNameHash(),
+                expiryPlc,
+                req.skipValues());
+
+        fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
+            @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) {
+                GridNearSingleGetResponse res;
+
+                GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
+                    (GridDhtFuture<Collection<GridCacheEntryInfo>>)f;
+
+                try {
+                    Collection<GridCacheEntryInfo> entries = fut.get();
+
+                    if (F.isEmpty(fut.invalidPartitions())) {
+                        GridCacheEntryInfo info = F.first(entries);
+
+                        Message res0 = null;
+
+                        if (info != null) {
+                            if (req.needEntryInfo()) {
+                                info.key(null);
+
+                                res0 = info;
+                            } else if (req.needVersion())
+                                res0 = new CacheVersionedValue(info.value(), info.version());
+                            else
+                                res0 = info.value();
+                        }
+
+                        res = new GridNearSingleGetResponse(ctx.cacheId(),
+                            req.futureId(),
+                            req.topologyVersion(),
+                            res0,
+                            false,
+                            req.addDeploymentInfo());
+                    }
+                    else {
+                        res = new GridNearSingleGetResponse(ctx.cacheId(),
+                            req.futureId(),
+                            ctx.shared().exchange().readyAffinityVersion(),
+                            null,
+                            true,
+                            req.addDeploymentInfo());
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed processing get request: " + req, e);
+
+                    res = new GridNearSingleGetResponse(ctx.cacheId(),
+                        req.futureId(),
+                        req.topologyVersion(),
+                        null,
+                        false,
+                        req.addDeploymentInfo());
+
+                    res.error(e);
+                }
+
+                try {
+                    ctx.io().send(nodeId, res, ctx.ioPolicy());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
+                        ",req=" + req + ", res=" + res + ']', e);
+                }
+
+                sendTtlUpdateRequest(expiryPlc);
+            }
+        });
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Get request.
+     */
     protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
         assert ctx.affinityNode();
         assert !req.reload() : req;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index febe9ba..6be56e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -61,6 +61,8 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture.*;
+
 /**
  * Colocated get future.
  */
@@ -71,15 +73,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
+    /** Dummy version sent to older nodes for backward compatibility, */
+    private static final GridCacheVersion DUMMY_VER = new GridCacheVersion(0, 0, 0, 0);
+
     /** Logger. */
     private static IgniteLogger log;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
-    /** Version. */
-    private GridCacheVersion ver;
-
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -126,8 +128,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
         this.topVer = topVer;
 
-        ver = cctx.versions().next();
-
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
     }
@@ -161,7 +161,9 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
     /** {@inheritDoc} */
     @Override public GridCacheVersion version() {
-        return ver;
+        assert false : this;
+
+        return null;
     }
 
     /** {@inheritDoc} */
@@ -219,7 +221,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         if (super.onDone(res, err)) {
             // Don't forget to clean up.
             if (trackable)
-                cctx.mvcc().removeFuture(this);
+                cctx.mvcc().removeFuture(futId);
 
             cache().sendTtlUpdateRequest(expiryPlc);
 
@@ -276,7 +278,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         if (hasRmtNodes) {
             trackable = true;
 
-            cctx.mvcc().addFuture(this);
+            cctx.mvcc().addFuture(this, futId);
         }
 
         // Create mini futures.
@@ -343,7 +345,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     cctx.cacheId(),
                     futId,
                     fut.futureId(),
-                    ver,
+                    n.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0 ? null : DUMMY_VER,
                     mappedKeys,
                     readThrough,
                     topVer,
@@ -521,28 +523,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     }
 
     /**
-     * Finds affinity node to send get request to.
-     *
-     * @param key Key to get.
-     * @param topVer Topology version.
-     * @return Affinity node from which the key will be requested.
-     */
-    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
-        if (!canRemap) {
-            List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer);
-
-            for (ClusterNode node : nodes) {
-                if (cctx.discovery().alive(node))
-                    return node;
-            }
-
-            return null;
-        }
-        else
-            return cctx.affinity().primary(key, topVer);
-    }
-
-    /**
      * @param infos Entry infos.
      * @return Result map.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
new file mode 100644
index 0000000..66631fd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>,
+    CacheGetFuture {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final IgniteProductVersion SINGLE_GET_MSG_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    private static IgniteLogger log;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Context. */
+    private final GridCacheContext cctx;
+
+    /** Key. */
+    private final KeyCacheObject key;
+
+    /** Read through flag. */
+    private final boolean readThrough;
+
+    /** Force primary flag. */
+    private final boolean forcePrimary;
+
+    /** Future ID. */
+    private final IgniteUuid futId;
+
+    /** Trackable flag. */
+    private boolean trackable;
+
+    /** Subject ID. */
+    private final UUID subjId;
+
+    /** Task name. */
+    private final String taskName;
+
+    /** Whether to deserialize portable objects. */
+    private boolean deserializePortable;
+
+    /** Skip values flag. */
+    private boolean skipVals;
+
+    /** Expiry policy. */
+    private IgniteCacheExpiryPolicy expiryPlc;
+
+    /** Flag indicating that get should be done on a locked topology version. */
+    private final boolean canRemap;
+
+    /** */
+    private final boolean needVer;
+
+    /** */
+    private final boolean keepCacheObjects;
+
+    /** */
+    private ClusterNode node;
+
+    /**
+     * @param cctx Context.
+     * @param key Key.
+     * @param topVer Topology version.
+     * @param readThrough Read through flag.
+     * @param forcePrimary If {@code true} then will force network trip to primary node even if called on backup node.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObjects Keep cache objects flag.
+     */
+    public GridPartitionedSingleGetFuture(
+        GridCacheContext cctx,
+        KeyCacheObject key,
+        AffinityTopologyVersion topVer,
+        boolean readThrough,
+        boolean forcePrimary,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObjects
+    ) {
+        assert key != null;
+
+        this.cctx = cctx;
+        this.key = key;
+        this.readThrough = readThrough;
+        this.forcePrimary = forcePrimary;
+        this.subjId = subjId;
+        this.taskName = taskName;
+        this.deserializePortable = deserializePortable;
+        this.expiryPlc = expiryPlc;
+        this.skipVals = skipVals;
+        this.canRemap = canRemap;
+        this.needVer = needVer;
+        this.keepCacheObjects = keepCacheObjects;
+        this.topVer = topVer;
+
+        futId = IgniteUuid.randomUuid();
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridPartitionedSingleGetFuture.class);
+    }
+
+    /**
+     *
+     */
+    public void init() {
+        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+            canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+
+        map(topVer);
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    private void map(AffinityTopologyVersion topVer) {
+        ClusterNode node = mapKeyToNode(topVer);
+
+        if (node == null) {
+            assert isDone() : this;
+
+            return;
+        }
+
+        if (node.isLocal()) {
+            LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+            map.put(key, false);
+
+            final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtAsync(node.id(),
+                -1,
+                map,
+                readThrough,
+                topVer,
+                subjId,
+                taskName == null ? 0 : taskName.hashCode(),
+                expiryPlc,
+                skipVals);
+
+            final Collection<Integer> invalidParts = fut.invalidPartitions();
+
+            if (!F.isEmpty(invalidParts)) {
+                AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
+
+                assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology " +
+                    "version did not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
+                    ", invalidParts=" + invalidParts + ']';
+
+                // Remap recursively.
+                map(updTopVer);
+            }
+            else {
+                fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
+                    @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) {
+                        try {
+                            Collection<GridCacheEntryInfo> infos = fut.get();
+
+                            assert F.isEmpty(infos) || infos.size() == 1 : infos;
+
+                            setResult(F.first(infos));
+                        }
+                        catch (Exception e) {
+                            U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+
+                            onDone(e);
+                        }
+                    }
+                });
+            }
+        }
+        else {
+            synchronized (this) {
+                this.node = node;
+            }
+
+            if (!trackable) {
+                trackable = true;
+
+                cctx.mvcc().addFuture(this, futId);
+            }
+
+            GridCacheMessage req;
+
+            if (node.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0) {
+                req = new GridNearSingleGetRequest(cctx.cacheId(),
+                    futId,
+                    key,
+                    readThrough,
+                    topVer,
+                    subjId,
+                    taskName == null ? 0 : taskName.hashCode(),
+                    expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    skipVals,
+                    /**add reader*/false,
+                    needVer,
+                    cctx.deploymentEnabled());
+            }
+            else {
+                LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+                map.put(key, false);
+
+                req = new GridNearGetRequest(
+                    cctx.cacheId(),
+                    futId,
+                    futId,
+                    cctx.versions().next(),
+                    map,
+                    readThrough,
+                    topVer,
+                    subjId,
+                    taskName == null ? 0 : taskName.hashCode(),
+                    expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    skipVals,
+                    cctx.deploymentEnabled());
+            }
+
+            try {
+                cctx.io().send(node, req, cctx.ioPolicy());
+            }
+            catch (IgniteCheckedException e) {
+                if (e instanceof ClusterTopologyCheckedException)
+                    onNodeLeft(node.id());
+                else
+                    onDone(e);
+            }
+        }
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Primary node or {@code null} if future was completed.
+     */
+    @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
+        ClusterNode primary = affinityNode(key, topVer);
+
+        if (primary == null) {
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+
+            return null;
+        }
+
+        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
+
+        if (allowLocRead) {
+            GridDhtCacheAdapter colocated = cctx.dht();
+
+            while (true) {
+                GridCacheEntryEx entry;
+
+                try {
+                    entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                        colocated.peekEx(key);
+
+                    // If our DHT cache do has value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        CacheObject v = null;
+                        GridCacheVersion ver = null;
+
+                        if (needVer) {
+                            T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                null,
+                                /*swap*/true,
+                                /*unmarshal*/true,
+                                /**update-metrics*/false,
+                                /*event*/!skipVals,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc);
+
+                            if (res != null) {
+                                v = res.get1();
+                                ver = res.get2();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(null,
+                                /*swap*/true,
+                                /*read-through*/false,
+                                /*fail-fast*/true,
+                                /*unmarshal*/true,
+                                /**update-metrics*/false,
+                                /*event*/!skipVals,
+                                /*temporary*/false,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc);
+                        }
+
+                        colocated.context().evicts().touch(entry, topVer);
+
+                        // Entry was not in memory or in swap, so we remove it from cache.
+                        if (v == null) {
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                colocated.removeIfObsolete(key);
+                        }
+                        else {
+                            setResult(v, ver);
+
+                            return null;
+                        }
+                    }
+
+                    break;
+                }
+                catch (GridDhtInvalidPartitionException ignored) {
+                    break;
+                }
+                catch (IgniteCheckedException e) {
+                    onDone(e);
+
+                    return null;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    // No-op, will retry.
+                }
+            }
+        }
+
+        return primary;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridNearSingleGetResponse res) {
+        if (!processResponse(nodeId))
+            return;
+
+        Message res0 = res.result();
+
+        CacheObject val;
+        GridCacheVersion ver = null;
+
+        if (needVer) {
+            CacheVersionedValue verVal = (CacheVersionedValue)res0;
+
+            val = verVal.value();
+            ver = verVal.version();
+        }
+        else
+            val = (CacheObject)res0;
+
+        onResult(res.error(), res.invalidPartitions(), res.topologyVersion(), val, ver);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridNearGetResponse res) {
+        if (!processResponse(nodeId))
+            return;
+
+        Collection<GridCacheEntryInfo> infos = res.entries();
+
+        assert F.isEmpty(infos) || infos.size() == 1 : infos;
+
+        onResult(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion(), F.first(infos));
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if should process received response.
+     */
+    private boolean processResponse(UUID nodeId) {
+        synchronized (this) {
+            if (node != null && node.id().equals(nodeId)) {
+                node = null;
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param err Error.
+     * @param invalidParts Invalid partitions error flag.
+     * @param rmtTopVer Received topology version.
+     * @param info Entry info.
+     */
+    private void onResult(@Nullable IgniteCheckedException err,
+        boolean invalidParts,
+        AffinityTopologyVersion rmtTopVer,
+        @Nullable GridCacheEntryInfo info) {
+        if (info != null) {
+            assert skipVals == (info.value() == null);
+
+            onResult(err, invalidParts, rmtTopVer, info.value(), info.version());
+        }
+        else
+            onResult(err, invalidParts, rmtTopVer, null, null);
+    }
+
+    /**
+     * @param err Error.
+     * @param invalidParts Invalid partitions error flag.
+     * @param rmtTopVer Received topology version.
+     * @param val Value.
+     * @param ver Version.
+     */
+    private void onResult(@Nullable IgniteCheckedException err,
+        boolean invalidParts,
+        AffinityTopologyVersion rmtTopVer,
+        @Nullable CacheObject val,
+        @Nullable GridCacheVersion ver) {
+        if (err != null) {
+            onDone(err);
+
+            return;
+        }
+
+        if (invalidParts) {
+            assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
+
+            if (rmtTopVer.compareTo(topVer) <= 0) {
+                // Fail the whole get future.
+                onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
+                    "invalid partitions but remote topology version does not differ from local) " +
+                    "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", part=" + cctx.affinity().partition(key) +
+                    ", nodeId=" + node.id() + ']'));
+
+                return;
+            }
+
+            if (canRemap) {
+                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+
+                topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
+                    @Override public void applyx(IgniteInternalFuture<Long> fut) {
+                        try {
+                            AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
+
+                            remap(topVer);
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+
+            }
+            else
+                map(topVer);
+        }
+        else
+            setResult(val, ver);
+    }
+
+    /**
+     * @param info Entry info.
+     */
+    private void setResult(@Nullable GridCacheEntryInfo info) {
+        if (info != null) {
+            assert skipVals == (info.value() == null);
+
+            setResult(info.value(), info.version());
+        }
+        else
+            onDone(skipVals ? false : null);
+    }
+
+    /**
+     * @param val Value.
+     * @param ver Version.
+     */
+    private void setResult(@Nullable CacheObject val, @Nullable GridCacheVersion ver) {
+        if (needVer) {
+            assert ver != null;
+            assert skipVals || val != null;
+
+            onDone(new T2<>(skipVals ? true : val, ver));
+        }
+        else {
+            if (!keepCacheObjects) {
+                Object res = skipVals ? true : CU.value(val, cctx, true);
+
+                if (deserializePortable && !skipVals)
+                    res = cctx.unwrapPortableIfNeeded(res, false);
+
+                onDone(res);
+            }
+            else
+                onDone(skipVals ? true : val);
+        }
+    }
+
+    /**
+     * Affinity node to send get request to.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Affinity node to get key from.
+     */
+    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : affNodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        assert false : this;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        if (!processResponse(nodeId))
+            return false;
+
+        if (canRemap) {
+            final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(
+                Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+            cctx.affinity().affinityReadyFuture(updTopVer).listen(
+                new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                        try {
+                            fut.get();
+
+                            remap(updTopVer);
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+        }
+        else
+            remap(topVer);
+
+        return true;
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    private void remap(final AffinityTopologyVersion topVer) {
+        cctx.closures().runLocalSafe(new Runnable() {
+            @Override public void run() {
+                map(topVer);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(Object res, Throwable err) {
+        if (super.onDone(res, err)) {
+            // Don't forget to clean up.
+            if (trackable)
+                cctx.mvcc().removeFuture(futId);
+
+            cctx.dht().sendTtlUpdateRequest(expiryPlc);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return trackable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridPartitionedSingleGetFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7f9edb2..d1d9bd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -60,16 +60,20 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -242,6 +246,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+            @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
+                processNearSingleGetRequest(nodeId, req);
+            }
+        });
+
         ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
             @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
                 processNearAtomicUpdateRequest(nodeId, req);
@@ -279,6 +289,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     processNearGetResponse(nodeId, res);
                 }
             });
+
+            ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
+                @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
+                    processNearSingleGetResponse(nodeId, res);
+                }
+            });
         }
     }
 
@@ -301,6 +317,45 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture<V> getAsync(final K key,
+        final boolean forcePrimary,
+        final boolean skipTx,
+        @Nullable UUID subjId,
+        final String taskName,
+        final boolean deserializePortable,
+        final boolean skipVals,
+        final boolean canRemap) {
+        ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        subjId = ctx.subjectIdPerCall(null, opCtx);
+
+        final UUID subjId0 = subjId;
+
+        final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? opCtx.expiry() : null;
+
+        final boolean skipStore = opCtx != null && opCtx.skipStore();
+
+        return asyncOp(new CO<IgniteInternalFuture<V>>() {
+            @Override public IgniteInternalFuture<V> apply() {
+                return getAsync0(ctx.toCacheKeyObject(key),
+                    forcePrimary,
+                    subjId0,
+                    taskName,
+                    deserializePortable,
+                    expiryPlc,
+                    skipVals,
+                    skipStore,
+                    canRemap);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
         @Nullable final Collection<? extends K> keys,
         final boolean forcePrimary,
@@ -914,9 +969,56 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Entry point to all public API single get methods.
+     *
+     * @param key Key.
+     * @param forcePrimary Force primary flag.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param skipStore Skip store flag.
+     * @return Get future.
+     */
+    private IgniteInternalFuture<V> getAsync0(KeyCacheObject key,
+        boolean forcePrimary,
+        UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable ExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean skipStore,
+        boolean canRemap
+    ) {
+        AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
+            ctx.shared().exchange().readyAffinityVersion();
+
+        IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
+
+        GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+            key,
+            topVer,
+            !skipStore,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiry,
+            skipVals,
+            canRemap,
+            false,
+            false);
+
+        fut.init();
+
+        return (IgniteInternalFuture<V>)fut;
+    }
+
+    /**
      * Entry point to all public API get methods.
      *
-     * @param keys Keys to remove.
+     * @param keys Keys.
      * @param forcePrimary Force primary flag.
      * @param subjId Subject ID.
      * @param taskName Task name.
@@ -942,7 +1044,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
-        if (!forcePrimary) {
+        if (!forcePrimary && ctx.affinityNode()) {
             Map<K, V> locVals = U.newHashMap(keys.size());
 
             boolean success = true;
@@ -2415,8 +2517,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (log.isDebugEnabled())
             log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
 
-        GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
-            res.version(), res.futureId());
+        CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param res Near get response.
+     */
+    private void processNearSingleGetResponse(UUID nodeId, GridNearSingleGetResponse res) {
+        if (log.isDebugEnabled())
+            log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
+
+        GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 83c220d..cd43d88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtEmbeddedFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFinishedFuture;
@@ -935,8 +936,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param res Response.
      */
     private void processGetResponse(UUID nodeId, GridNearGetResponse res) {
-        GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
-            res.version(), res.futureId());
+        CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
index 1559a91..c14621a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
@@ -53,7 +53,7 @@ public class CacheVersionedValue implements Message {
      * @param val Cache value.
      * @param ver Cache version.
      */
-    CacheVersionedValue(CacheObject val, GridCacheVersion ver) {
+    public CacheVersionedValue(CacheObject val, GridCacheVersion ver) {
         this.val = val;
         this.ver = ver;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 3c3527a..4b04cb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheValueCollection;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -292,8 +293,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @param res Response.
      */
     protected void processGetResponse(UUID nodeId, GridNearGetResponse res) {
-        GridNearGetFuture<K, V> fut = (GridNearGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
-            res.version(), res.futureId());
+        CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.version(), res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ae1d43c..8f47fb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -618,28 +618,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
     }
 
     /**
-     * Affinity node to send get request to.
-     *
-     * @param key Key to get.
-     * @param topVer Topology version.
-     * @return Affinity node to get key from.
-     */
-    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
-        if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
-            for (ClusterNode node : affNodes) {
-                if (cctx.discovery().alive(node))
-                    return node;
-            }
-
-            return null;
-        }
-        else
-            return cctx.affinity().primary(key, topVer);
-    }
-
-    /**
      * @return Near cache.
      */
     private GridNearCacheAdapter<K, V> cache() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 8482217..6d60298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -133,7 +133,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
     ) {
         assert futId != null;
         assert miniId != null;
-        assert ver != null;
         assert keys != null;
 
         this.cacheId = cacheId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index fc06ab1..15a791f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -100,8 +100,6 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
         boolean addDepInfo
     ) {
         assert futId != null;
-        assert miniId != null;
-        assert ver != null;
 
         this.cacheId = cacheId;
         this.futId = futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b9ecdc6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
new file mode 100644
index 0000000..073df94
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
+
+/**
+ *
+ */
+public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final int READ_THROUGH_FLAG_MASK = 0x01;
+
+    /** */
+    public static final int SKIP_VALS_FLAG_MASK = 0x02;
+
+    /** */
+    public static final int ADD_READER_FLAG_MASK = 0x04;
+
+    /** */
+    public static final int NEED_VER_FLAG_MASK = 0x08;
+
+    /** */
+    public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** */
+    private KeyCacheObject key;
+
+    /** Flags. */
+    private byte flags;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash. */
+    private int taskNameHash;
+
+    /** TTL for read operation. */
+    private long accessTtl;
+
+    /**
+     * Empty constructor required for {@link Message}.
+     */
+    public GridNearSingleGetRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param futId Future ID.
+     * @param key Key.
+     * @param readThrough Read through flag.
+     * @param skipVals Skip values flag. When false, only boolean values will be returned indicating whether
+     *      cache entry has a value.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged.
+     * @param addReader Add reader flag.
+     * @param needVer {@code True} if entry version is needed.
+     * @param addDepInfo Deployment info.
+     */
+    public GridNearSingleGetRequest(
+        int cacheId,
+        IgniteUuid futId,
+        KeyCacheObject key,
+        boolean readThrough,
+        @NotNull AffinityTopologyVersion topVer,
+        UUID subjId,
+        int taskNameHash,
+        long accessTtl,
+        boolean skipVals,
+        boolean addReader,
+        boolean needVer,
+        boolean addDepInfo
+    ) {
+        assert futId != null;
+        assert key != null;
+
+        this.cacheId = cacheId;
+        this.futId = futId;
+        this.key = key;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.accessTtl = accessTtl;
+        this.addDepInfo = addDepInfo;
+
+        if (readThrough)
+            flags = (byte)(flags | READ_THROUGH_FLAG_MASK);
+
+        if (skipVals)
+            flags = (byte)(flags | SKIP_VALS_FLAG_MASK);
+
+        if (addReader)
+            flags = (byte)(flags | ADD_READER_FLAG_MASK);
+
+        if (needVer)
+            flags = (byte)(flags | NEED_VER_FLAG_MASK);
+    }
+
+    /**
+     * @return Key.
+     */
+    public KeyCacheObject key() {
+        return key;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Subject ID.
+     */
+    public UUID subjectId() {
+        return subjId;
+    }
+
+    /**
+     * Gets task name hash.
+     *
+     * @return Task name hash.
+     */
+    public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return New TTL to set after entry is accessed, -1 to leave unchanged.
+     */
+    public long accessTtl() {
+        return accessTtl;
+    }
+
+    /**
+     * @return Read through flag.
+     */
+    public boolean readThrough() {
+        return (flags & SKIP_STORE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return Read through flag.
+     */
+    public boolean skipValues() {
+        return (flags & SKIP_VALS_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return Add reader flag.
+     */
+    public boolean addReader() {
+        return (flags & ADD_READER_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return {@code True} if entry version is needed.
+     */
+    public boolean needVersion() {
+        return (flags & NEED_VER_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return {@code True} if full entry information is needed.
+     */
+    public boolean needEntryInfo() {
+        return (flags & NEED_ENTRY_INFO_FLAG_MASK) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        assert key != null;
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObject(key, cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        assert key != null;
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                accessTtl = reader.readLong("accessTtl");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearSingleGetRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("accessTtl", accessTtl))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 115;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearSingleGetRequest.class, this);
+    }
+}


Mime
View raw message