ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [05/50] [abbrv] ignite git commit: Optimization for single key cache 'get' operation.
Date Sat, 21 Nov 2015 02:01:43 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
new file mode 100644
index 0000000..8f2357b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CIX1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>,
+    CacheGetFuture {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final IgniteProductVersion SINGLE_GET_MSG_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    private static IgniteLogger log;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Context. */
+    private final GridCacheContext cctx;
+
+    /** Key. */
+    private final KeyCacheObject key;
+
+    /** Read through flag. */
+    private final boolean readThrough;
+
+    /** Force primary flag. */
+    private final boolean forcePrimary;
+
+    /** Future ID. */
+    private final IgniteUuid futId;
+
+    /** Trackable flag. */
+    private boolean trackable;
+
+    /** Subject ID. */
+    private final UUID subjId;
+
+    /** Task name. */
+    private final String taskName;
+
+    /** Whether to deserialize portable objects. */
+    private boolean deserializePortable;
+
+    /** Skip values flag. */
+    private boolean skipVals;
+
+    /** Expiry policy. */
+    private IgniteCacheExpiryPolicy expiryPlc;
+
+    /** Flag indicating that get should be done on a locked topology version. */
+    private final boolean canRemap;
+
+    /** */
+    private final boolean needVer;
+
+    /** */
+    private final boolean keepCacheObjects;
+
+    /** */
+    private ClusterNode node;
+
+    /**
+     * @param cctx Context.
+     * @param key Key.
+     * @param topVer Topology version.
+     * @param readThrough Read through flag.
+     * @param forcePrimary If {@code true} then will force network trip to primary node even if called on backup node.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObjects Keep cache objects flag.
+     */
+    public GridPartitionedSingleGetFuture(
+        GridCacheContext cctx,
+        KeyCacheObject key,
+        AffinityTopologyVersion topVer,
+        boolean readThrough,
+        boolean forcePrimary,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObjects
+    ) {
+        assert key != null;
+
+        this.cctx = cctx;
+        this.key = key;
+        this.readThrough = readThrough;
+        this.forcePrimary = forcePrimary;
+        this.subjId = subjId;
+        this.taskName = taskName;
+        this.deserializePortable = deserializePortable;
+        this.expiryPlc = expiryPlc;
+        this.skipVals = skipVals;
+        this.canRemap = canRemap;
+        this.needVer = needVer;
+        this.keepCacheObjects = keepCacheObjects;
+        this.topVer = topVer;
+
+        futId = IgniteUuid.randomUuid();
+
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridPartitionedSingleGetFuture.class);
+    }
+
+    /**
+     *
+     */
+    public void init() {
+        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+            canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+
+        map(topVer);
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    @SuppressWarnings("unchecked")
+    private void map(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+
+        ClusterNode node = mapKeyToNode(topVer);
+
+        if (node == null) {
+            assert isDone() : this;
+
+            return;
+        }
+
+        if (node.isLocal()) {
+            LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+            map.put(key, false);
+
+            final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtAsync(node.id(),
+                -1,
+                map,
+                readThrough,
+                topVer,
+                subjId,
+                taskName == null ? 0 : taskName.hashCode(),
+                expiryPlc,
+                skipVals);
+
+            final Collection<Integer> invalidParts = fut.invalidPartitions();
+
+            if (!F.isEmpty(invalidParts)) {
+                AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
+
+                assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology " +
+                    "version did not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
+                    ", invalidParts=" + invalidParts + ']';
+
+                // Remap recursively.
+                map(updTopVer);
+            }
+            else {
+                fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
+                    @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) {
+                        try {
+                            Collection<GridCacheEntryInfo> infos = fut.get();
+
+                            assert F.isEmpty(infos) || infos.size() == 1 : infos;
+
+                            setResult(F.first(infos));
+                        }
+                        catch (Exception e) {
+                            U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
+
+                            onDone(e);
+                        }
+                    }
+                });
+            }
+        }
+        else {
+            synchronized (this) {
+                this.node = node;
+            }
+
+            if (!trackable) {
+                trackable = true;
+
+                cctx.mvcc().addFuture(this, futId);
+            }
+
+            GridCacheMessage req;
+
+            if (node.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0) {
+                req = new GridNearSingleGetRequest(cctx.cacheId(),
+                    futId,
+                    key,
+                    readThrough,
+                    topVer,
+                    subjId,
+                    taskName == null ? 0 : taskName.hashCode(),
+                    expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    skipVals,
+                    /**add reader*/false,
+                    needVer,
+                    cctx.deploymentEnabled());
+            }
+            else {
+                LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1);
+
+                map.put(key, false);
+
+                req = new GridNearGetRequest(
+                    cctx.cacheId(),
+                    futId,
+                    futId,
+                    cctx.versions().next(),
+                    map,
+                    readThrough,
+                    topVer,
+                    subjId,
+                    taskName == null ? 0 : taskName.hashCode(),
+                    expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    skipVals,
+                    cctx.deploymentEnabled());
+            }
+
+            try {
+                cctx.io().send(node, req, cctx.ioPolicy());
+            }
+            catch (IgniteCheckedException e) {
+                if (e instanceof ClusterTopologyCheckedException)
+                    onNodeLeft(node.id());
+                else
+                    onDone(e);
+            }
+        }
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Primary node or {@code null} if future was completed.
+     */
+    @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
+        ClusterNode primary = affinityNode(key, topVer);
+
+        if (primary == null) {
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+
+            return null;
+        }
+
+        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
+
+        if (allowLocRead) {
+            GridDhtCacheAdapter colocated = cctx.dht();
+
+            while (true) {
+                GridCacheEntryEx entry;
+
+                try {
+                    entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                        colocated.peekEx(key);
+
+                    // If our DHT cache do has value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        CacheObject v = null;
+                        GridCacheVersion ver = null;
+
+                        if (needVer) {
+                            T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                null,
+                                /*swap*/true,
+                                /*unmarshal*/true,
+                                /**update-metrics*/false,
+                                /*event*/!skipVals,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc);
+
+                            if (res != null) {
+                                v = res.get1();
+                                ver = res.get2();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(null,
+                                /*swap*/true,
+                                /*read-through*/false,
+                                /*fail-fast*/true,
+                                /*unmarshal*/true,
+                                /**update-metrics*/false,
+                                /*event*/!skipVals,
+                                /*temporary*/false,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc);
+                        }
+
+                        colocated.context().evicts().touch(entry, topVer);
+
+                        // Entry was not in memory or in swap, so we remove it from cache.
+                        if (v == null) {
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                colocated.removeIfObsolete(key);
+                        }
+                        else {
+                            if (!skipVals && cctx.config().isStatisticsEnabled())
+                                cctx.cache().metrics0().onRead(true);
+
+                            if (!skipVals)
+                                setResult(v, ver);
+                            else
+                                setSkipValueResult(true, ver);
+
+                            return null;
+                        }
+                    }
+
+                    break;
+                }
+                catch (GridDhtInvalidPartitionException ignored) {
+                    break;
+                }
+                catch (IgniteCheckedException e) {
+                    onDone(e);
+
+                    return null;
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    // No-op, will retry.
+                }
+            }
+        }
+
+        return primary;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridNearSingleGetResponse res) {
+        if (!processResponse(nodeId) || !checkError(res.error(), res.invalidPartitions(), res.topologyVersion()))
+            return;
+
+        Message res0 = res.result();
+
+        if (needVer) {
+            CacheVersionedValue verVal = (CacheVersionedValue)res0;
+
+            if (verVal != null) {
+                if (skipVals)
+                    setSkipValueResult(true, verVal.version());
+                else
+                    setResult(verVal.value() , verVal.version());
+            }
+            else {
+                if (skipVals)
+                    setSkipValueResult(false, null);
+                else
+                    setResult(null , null);
+            }
+        }
+        else {
+            if (skipVals)
+                setSkipValueResult(res.containsValue(), null);
+            else
+                setResult((CacheObject)res0, null);
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridNearGetResponse res) {
+        if (!processResponse(nodeId) ||
+            !checkError(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion()))
+            return;
+
+        Collection<GridCacheEntryInfo> infos = res.entries();
+
+        assert F.isEmpty(infos) || infos.size() == 1 : infos;
+
+        setResult(F.first(infos));
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if should process received response.
+     */
+    private boolean processResponse(UUID nodeId) {
+        synchronized (this) {
+            if (node != null && node.id().equals(nodeId)) {
+                node = null;
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param err Error.
+     * @param invalidParts Invalid partitions error flag.
+     * @param rmtTopVer Received topology version.
+     */
+    private boolean checkError(@Nullable IgniteCheckedException err,
+        boolean invalidParts,
+        AffinityTopologyVersion rmtTopVer) {
+        if (err != null) {
+            onDone(err);
+
+            return false;
+        }
+
+        if (invalidParts) {
+            assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
+
+            if (rmtTopVer.compareTo(topVer) <= 0) {
+                // Fail the whole get future.
+                onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
+                    "invalid partitions but remote topology version does not differ from local) " +
+                    "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", part=" + cctx.affinity().partition(key) +
+                    ", nodeId=" + node.id() + ']'));
+
+                return false;
+            }
+
+            if (canRemap) {
+                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+
+                topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
+                    @Override public void applyx(IgniteInternalFuture<Long> fut) {
+                        try {
+                            AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
+
+                            remap(topVer);
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+
+            }
+            else
+                map(topVer);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param info Entry info.
+     */
+    private void setResult(@Nullable GridCacheEntryInfo info) {
+        assert info == null || skipVals == (info.value() == null);
+
+        if (skipVals) {
+            if (info != null)
+                setSkipValueResult(true, info.version());
+            else
+                setSkipValueResult(false, null);
+        }
+        else {
+            if (info != null)
+                setResult(info.value(), info.version());
+            else
+                setResult(null, null);
+        }
+    }
+
+    /**
+     * @param res Result.
+     * @param ver Version.
+     */
+    private void setSkipValueResult(boolean res, @Nullable GridCacheVersion ver) {
+        assert skipVals;
+
+        if (needVer) {
+            assert ver != null || !res;
+
+            onDone(new T2<>(res, ver));
+        }
+        else
+            onDone(res);
+    }
+
+    /**
+     * @param val Value.
+     * @param ver Version.
+     */
+    private void setResult(@Nullable CacheObject val, @Nullable GridCacheVersion ver) {
+        try {
+            assert !skipVals;
+
+            if (val != null) {
+                if (needVer) {
+                    assert ver != null;
+
+                    onDone(new T2<>(val, ver));
+                }
+                else {
+                    if (!keepCacheObjects) {
+                        Object res = CU.value(val, cctx, true);
+
+                        if (deserializePortable && !skipVals)
+                            res = cctx.unwrapPortableIfNeeded(res, false);
+
+                        onDone(res);
+                    }
+                    else
+                        onDone(val);
+                }
+            }
+            else
+                onDone(null);
+        }
+        catch (Exception e) {
+            onDone(e);
+        }
+    }
+
+    /**
+     * Affinity node to send get request to.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Affinity node to get key from.
+     */
+    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : affNodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        if (!processResponse(nodeId))
+            return false;
+
+        if (canRemap) {
+            final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(
+                Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+            cctx.affinity().affinityReadyFuture(updTopVer).listen(
+                new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                        try {
+                            fut.get();
+
+                            remap(updTopVer);
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+        }
+        else
+            remap(topVer);
+
+        return true;
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    private void remap(final AffinityTopologyVersion topVer) {
+        cctx.closures().runLocalSafe(new Runnable() {
+            @Override public void run() {
+                map(topVer);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(Object res, Throwable err) {
+        if (super.onDone(res, err)) {
+            // Don't forget to clean up.
+            if (trackable)
+                cctx.mvcc().removeFuture(futId);
+
+            cctx.dht().sendTtlUpdateRequest(expiryPlc);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return trackable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridPartitionedSingleGetFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7f9edb2..75f8c2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -65,11 +65,14 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -242,6 +245,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+            @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
+                processNearSingleGetRequest(nodeId, req);
+            }
+        });
+
         ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
             @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
                 processNearAtomicUpdateRequest(nodeId, req);
@@ -279,6 +288,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     processNearGetResponse(nodeId, res);
                 }
             });
+
+            ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
+                @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
+                    processNearSingleGetResponse(nodeId, res);
+                }
+            });
         }
     }
 
@@ -301,6 +316,45 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture<V> getAsync(final K key,
+        final boolean forcePrimary,
+        final boolean skipTx,
+        @Nullable UUID subjId,
+        final String taskName,
+        final boolean deserializePortable,
+        final boolean skipVals,
+        final boolean canRemap) {
+        ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        subjId = ctx.subjectIdPerCall(null, opCtx);
+
+        final UUID subjId0 = subjId;
+
+        final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? opCtx.expiry() : null;
+
+        final boolean skipStore = opCtx != null && opCtx.skipStore();
+
+        return asyncOp(new CO<IgniteInternalFuture<V>>() {
+            @Override public IgniteInternalFuture<V> apply() {
+                return getAsync0(ctx.toCacheKeyObject(key),
+                    forcePrimary,
+                    subjId0,
+                    taskName,
+                    deserializePortable,
+                    expiryPlc,
+                    skipVals,
+                    skipStore,
+                    canRemap);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
         @Nullable final Collection<? extends K> keys,
         final boolean forcePrimary,
@@ -914,9 +968,57 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Entry point to all public API single get methods.
+     *
+     * @param key Key.
+     * @param forcePrimary Force primary flag.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param skipStore Skip store flag.
+     * @param canRemap Can remap flag.
+     * @return Get future.
+     */
+    private IgniteInternalFuture<V> getAsync0(KeyCacheObject key,
+        boolean forcePrimary,
+        UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable ExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean skipStore,
+        boolean canRemap
+    ) {
+        AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
+            ctx.shared().exchange().readyAffinityVersion();
+
+        IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
+
+        GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+            key,
+            topVer,
+            !skipStore,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiry,
+            skipVals,
+            canRemap,
+            false,
+            false);
+
+        fut.init();
+
+        return (IgniteInternalFuture<V>)fut;
+    }
+
+    /**
      * Entry point to all public API get methods.
      *
-     * @param keys Keys to remove.
+     * @param keys Keys.
      * @param forcePrimary Force primary flag.
      * @param subjId Subject ID.
      * @param taskName Task name.
@@ -942,7 +1044,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
-        if (!forcePrimary) {
+        if (!forcePrimary && ctx.affinityNode()) {
             Map<K, V> locVals = U.newHashMap(keys.size());
 
             boolean success = true;
@@ -2409,27 +2511,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
     /**
      * @param nodeId Sender node ID.
-     * @param res Near get response.
-     */
-    private void processNearGetResponse(UUID nodeId, GridNearGetResponse res) {
-        if (log.isDebugEnabled())
-            log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']');
-
-        GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
-            res.version(), res.futureId());
-
-        if (fut == null) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
-
-            return;
-        }
-
-        fut.onResult(nodeId, res);
-    }
-
-    /**
-     * @param nodeId Sender node ID.
      * @param req Near atomic update request.
      */
     private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4ace5c4..c34dcfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
@@ -48,7 +47,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
@@ -67,39 +65,42 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     protected static IgniteLogger log;
 
     /** Cache context. */
-    private GridCacheContext cctx;
+    private final GridCacheContext cctx;
 
     /** Future version. */
-    private GridCacheVersion futVer;
+    private final GridCacheVersion futVer;
 
     /** Write version. */
-    private GridCacheVersion writeVer;
+    private final GridCacheVersion writeVer;
 
     /** Force transform backup flag. */
     private boolean forceTransformBackups;
 
     /** Completion callback. */
     @GridToStringExclude
-    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+    private final Map<UUID, GridDhtAtomicUpdateRequest> mappings;
 
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
     /** Update request. */
-    private GridNearAtomicUpdateRequest updateReq;
+    private final GridNearAtomicUpdateRequest updateReq;
 
     /** Update response. */
-    private GridNearAtomicUpdateResponse updateRes;
+    private final GridNearAtomicUpdateResponse updateRes;
 
     /** Future keys. */
-    private Collection<KeyCacheObject> keys;
+    private final Collection<KeyCacheObject> keys;
 
     /** */
-    private boolean waitForExchange;
+    private final boolean waitForExchange;
+
+    /** Response count. */
+    private volatile int resCnt;
 
     /**
      * @param cctx Cache context.
@@ -128,6 +129,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
 
         keys = new ArrayList<>(updateReq.keys().size());
+        mappings = U.newHashMap(updateReq.keys().size());
 
         boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
 
@@ -145,22 +147,37 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         if (log.isDebugEnabled())
             log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
 
+        return registerResponse(nodeId);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if request found.
+     */
+    private boolean registerResponse(UUID nodeId) {
+        int resCnt0;
+
         GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
 
         if (req != null) {
-            // Remove only after added keys to failed set.
-            mappings.remove(nodeId);
+            synchronized (this) {
+                if (req.onResponse()) {
+                    resCnt0 = resCnt;
+
+                    resCnt0 += 1;
+
+                    resCnt = resCnt0;
+                }
+                else
+                    return false;
+            }
 
-            checkComplete();
+            if (resCnt0 == mappings.size())
+                onDone();
 
             return true;
         }
@@ -343,18 +360,18 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     U.warn(log, "Failed to send update request to backup node because it left grid: " +
                         req.nodeId());
 
-                    mappings.remove(req.nodeId());
+                    registerResponse(req.nodeId());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
                         + req.nodeId(), e);
 
-                    mappings.remove(req.nodeId());
+                    registerResponse(req.nodeId());
                 }
             }
         }
-
-        checkComplete();
+        else
+            onDone();
 
         // Send response right away if no ACKs from backup is required.
         // Backups will send ACKs anyway, future will be completed after all backups have replied.
@@ -389,9 +406,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             }
         }
 
-        mappings.remove(nodeId);
-
-        checkComplete();
+        registerResponse(nodeId);
     }
 
     /**
@@ -403,22 +418,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
-        mappings.remove(nodeId);
-
-        checkComplete();
-    }
-
-    /**
-     * Checks if all required responses are received.
-     */
-    private void checkComplete() {
-        // Always wait for replies from all backups.
-        if (mappings.isEmpty()) {
-            if (log.isDebugEnabled())
-                log.debug("Completing DHT atomic update future: " + this);
-
-            onDone();
-        }
+        registerResponse(nodeId);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index e55cac9..1219f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -139,6 +139,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Task name hash. */
     private int taskNameHash;
 
+    /** On response flag. Access should be synced on future. */
+    @GridDirectTransient
+    private boolean onRes;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -527,6 +531,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /**
+     * @return {@code True} if on response flag changed.
+     */
+    public boolean onResponse() {
+        return !onRes && (onRes = true);
+    }
+
+    /**
      * @return Optional arguments for entry processor.
      */
     @Nullable public Object[] invokeArguments() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ae662c8..a786803 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -238,11 +238,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         return state.futureVersion();
     }
 
-    /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        throw new UnsupportedOperationException();
-    }
-
     /**
      * @return {@code True} if this future should block partition map exchange.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 7131aa5..47b7aea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -53,8 +53,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
@@ -67,6 +69,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -138,7 +141,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
-                processGetResponse(nodeId, res);
+                processNearGetResponse(nodeId, res);
+            }
+        });
+
+        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
+            @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
+                processNearSingleGetResponse(nodeId, res);
             }
         });
 
@@ -185,6 +194,80 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture<V> getAsync(final K key,
+        boolean forcePrimary,
+        boolean skipTx,
+        @Nullable UUID subjId,
+        String taskName,
+        final boolean deserializePortable,
+        final boolean skipVals,
+        boolean canRemap) {
+        ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+
+        final CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        if (tx != null && !tx.implicit() && !skipTx) {
+            return asyncOp(tx, new AsyncOp<V>() {
+                @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+                    IgniteInternalFuture<Map<Object, Object>>  fut = tx.getAllAsync(ctx,
+                        Collections.singleton(ctx.toCacheKeyObject(key)),
+                        deserializePortable,
+                        skipVals,
+                        false,
+                        opCtx != null && opCtx.skipStore());
+
+                    return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() {
+                        @SuppressWarnings("unchecked")
+                        @Override public V applyx(IgniteInternalFuture<Map<Object, Object>> e)
+                            throws IgniteCheckedException {
+                            Map<Object, Object> map = e.get();
+
+                            assert map.isEmpty() || map.size() == 1 : map.size();
+
+                            if (skipVals) {
+                                Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map);
+
+                                return (V)(val);
+                            }
+
+                            return (V)map.get(key);
+                        }
+                    });
+                }
+            });
+        }
+
+        AffinityTopologyVersion topVer = tx == null ?
+                (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+                tx.topologyVersion();
+
+        subjId = ctx.subjectIdPerCall(subjId, opCtx);
+
+        GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+            ctx.toCacheKeyObject(key),
+            topVer,
+            opCtx == null || !opCtx.skipStore(),
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
+            skipVals,
+            canRemap,
+            /*needVer*/false,
+            /*keepCacheObjects*/false);
+
+        fut.init();
+
+        return (IgniteInternalFuture<V>)fut;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
         @Nullable final Collection<? extends K> keys,
         boolean forcePrimary,
@@ -290,6 +373,54 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /**
+     * @param key Key to load.
+     * @param readThrough Read through flag.
+     * @param forcePrimary Force get from primary node flag.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObj Keep cache objects flag.
+     * @return Load future.
+     */
+    public final IgniteInternalFuture<Object> loadAsync(
+        KeyCacheObject key,
+        boolean readThrough,
+        boolean forcePrimary,
+        AffinityTopologyVersion topVer,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObj
+    ) {
+        GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+            ctx.toCacheKeyObject(key),
+            topVer,
+            readThrough,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiryPlc,
+            skipVals,
+            canRemap,
+            needVer,
+            keepCacheObj);
+
+        fut.init();
+
+        return fut;
+    }
+
+    /**
      * @param keys Keys to load.
      * @param readThrough Read through flag.
      * @param forcePrimary Force get from primary node flag.
@@ -299,9 +430,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
-     * @return Loaded values.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObj Keep cache objects flag.
+     * @return Load future.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(
+    public final IgniteInternalFuture<Map<K, V>> loadAsync(
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean forcePrimary,
@@ -931,24 +1065,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /**
-     * @param nodeId Sender ID.
-     * @param res Response.
-     */
-    private void processGetResponse(UUID nodeId, GridNearGetResponse res) {
-        GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
-            res.version(), res.futureId());
-
-        if (fut == null) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']');
-
-            return;
-        }
-
-        fut.onResult(nodeId, res);
-    }
-
-    /**
      * @param nodeId Node ID.
      * @param res Response.
      */
@@ -957,7 +1073,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         assert res != null;
 
         GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc().
-            <Boolean>future(res.version(), res.futureId());
+            <Boolean>mvccFuture(res.version(), res.futureId());
 
         if (fut != null)
             fut.onResult(nodeId, res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index abeb509..8245d88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -35,10 +35,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -78,7 +79,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
  * Colocated cache lock future.
  */
 public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
-    implements GridCacheFuture<Boolean> {
+    implements GridCacheMvccFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -198,25 +199,16 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
     }
 
-    /**
-     * @return Participating nodes.
-     */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                if (isMini(f))
-                    return ((MiniFuture)f).node();
-
-                return cctx.discovery().localNode();
-            }
-        });
-    }
-
     /** {@inheritDoc} */
     @Override public GridCacheVersion version() {
         return lockVer;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+        return false;
+    }
+
     /**
      * @return Future ID.
      */
@@ -538,7 +530,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                 log.debug("Completing future: " + this);
 
             // Clean up.
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeMvccFuture(this);
 
             if (timeoutObj != null)
                 cctx.time().removeTimeoutObject(timeoutObj);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
index 1559a91..c14621a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
@@ -53,7 +53,7 @@ public class CacheVersionedValue implements Message {
      * @param val Cache value.
      * @param ver Cache version.
      */
-    CacheVersionedValue(CacheObject val, GridCacheVersion ver) {
+    public CacheVersionedValue(CacheObject val, GridCacheVersion ver) {
         this.val = val;
         this.ver = ver;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 3c3527a..eb0b637 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheValueCollection;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -292,8 +293,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @param res Response.
      */
     protected void processGetResponse(UUID nodeId, GridNearGetResponse res) {
-        GridNearGetFuture<K, V> fut = (GridNearGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future(
-            res.version(), res.futureId());
+        CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId());
 
         if (fut == null) {
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ae1d43c..dfaa44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -170,24 +170,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return ver;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<Map<K, V>>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteInternalFuture<Map<K, V>> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
-
-                    return cctx.discovery().localNode();
-                }
-            });
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         boolean found = false;
 
@@ -227,7 +209,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         if (super.onDone(res, err)) {
             // Don't forget to clean up.
             if (trackable)
-                cctx.mvcc().removeFuture(this);
+                cctx.mvcc().removeFuture(futId);
 
             cache().dht().sendTtlUpdateRequest(expiryPlc);
 
@@ -343,7 +325,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                 if (!trackable) {
                     trackable = true;
 
-                    cctx.mvcc().addFuture(this);
+                    cctx.mvcc().addFuture(this, futId);
                 }
 
                 MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer);
@@ -386,6 +368,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
      * @param saved Reserved near cache entries.
      * @return Map.
      */
+    @SuppressWarnings("unchecked")
     private Map<KeyCacheObject, GridNearCacheEntry> map(
         KeyCacheObject key,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
@@ -538,11 +521,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         }
                         else {
                             K key0 = key.value(cctx.cacheObjectContext(), true);
-                            V val0 = v.value(cctx.cacheObjectContext(), true);
-
-                            val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
                             key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
 
+                            V val0;
+
+                            if (!skipVals) {
+                                val0 = v.value(cctx.cacheObjectContext(), true);
+                                val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
+                            }
+                            else
+                                val0 = (V)Boolean.TRUE;
+
                             add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
                         }
                     }
@@ -618,28 +607,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
     }
 
     /**
-     * Affinity node to send get request to.
-     *
-     * @param key Key to get.
-     * @param topVer Topology version.
-     * @return Affinity node to get key from.
-     */
-    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
-        if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
-            for (ClusterNode node : affNodes) {
-                if (cctx.discovery().alive(node))
-                    return node;
-            }
-
-            return null;
-        }
-        else
-            return cctx.affinity().primary(key, topVer);
-    }
-
-    /**
      * @return Near cache.
      */
     private GridNearCacheAdapter<K, V> cache() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 8482217..6d60298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -133,7 +133,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
     ) {
         assert futId != null;
         assert miniId != null;
-        assert ver != null;
         assert keys != null;
 
         this.cacheId = cacheId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index fc06ab1..15a791f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -100,8 +100,6 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
         boolean addDepInfo
     ) {
         assert futId != null;
-        assert miniId != null;
-        assert ver != null;
 
         this.cacheId = cacheId;
         this.futId = futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 9c3701f..76f2fbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -209,20 +209,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         valMap = new ConcurrentHashMap8<>(keys.size(), 1f);
     }
 
-    /**
-     * @return Participating nodes.
-     */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                if (isMini(f))
-                    return ((MiniFuture)f).node();
-
-                return cctx.discovery().localNode();
-            }
-        });
-    }
-
     /** {@inheritDoc} */
     @Override public GridCacheVersion version() {
         return lockVer;
@@ -672,7 +658,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                 log.debug("Completing future: " + this);
 
             // Clean up.
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeMvccFuture(this);
 
             if (timeoutObj != null)
                 cctx.time().removeTimeoutObject(timeoutObj);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 1569b14..770c47a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -73,8 +73,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
 /**
  *
  */
-public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
-    implements GridCacheMvccFuture<IgniteInternalTx> {
+public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
     /** */
     public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0");
 
@@ -148,18 +147,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                if (isMini(f))
-                    return ((MiniFuture)f).node();
-
-                return cctx.discovery().localNode();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         boolean found = false;
 
@@ -287,7 +274,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 tx.setRollbackOnly();
 
             // Don't forget to clean up.
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeMvccFuture(this);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 82e3868..eaf476c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
@@ -54,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
@@ -66,8 +64,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
 /**
  *
  */
-public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
-    implements GridCacheMvccFuture<IgniteInternalTx> {
+public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
     /** */
     @GridToStringInclude
     private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
@@ -100,18 +97,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                if (isMini(f))
-                    return ((MiniFuture)f).node();
-
-                return cctx.discovery().localNode();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         boolean found = false;
 
@@ -261,7 +246,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         if (super.onDone(tx, err0)) {
             // Don't forget to clean up.
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeMvccFuture(this);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 103105e..ffe5373 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
@@ -42,7 +44,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -68,15 +69,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                return ((MiniFuture)f).node();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         boolean found = false;
 
@@ -280,6 +272,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
     }
 
     /** {@inheritDoc} */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) {
         if (err != null)
             this.err.compareAndSet(null, err);
@@ -290,7 +287,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             tx.state(PREPARED);
 
         if (super.onDone(tx, err)) {
-            cctx.mvcc().removeFuture(this);
+            cctx.mvcc().removeMvccFuture(this);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
new file mode 100644
index 0000000..a506007
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
+
+/**
+ *
+ */
+public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final int READ_THROUGH_FLAG_MASK = 0x01;
+
+    /** */
+    public static final int SKIP_VALS_FLAG_MASK = 0x02;
+
+    /** */
+    public static final int ADD_READER_FLAG_MASK = 0x04;
+
+    /** */
+    public static final int NEED_VER_FLAG_MASK = 0x08;
+
+    /** */
+    public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** */
+    private KeyCacheObject key;
+
+    /** Flags. */
+    private byte flags;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash. */
+    private int taskNameHash;
+
+    /** TTL for read operation. */
+    private long accessTtl;
+
+    /**
+     * Empty constructor required for {@link Message}.
+     */
+    public GridNearSingleGetRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param futId Future ID.
+     * @param key Key.
+     * @param readThrough Read through flag.
+     * @param skipVals Skip values flag. When false, only boolean values will be returned indicating whether
+     *      cache entry has a value.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged.
+     * @param addReader Add reader flag.
+     * @param needVer {@code True} if entry version is needed.
+     * @param addDepInfo Deployment info.
+     */
+    public GridNearSingleGetRequest(
+        int cacheId,
+        IgniteUuid futId,
+        KeyCacheObject key,
+        boolean readThrough,
+        @NotNull AffinityTopologyVersion topVer,
+        UUID subjId,
+        int taskNameHash,
+        long accessTtl,
+        boolean skipVals,
+        boolean addReader,
+        boolean needVer,
+        boolean addDepInfo
+    ) {
+        assert futId != null;
+        assert key != null;
+
+        this.cacheId = cacheId;
+        this.futId = futId;
+        this.key = key;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.accessTtl = accessTtl;
+        this.addDepInfo = addDepInfo;
+
+        if (readThrough)
+            flags = (byte)(flags | READ_THROUGH_FLAG_MASK);
+
+        if (skipVals)
+            flags = (byte)(flags | SKIP_VALS_FLAG_MASK);
+
+        if (addReader)
+            flags = (byte)(flags | ADD_READER_FLAG_MASK);
+
+        if (needVer)
+            flags = (byte)(flags | NEED_VER_FLAG_MASK);
+    }
+
+    /**
+     * @return Key.
+     */
+    public KeyCacheObject key() {
+        return key;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Subject ID.
+     */
+    public UUID subjectId() {
+        return subjId;
+    }
+
+    /**
+     * Gets task name hash.
+     *
+     * @return Task name hash.
+     */
+    public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return New TTL to set after entry is accessed, -1 to leave unchanged.
+     */
+    public long accessTtl() {
+        return accessTtl;
+    }
+
+    /**
+     * @return Read through flag.
+     */
+    public boolean readThrough() {
+        return (flags & SKIP_STORE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return Read through flag.
+     */
+    public boolean skipValues() {
+        return (flags & SKIP_VALS_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return Add reader flag.
+     */
+    public boolean addReader() {
+        return (flags & ADD_READER_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return {@code True} if entry version is needed.
+     */
+    public boolean needVersion() {
+        return (flags & NEED_VER_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @return {@code True} if full entry information is needed.
+     */
+    public boolean needEntryInfo() {
+        return (flags & NEED_ENTRY_INFO_FLAG_MASK) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        assert key != null;
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObject(key, cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        assert key != null;
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                accessTtl = reader.readLong("accessTtl");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearSingleGetRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("accessTtl", accessTtl))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 116;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 10;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearSingleGetRequest.class, this);
+    }
+}


Mime
View raw message