ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [24/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 22:04:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
new file mode 100644
index 0000000..8d9e67c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -0,0 +1,1017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
+
+/**
+ * DHT cache adapter.
+ */
+public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Topology. */
+    private GridDhtPartitionTopology<K, V> top;
+
+    /** Preloader. */
+    protected GridCachePreloader<K, V> preldr;
+
+    /** Multi tx future holder. */
+    private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>();
+
+    /** Multi tx futures. */
+    private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<>();
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     * <p>
+     * Performs no initialization of its own: neither the topology nor the preloader is assigned here.
+     */
+    protected GridDhtCacheAdapter() {
+        // No-op.
+    }
+
+    /**
+     * Creates the adapter for the given context, passing the configured start size
+     * ({@code ctx.config().getStartSize()}) to the superclass and creating the partition topology.
+     *
+     * @param ctx Context.
+     */
+    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
+        super(ctx, ctx.config().getStartSize());
+
+        top = new GridDhtPartitionTopologyImpl<>(ctx);
+    }
+
+    /**
+     * Constructor used for near-only cache.
+     * <p>
+     * The cache map is supplied by the caller and handed to the superclass; the partition topology
+     * is created here for the given context.
+     *
+     * @param ctx Cache context.
+     * @param map Cache map.
+     */
+    protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) {
+        super(ctx, map);
+
+        top = new GridDhtPartitionTopologyImpl<>(ctx);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Installs an entry factory on the cache map that produces {@link GridDhtCacheEntry} instances.
+     */
+    @Override protected void init() {
+        GridCacheMapEntryFactory<K, V> dhtEntryFactory = new GridCacheMapEntryFactory<K, V>() {
+            /** {@inheritDoc} */
+            @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+                V val, GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+                return new GridDhtCacheEntry<>(ctx, topVer, key, hash, val, next, ttl, hdrId);
+            }
+        };
+
+        map.setEntryFactory(dhtEntryFactory);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        super.start();
+
+        ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest<K, V> req) {
+                processTtlUpdateRequest(req);
+            }
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops the preloader if one was assigned, then drops the references to both the preloader and
+     * the topology, so {@link #topology()} and {@link #preloader()} return {@code null} afterwards.
+     */
+    @Override public void stop() {
+        super.stop();
+
+        if (preldr != null)
+            preldr.stop();
+
+        // Clean up to help GC.
+        preldr = null;
+        top = null;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Forwards the callback to the preloader after the superclass has handled it.
+     * NOTE(review): unlike {@link #stop()} and {@link #onKernalStop()}, the preloader is not
+     * null-checked here - presumably subclasses always assign it before kernal start; confirm.
+     */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        preldr.onKernalStart();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Forwards the callback to the preloader, if one was assigned, after the superclass has handled it.
+     */
+    @Override public void onKernalStop() {
+        super.onKernalStop();
+
+        if (preldr != null)
+            preldr.onKernalStop();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Additionally prints memory statistics of the partition topology.
+     * NOTE(review): the meaning of the {@code 1024} argument is defined by
+     * {@code GridDhtPartitionTopology.printMemoryStats} - presumably a size threshold; confirm.
+     */
+    @Override public void printMemoryStats() {
+        super.printMemoryStats();
+
+        top.printMemoryStats(1024);
+    }
+
+    /**
+     * Gets the near cache associated with this DHT cache. Processing of near keys in a TTL update
+     * request asserts that the returned value is not {@code null}.
+     *
+     * @return Near cache.
+     */
+    public abstract GridNearCacheAdapter<K, V> near();
+
+    /**
+     * Gets the partition topology created in the constructor.
+     *
+     * @return Partition topology; {@code null} once {@link #stop()} has been called or if the instance
+     *      was created through the empty constructor.
+     */
+    public GridDhtPartitionTopology<K, V> topology() {
+        return top;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the {@code preldr} field as is; it is {@code null} after {@link #stop()}.
+     */
+    @Override public GridCachePreloader<K, V> preloader() {
+        return preldr;
+    }
+
+    /**
+     * Gets the preloader narrowed to its DHT-specific type. With assertions enabled, fails if the
+     * current preloader is not a {@link GridDhtPreloader}.
+     *
+     * @return DHT preloader.
+     */
+    public GridDhtPreloader<K, V> dhtPreloader() {
+        GridCachePreloader<K, V> cur = preldr;
+
+        assert cur instanceof GridDhtPreloader;
+
+        return (GridDhtPreloader<K, V>)cur;
+    }
+
+    /**
+     * Gets the topology future that the calling thread stored when it started a multi-update.
+     *
+     * @return Topology version future registered for multi-update, or {@code null} if the calling
+     *      thread holds no multi-update lock.
+     */
+    @Nullable public GridDhtTopologyFuture multiUpdateTopologyFuture() {
+        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> holder = multiTxHolder.get();
+
+        if (holder == null)
+            return null;
+
+        return holder.get2();
+    }
+
+    /**
+     * Starts multi-update lock and waits until the topology future is ready.
+     * <p>
+     * While holding the topology read lock, the method creates a lock ID from the local node ID,
+     * registers a {@code MultiUpdateFuture} for the current topology version under that ID, and
+     * remembers the lock ID together with the current topology version future in a thread-local
+     * holder. The read lock is released before the method waits on the topology future.
+     *
+     * @return Topology version read while the topology read lock was held.
+     * @throws IgniteCheckedException If the calling thread already holds a multi-update lock
+     *      (nested locks are not supported) or if waiting for the topology future failed.
+     */
+    public long beginMultiUpdate() throws IgniteCheckedException {
+        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get();
+
+        if (tup != null)
+            throw new IgniteCheckedException("Nested multi-update locks are not supported");
+
+        top.readLock();
+
+        GridDhtTopologyFuture topFut;
+
+        long topVer;
+
+        try {
+            // While we are holding read lock, register lock future for partition release future.
+            IgniteUuid lockId = IgniteUuid.fromUuid(ctx.localNodeId());
+
+            topVer = top.topologyVersion();
+
+            MultiUpdateFuture fut = new MultiUpdateFuture(ctx.kernalContext(), topVer);
+
+            MultiUpdateFuture old = multiTxFuts.putIfAbsent(lockId, fut);
+
+            assert old == null;
+
+            topFut = top.topologyVersionFuture();
+
+            multiTxHolder.set(F.t(lockId, topFut));
+        }
+        finally {
+            top.readUnlock();
+        }
+
+        topFut.get();
+
+        return topVer;
+    }
+
+    /**
+     * Ends the multi-update lock started by {@link #beginMultiUpdate()} on the calling thread.
+     * <p>
+     * Under the topology read lock the registered multi-update future is removed from the map of
+     * active futures, the thread-local holder is cleared and the future is completed with the lock ID.
+     *
+     * @throws IgniteCheckedException If multi-update was not started on this thread or was already released.
+     */
+    public void endMultiUpdate() throws IgniteCheckedException {
+        IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get();
+
+        if (tup == null)
+            throw new IgniteCheckedException("Multi-update was not started or released twice.");
+
+        top.readLock();
+
+        try {
+            IgniteUuid lockId = tup.get1();
+
+            MultiUpdateFuture multiFut = multiTxFuts.remove(lockId);
+
+            // Remove the thread-local entry instead of storing null in it, so that no stale entry
+            // stays attached to a (possibly pooled) thread. A later get() still returns null.
+            multiTxHolder.remove();
+
+            // Finish future.
+            multiFut.onDone(lockId);
+        }
+        finally {
+            top.readUnlock();
+        }
+    }
+
+    /**
+     * Builds a compound future over all registered multi-update futures whose topology version does not
+     * exceed the given one. The compound future is created lazily, on the first matching multi-update.
+     *
+     * @param topVer Topology version.
+     * @return Finish future, or {@code null} if no matching multi-update locks are found.
+     */
+    @Nullable public IgniteFuture<?> multiUpdateFinishFuture(long topVer) {
+        GridCompoundFuture<IgniteUuid, Object> res = null;
+
+        for (MultiUpdateFuture updFut : multiTxFuts.values()) {
+            // Skip multi-updates started on a newer topology.
+            if (updFut.topologyVersion() > topVer)
+                continue;
+
+            if (res == null)
+                res = new GridCompoundFuture<>(ctx.kernalContext());
+
+            res.add(updFut);
+        }
+
+        if (res == null)
+            return null;
+
+        res.markInitialized();
+
+        return res;
+    }
+
+    /**
+     * @param key Key.
+     * @return DHT entry.
+     */
+    @Nullable public GridDhtCacheEntry<K, V> peekExx(K key) {
+        return (GridDhtCacheEntry<K, V>)peekEx(key);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the superclass unchanged; overridden to declare the DHT-specific exception.
+     *
+     * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
+     */
+    @Override public GridCacheEntry<K, V> entry(K key) throws GridDhtInvalidPartitionException {
+        return super.entry(key);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the superclass unchanged; overridden to declare the DHT-specific exception.
+     *
+     * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
+     */
+    @Override public GridCacheEntryEx<K, V> entryEx(K key, boolean touch) throws GridDhtInvalidPartitionException {
+        return super.entryEx(key, touch);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the superclass unchanged; overridden to declare the DHT-specific exception.
+     *
+     * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
+     */
+    @Override public GridCacheEntryEx<K, V> entryEx(K key, long topVer) throws GridDhtInvalidPartitionException {
+        return super.entryEx(key, topVer);
+    }
+
+    /**
+     * @param key Key.
+     * @return DHT entry.
+     * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
+     */
+    public GridDhtCacheEntry<K, V> entryExx(K key) throws GridDhtInvalidPartitionException {
+        return (GridDhtCacheEntry<K, V>)entryEx(key);
+    }
+
+    /**
+     * @param key Key.
+     * @param topVer Topology version.
+     * @return DHT entry.
+     * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid.
+     */
+    public GridDhtCacheEntry<K, V> entryExx(K key, long topVer) throws GridDhtInvalidPartitionException {
+        return (GridDhtCacheEntry<K, V>)entryEx(key, topVer);
+    }
+
+    /**
+     * Gets or creates entry for given key. If detached entries are allowed and the local node is not an
+     * affinity node for the key on the given topology version, a detached entry is returned right away.
+     * Otherwise the regular DHT entry is requested; if that fails because the partition is invalid, a
+     * detached entry is returned when allowed and the exception is rethrown when not.
+     *
+     * @param key Key for which entry should be returned.
+     * @param topVer Topology version used for the local-node affinity check.
+     * @param allowDetached Whether to allow detached entries.
+     * @param touch {@code True} if entry should be passed to eviction policy.
+     * @return Cache entry.
+     * @throws GridDhtInvalidPartitionException if entry does not belong to this node and
+     *      {@code allowDetached} is {@code false}.
+     */
+    public GridCacheEntryEx<K, V> entryExx(K key, long topVer, boolean allowDetached, boolean touch) {
+        try {
+            if (allowDetached && !ctx.affinity().localNode(key, topVer))
+                return new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0);
+
+            return entryEx(key, touch);
+        }
+        catch (GridDhtInvalidPartitionException e) {
+            if (allowDetached)
+                return new GridDhtDetachedCacheEntry<>(ctx, key, key.hashCode(), null, null, 0, 0);
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void localLoad(Collection<? extends K> keys) throws IgniteCheckedException {
+        if (ctx.store().isLocalStore()) {
+            super.localLoad(keys);
+
+            return;
+        }
+
+        // Version for all loaded entries.
+        final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+
+        final boolean replicate = ctx.isDrEnabled();
+
+        final long topVer = ctx.affinity().affinityTopologyVersion();
+
+        ctx.store().loadAllFromStore(null, keys, new CI2<K, V>() {
+            @Override public void apply(K key, V val) {
+                loadEntry(key, val, ver0, null, topVer, replicate, 0);
+            }
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * For a local store the call is delegated to the superclass. Otherwise the store is asked to load
+     * the cache and each reported pair is passed to {@code loadEntry} together with the predicate,
+     * the TTL and a single version generated up front. The store is expected to report no version
+     * of its own (asserted).
+     */
+    @Override public void loadCache(final IgniteBiPredicate<K, V> p, final long ttl, Object[] args) throws IgniteCheckedException {
+        if (ctx.store().isLocalStore()) {
+            super.loadCache(p, ttl, args);
+
+            return;
+        }
+
+        // One version is shared by all entries loaded by this call.
+        final GridCacheVersion loadVer = ctx.shared().versions().nextForLoad(topology().topologyVersion());
+
+        final boolean drEnabled = ctx.isDrEnabled();
+
+        final long affTopVer = ctx.affinity().affinityTopologyVersion();
+
+        CI3<K, V, GridCacheVersion> loader = new CI3<K, V, GridCacheVersion>() {
+            @Override public void apply(K key, V val, @Nullable GridCacheVersion ver) {
+                assert ver == null;
+
+                loadEntry(key, val, loadVer, p, affTopVer, drEnabled, ttl);
+            }
+        };
+
+        ctx.store().loadCache(loader, args);
+    }
+
+    /**
+     * Puts a single key-value pair loaded from the store into the cache.
+     * <p>
+     * The pair is skipped if the optional predicate rejects it. Otherwise the local partition of the
+     * key is looked up (created if missing) and reserved for the duration of the update; if the
+     * reservation fails, or the partition does not belong to this node, the pair is ignored and only
+     * a debug message is logged. When portable mode is enabled, key and value are marshalled to
+     * portable form before the entry is initialized.
+     *
+     * @param key Key.
+     * @param val Value.
+     * @param ver Cache version.
+     * @param p Optional predicate.
+     * @param topVer Topology version.
+     * @param replicate Replication flag; selects {@code DR_LOAD} over {@code DR_NONE}.
+     * @param ttl TTL.
+     * @throws IgniteException If setting the initial value failed.
+     */
+    private void loadEntry(K key,
+        V val,
+        GridCacheVersion ver,
+        @Nullable IgniteBiPredicate<K, V> p,
+        long topVer,
+        boolean replicate,
+        long ttl) {
+        if (p != null && !p.apply(key, val))
+            return;
+
+        try {
+            GridDhtLocalPartition<K, V> part = top.localPartition(ctx.affinity().partition(key), -1, true);
+
+            // Reserve to make sure that partition does not get unloaded.
+            if (part.reserve()) {
+                GridCacheEntryEx<K, V> entry = null;
+
+                try {
+                    if (ctx.portableEnabled()) {
+                        key = (K)ctx.marshalToPortable(key);
+                        val = (V)ctx.marshalToPortable(val);
+                    }
+
+                    entry = entryEx(key, false);
+
+                    entry.initialValue(val, null, ver, ttl, -1, false, topVer, replicate ? DR_LOAD : DR_NONE);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException("Failed to put cache value: " + entry, e);
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry during loadCache (will ignore): " + entry);
+                }
+                finally {
+                    // Touch the entry (if it was obtained) and always release the reservation.
+                    if (entry != null)
+                        entry.context().evicts().touch(entry, topVer);
+
+                    part.release();
+                }
+            }
+            else if (log.isDebugEnabled())
+                log.debug("Will not load entry into cache (partition is invalid): " + part);
+        }
+        catch (GridDhtInvalidPartitionException e) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring entry for partition that does not belong [key=" + key + ", val=" + val +
+                    ", err=" + e + ']');
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Sums the public sizes of the current local partitions that are primary on the current affinity
+     * topology version.
+     */
+    @Override public int primarySize() {
+        long affTopVer = ctx.affinity().affinityTopologyVersion();
+
+        int total = 0;
+
+        for (GridDhtLocalPartition<K, V> part : topology().currentLocalPartitions()) {
+            if (!part.primary(affTopVer))
+                continue;
+
+            total += part.publicSize();
+        }
+
+        return total;
+    }
+
+    /**
+     * This method is used internally. Use
+     * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, long, UUID, int, boolean, IgnitePredicate[], IgniteCacheExpiryPolicy)}
+     * method instead to retrieve DHT value.
+     * <p>
+     * Delegates to the ten-argument {@code getAllAsync} overload with the local transaction check
+     * disabled. The {@code skipTx} and {@code entry} arguments are not used by this implementation.
+     *
+     * @param keys {@inheritDoc}
+     * @param forcePrimary {@inheritDoc}
+     * @param skipTx Not used by this implementation.
+     * @param entry Not used by this implementation.
+     * @param subjId Subject ID, passed through.
+     * @param taskName Task name, passed through.
+     * @param deserializePortable Deserialize portable flag, passed through.
+     * @param filter {@inheritDoc}
+     * @return {@inheritDoc}
+     */
+    @Override public IgniteFuture<Map<K, V>> getAllAsync(
+        @Nullable Collection<? extends K> keys,
+        boolean forcePrimary,
+        boolean skipTx,
+        @Nullable GridCacheEntryEx<K, V> entry,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter
+    ) {
+        return getAllAsync(keys,
+            true,
+            null,
+            /*don't check local tx. */false,
+            subjId,
+            taskName,
+            deserializePortable,
+            forcePrimary,
+            null,
+            filter);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * An invalid partition reported by the superclass is not propagated; {@code null} is returned instead.
+     */
+    @Override public V reload(K key, @Nullable IgnitePredicate<GridCacheEntry<K, V>>... filter)
+        throws IgniteCheckedException {
+        V reloaded;
+
+        try {
+            reloaded = super.reload(key, filter);
+        }
+        catch (GridDhtInvalidPartitionException ignored) {
+            reloaded = null;
+        }
+
+        return reloaded;
+    }
+
+    /**
+     * Gets values for the given keys by delegating to the ten-argument {@code getAllAsync} overload,
+     * forwarding the read-through flag and the expiry policy and disabling the local transaction check.
+     *
+     * @param keys Keys to get
+     * @param readThrough Read through flag.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param filter Optional filter.
+     * @param expiry Expiry policy.
+     * @return Get future.
+     */
+    IgniteFuture<Map<K, V>> getDhtAllAsync(@Nullable Collection<? extends K> keys,
+        boolean readThrough,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable IgniteCacheExpiryPolicy expiry
+        ) {
+        return getAllAsync(keys,
+            readThrough,
+            null,
+            /*don't check local tx. */false,
+            subjId,
+            taskName,
+            deserializePortable,
+            false,
+            expiry,
+            filter);
+    }
+
+    /**
+     * Creates a {@link GridDhtGetFuture} for the given keys without a transaction, initializes it and
+     * returns it.
+     *
+     * @param reader Reader node ID.
+     * @param msgId Message ID.
+     * @param keys Keys to get.
+     * @param readThrough Read through flag.
+     * @param reload Reload flag.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param deserializePortable Deserialize portable flag.
+     * @param filter Optional filter.
+     * @param expiry Expiry policy.
+     * @return DHT future, already initialized.
+     */
+    public GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> getDhtAsync(UUID reader,
+        long msgId,
+        LinkedHashMap<? extends K, Boolean> keys,
+        boolean readThrough,
+        boolean reload,
+        long topVer,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean deserializePortable,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        @Nullable IgniteCacheExpiryPolicy expiry) {
+        GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx,
+            msgId,
+            reader,
+            keys,
+            readThrough,
+            reload,
+            /*tx*/null,
+            topVer,
+            filter,
+            subjId,
+            taskNameHash,
+            deserializePortable,
+            expiry);
+
+        fut.init();
+
+        return fut;
+    }
+
+    /**
+     * Processes a get request received from a near node.
+     * <p>
+     * Starts a DHT get for the requested keys and, once it completes, sends a {@link GridNearGetResponse}
+     * back to the requesting node. The response carries either the loaded entries or the error, plus
+     * the invalid partitions reported by the DHT future. After the response has been sent (or the send
+     * has failed and been logged), TTL update requests are sent for the entries collected by the access
+     * expiry policy.
+     * <p>
+     * An access expiry policy is created only when the access TTL of the request differs from {@code -1}.
+     *
+     * @param nodeId Node ID.
+     * @param req Get request.
+     */
+    protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest<K, V> req) {
+        assert isAffinityNode(cacheCfg);
+
+        long ttl = req.accessTtl();
+
+        final GetExpiryPolicy expiryPlc = ttl == -1L ? null : new GetExpiryPolicy(ttl);
+
+        IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
+            getDhtAsync(nodeId,
+                req.messageId(),
+                req.keys(),
+                req.readThrough(),
+                req.reload(),
+                req.topologyVersion(),
+                req.subjectId(),
+                req.taskNameHash(),
+                false,
+                req.filter(),
+                expiryPlc);
+
+        fut.listenAsync(new CI1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>>() {
+            @Override public void apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> f) {
+                GridNearGetResponse<K, V> res = new GridNearGetResponse<>(ctx.cacheId(),
+                    req.futureId(),
+                    req.miniId(),
+                    req.version());
+
+                GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
+                    (GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>>)f;
+
+                try {
+                    Collection<GridCacheEntryInfo<K, V>> entries = fut.get();
+
+                    res.entries(entries);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed processing get request: " + req, e);
+
+                    res.error(e);
+                }
+
+                res.invalidPartitions(fut.invalidPartitions(), ctx.discovery().topologyVersion());
+
+                try {
+                    ctx.io().send(nodeId, res);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId +
+                        ",req=" + req + ", res=" + res + ']', e);
+                }
+
+                sendTtlUpdateRequest(expiryPlc);
+            }
+        });
+    }
+
+    /**
+     * Notifies other nodes about entries whose TTL was updated on access.
+     * <p>
+     * Does nothing if the policy is {@code null} or has collected no entries. Otherwise a closure is
+     * submitted through {@code ctx.closures().runLocalSafe(...)} that groups the collected entries into
+     * one {@link GridCacheTtlUpdateRequest} per target node: every non-local affinity node of a key
+     * receives that key as a regular entry, and every reader node that can still be resolved receives
+     * its keys as near entries. A failure to send to one node is logged and does not prevent sending
+     * to the remaining nodes.
+     *
+     * @param expiryPlc Expiry policy.
+     */
+    public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy expiryPlc) {
+        if (expiryPlc != null && expiryPlc.entries() != null) {
+            ctx.closures().runLocalSafe(new Runnable() {
+                @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"})
+                @Override public void run() {
+                    Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries = expiryPlc.entries();
+
+                    assert entries != null && !entries.isEmpty();
+
+                    Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap = new HashMap<>();
+
+                    long topVer = ctx.discovery().topologyVersion();
+
+                    // Regular entries go to every remote affinity node of the key.
+                    for (Map.Entry<Object, IgniteBiTuple<byte[], GridCacheVersion>> e : entries.entrySet()) {
+                        List<ClusterNode> nodes = ctx.affinity().nodes((K)e.getKey(), topVer);
+
+                        for (int i = 0; i < nodes.size(); i++) {
+                            ClusterNode node = nodes.get(i);
+
+                            if (!node.isLocal()) {
+                                GridCacheTtlUpdateRequest<K, V> req = ttlUpdateRequest(reqMap, node, topVer, expiryPlc);
+
+                                req.addEntry(e.getValue().get1(), e.getValue().get2());
+                            }
+                        }
+                    }
+
+                    Map<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> rdrs = expiryPlc.readers();
+
+                    // Near entries go to the reader nodes that are still known.
+                    if (rdrs != null) {
+                        assert !rdrs.isEmpty();
+
+                        for (Map.Entry<UUID, Collection<IgniteBiTuple<byte[], GridCacheVersion>>> e : rdrs.entrySet()) {
+                            ClusterNode node = ctx.node(e.getKey());
+
+                            if (node != null) {
+                                GridCacheTtlUpdateRequest<K, V> req = ttlUpdateRequest(reqMap, node, topVer, expiryPlc);
+
+                                for (IgniteBiTuple<byte[], GridCacheVersion> t : e.getValue())
+                                    req.addNearEntry(t.get1(), t.get2());
+                            }
+                        }
+                    }
+
+                    for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest<K, V>> req : reqMap.entrySet()) {
+                        try {
+                            ctx.io().send(req.getKey(), req.getValue());
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to send TTL update request.", e);
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Gets the TTL update request accumulated for the given node, creating and registering a new one
+     * (bound to this cache ID and to the access TTL of the policy) on first use.
+     *
+     * @param reqMap Requests accumulated so far, keyed by target node.
+     * @param node Target node.
+     * @param topVer Topology version for a newly created request.
+     * @param expiryPlc Expiry policy providing the access TTL for a newly created request.
+     * @return Request for the node.
+     */
+    private GridCacheTtlUpdateRequest<K, V> ttlUpdateRequest(
+        Map<ClusterNode, GridCacheTtlUpdateRequest<K, V>> reqMap,
+        ClusterNode node,
+        long topVer,
+        IgniteCacheExpiryPolicy expiryPlc) {
+        GridCacheTtlUpdateRequest<K, V> req = reqMap.get(node);
+
+        if (req == null) {
+            reqMap.put(node, req = new GridCacheTtlUpdateRequest<>(topVer, expiryPlc.forAccess()));
+
+            req.cacheId(ctx.cacheId());
+        }
+
+        return req;
+    }
+
+    /**
+     * Applies a TTL update request: regular keys are updated in this cache, near keys in the near cache.
+     * Either part is skipped when the request carries no keys of that kind.
+     *
+     * @param req Request.
+     */
+    private void processTtlUpdateRequest(GridCacheTtlUpdateRequest<K, V> req) {
+        if (req.keys() != null)
+            updateTtl(this, req.keys(), req.versions(), req.ttl());
+
+        if (req.nearKeys() == null)
+            return;
+
+        GridNearCacheAdapter<K, V> nearCache = near();
+
+        assert nearCache != null;
+
+        updateTtl(nearCache, req.nearKeys(), req.nearVersions(), req.ttl());
+    }
+
+    /**
+     * Updates TTL of the given entries in the given cache.
+     * <p>
+     * When swap or off-heap storage is enabled the entry is obtained through {@code entryEx} and
+     * unswapped first; otherwise only an entry found by {@code peekEx} is updated. Every entry that
+     * was obtained is passed to the eviction manager afterwards. A failure on one key (unswap error
+     * or invalid partition) is logged and does not stop processing of the remaining keys.
+     *
+     * @param cache Cache.
+     * @param keys Entries keys.
+     * @param vers Entries versions; must have the same size as {@code keys}.
+     * @param ttl TTL.
+     */
+    private void updateTtl(GridCacheAdapter<K, V> cache,
+        List<K> keys,
+        List<GridCacheVersion> vers,
+        long ttl) {
+        assert !F.isEmpty(keys);
+        assert keys.size() == vers.size();
+
+        int size = keys.size();
+
+        boolean swap = cache.context().isSwapOrOffheapEnabled();
+
+        for (int i = 0; i < size; i++) {
+            try {
+                GridCacheEntryEx<K, V> entry = null;
+
+                try {
+                    if (swap) {
+                        entry = cache.entryEx(keys.get(i));
+
+                        entry.unswap(true, false);
+                    }
+                    else
+                        entry = cache.peekEx(keys.get(i));
+
+                    if (entry != null)
+                        entry.updateTtl(vers.get(i), ttl);
+                }
+                finally {
+                    if (entry != null)
+                        cache.context().evicts().touch(entry, -1L);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to unswap entry.", e);
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                // The partition of this key has moved away; skip the key and keep processing the rest
+                // instead of aborting the whole request.
+                if (log.isDebugEnabled())
+                    log.debug("Ignoring TTL update for entry of invalid partition [key=" + keys.get(i) +
+                        ", err=" + e + ']');
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The body is a bare {@code assert false}: the call fails when assertions are enabled and does
+     * nothing otherwise.
+     */
+    @Override public void unlockAll(Collection<? extends K> keys,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        assert false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns a new {@code PartitionEntrySet} view for the given partition on every call.
+     */
+    @Override public Set<GridCacheEntry<K, V>> entrySet(int part) {
+        return new PartitionEntrySet(part);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Built by {@code S.toString} for {@code GridDhtCacheAdapter.class}.
+     */
+    @Override public String toString() {
+        return S.toString(GridDhtCacheAdapter.class, this);
+    }
+
+    /**
+     * Set view over the entries of a single local partition of the enclosing cache.
+     */
+    private class PartitionEntrySet extends AbstractSet<GridCacheEntry<K, V>> {
+        /** Partition id. */
+        private int partId;
+
+        /**
+         * @param partId Partition id.
+         */
+        private PartitionEntrySet(int partId) {
+            this.partId = partId;
+        }
+
+        /**
+         * Looks up the local partition on the current discovery topology version without creating it.
+         *
+         * @return Local partition; the callers treat {@code null} as an absent partition.
+         */
+        private GridDhtLocalPartition<K, V> locPart() {
+            return ctx.topology().localPartition(partId, ctx.discovery().topologyVersion(), false);
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<GridCacheEntry<K, V>> iterator() {
+            final GridDhtLocalPartition<K, V> p = locPart();
+
+            Iterator<GridDhtCacheEntry<K, V>> it = null;
+
+            if (p != null)
+                it = p.entries().iterator();
+
+            return new PartitionEntryIterator<>(it);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            if (!(o instanceof GridCacheEntry))
+                return false;
+
+            GridCacheEntry<K, V> e = (GridCacheEntry<K, V>)o;
+
+            K key = e.getKey();
+            V cur = e.peek();
+
+            // Nothing to remove if the entry has no value.
+            if (cur == null)
+                return false;
+
+            try {
+                // Cannot use remove(key, val) since we may be in DHT cache and should go through near.
+                return entry(key).remove(cur);
+            }
+            catch (IgniteCheckedException ex) {
+                throw new IgniteException(ex);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean removeAll(Collection<?> c) {
+            boolean changed = false;
+
+            for (Object o : c) {
+                if (remove(o))
+                    changed = true;
+            }
+
+            return changed;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean contains(Object o) {
+            if (!(o instanceof GridCacheEntry))
+                return false;
+
+            GridCacheEntry<K, V> e = (GridCacheEntry<K, V>)o;
+
+            if (partId != e.partition())
+                return false;
+
+            return F.eq(e.peek(), peek(e.getKey()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            GridDhtLocalPartition<K, V> p = locPart();
+
+            if (p == null)
+                return 0;
+
+            return p.publicSize();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PartitionEntrySet.class, this, "super", super.toString());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Clear-all jobs are produced only for {@code PARTITIONED_ONLY} and {@code NEAR_PARTITIONED}
+     * distribution modes; for any other mode an empty list is returned.
+     */
+    @Override public List<GridCacheClearAllRunnable<K, V>> splitClearAll() {
+        GridCacheDistributionMode distMode = configuration().getDistributionMode();
+
+        if (distMode == PARTITIONED_ONLY || distMode == NEAR_PARTITIONED)
+            return super.splitClearAll();
+
+        return Collections.<GridCacheClearAllRunnable<K, V>>emptyList();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Hands the deleted key and version over to the local partition of the entry, if that partition
+     * exists; a failure to enqueue is logged and not propagated.
+     */
+    @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) {
+        assert entry.isDht();
+
+        GridDhtLocalPartition<K, V> locPart = topology().localPartition(entry.partition(), -1, false);
+
+        if (locPart == null)
+            return;
+
+        // Do not remove entry on replica topology. Instead, add entry to removal queue.
+        // It will be cleared eventually.
+        try {
+            locPart.onDeferredDelete(entry.key(), ver);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to enqueue deleted entry [key=" + entry.key() + ", ver=" + ver + ']', e);
+        }
+    }
+
+    /**
+     * Iterator over the public entries of a single partition.
+     * <p>
+     * Wraps an optional iterator over DHT entries, skipping internal entries and entries that are not
+     * visitable, and exposes the remaining ones as wrapped {@link GridCacheEntry} instances. The next
+     * element is always pre-fetched, so {@code hasNextX()} is a plain field check.
+     */
+    private static class PartitionEntryIterator<K, V> extends GridIteratorAdapter<GridCacheEntry<K, V>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Next entry. */
+        private GridCacheEntry<K, V> entry;
+
+        /** Last seen entry to support remove; reset once it has been removed. */
+        private GridCacheEntry<K, V> last;
+
+        /** Partition iterator. */
+        private final Iterator<GridDhtCacheEntry<K, V>> partIt;
+
+        /**
+         * @param partIt Partition iterator; {@code null} produces an empty iterator.
+         */
+        private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry<K, V>> partIt) {
+            this.partIt = partIt;
+
+            advance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNextX() {
+            return entry != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheEntry<K, V> nextX() throws IgniteCheckedException {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            last = entry;
+
+            advance();
+
+            return last;
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @throws IllegalStateException If no element has been returned yet or the last returned
+         *      element has already been removed.
+         */
+        @Override public void removeX() throws IgniteCheckedException {
+            if (last == null)
+                throw new IllegalStateException();
+
+            last.remove();
+
+            // Per the Iterator contract, remove may be called only once per call to next.
+            last = null;
+        }
+
+        /**
+         * Moves to the next entry that is neither internal nor filtered out, or clears the
+         * pre-fetched entry when the underlying iterator is exhausted or absent.
+         */
+        private void advance() {
+            if (partIt != null) {
+                while (partIt.hasNext()) {
+                    GridDhtCacheEntry<K, V> next = partIt.next();
+
+                    if (next.isInternal() || !next.visitable(CU.<K, V>empty()))
+                        continue;
+
+                    entry = next.wrap(true);
+
+                    return;
+                }
+            }
+
+            entry = null;
+        }
+    }
+
+    /**
+     * Multi update future.
+     * <p>
+     * Remembers the topology version on which the multi-update was started. The enclosing adapter
+     * registers one instance per multi-update lock and completes it with the lock ID when the lock
+     * is released.
+     */
+    private static class MultiUpdateFuture extends GridFutureAdapter<IgniteUuid> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Topology version. */
+        private long topVer;
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public MultiUpdateFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Kernal context.
+         * @param topVer Topology version.
+         */
+        private MultiUpdateFuture(GridKernalContext ctx, long topVer) {
+            super(ctx);
+
+            this.topVer = topVer;
+        }
+
+        /**
+         * @return Topology version.
+         */
+        private long topologyVersion() {
+            return topVer;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
new file mode 100644
index 0000000..3e89a0c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * DHT cache entry.
+ */
+@SuppressWarnings({"TooBroadScope", "NonPrivateFieldAccessedInSynchronizedContext"})
+public class GridDhtCacheEntry<K, V> extends GridDistributedCacheEntry<K, V> {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Fixed size overhead added on top of the parent's size in {@link #memorySize()}. */
+    private static final int DHT_SIZE_OVERHEAD = 16;
+
+    /** Closure that maps a reader ID to the ID of the reader's node. */
+    private static final IgniteClosure<ReaderId, UUID> R2N = new C1<ReaderId, UUID>() {
+        @Override public UUID apply(ReaderId e) {
+            return e.nodeId();
+        }
+    };
+
+    /**
+     * Reader clients. The list itself is never modified in place: every change installs a new
+     * (empty or unmodifiable) list into this field.
+     */
+    @GridToStringInclude
+    private volatile List<ReaderId<K, V>> rdrs = Collections.emptyList();
+
+    /** Local partition returned by the topology when this entry was registered in the constructor. */
+    private final GridDhtLocalPartition<K, V> locPart;
+
+    /**
+     * Creates the entry and registers it with the DHT topology, remembering the local partition
+     * that the topology returns for it.
+     *
+     * @param ctx Cache context.
+     * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
+     * @param key Cache key.
+     * @param hash Key hash value.
+     * @param val Entry value.
+     * @param next Next entry in the linked list.
+     * @param ttl Time to live.
+     * @param hdrId Header id.
+     */
+    public GridDhtCacheEntry(GridCacheContext<K, V> ctx, long topVer, K key, int hash, V val,
+        GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+        super(ctx, key, hash, val, next, ttl, hdrId);
+
+        // Record this entry with partition.
+        locPart = ctx.dht().topology().onAdded(topVer, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int memorySize() throws IgniteCheckedException {
+        int rdrsOverhead;
+
+        // Reader list is sampled while holding the entry monitor.
+        synchronized (this) {
+            rdrsOverhead = rdrs == null ? 0 : ReaderId.READER_ID_SIZE * rdrs.size();
+        }
+
+        // Parent size plus the fixed DHT overhead plus the per-reader overhead.
+        return super.memorySize() + DHT_SIZE_OVERHEAD + rdrsOverhead;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        // Partition ID is taken from the local partition recorded at construction time.
+        return locPart.id();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDht() {
+        // This entry type always reports itself as a DHT entry.
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean partitionValid() {
+        // Validity is delegated to the local partition this entry belongs to.
+        return locPart.valid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkedObsolete() {
+        // Must not be invoked while the current thread holds this entry's monitor.
+        assert !Thread.holdsLock(this);
+
+        // Remove this entry from partition mapping.
+        cctx.dht().topology().onRemoved(this);
+    }
+
+    /**
+     * Looks up a local MVCC candidate whose "other" version equals the given near version.
+     *
+     * @param nearVer Near version.
+     * @param rmv If {@code true} and no candidate matched, the near version is passed to {@code addRemoved}.
+     * @return Matching local candidate, or {@code null} if none was found.
+     * @throws GridCacheEntryRemovedException If removed.
+     */
+    @Nullable public synchronized GridCacheMvccCandidate<K> localCandidateByNearVersion(GridCacheVersion nearVer,
+        boolean rmv) throws GridCacheEntryRemovedException {
+        checkObsolete();
+
+        GridCacheMvcc<K> mvcc = mvccExtras();
+
+        if (mvcc != null) {
+            for (GridCacheMvccCandidate<K> cand : mvcc.localCandidatesNoCopy(false)) {
+                GridCacheVersion otherVer = cand.otherVersion();
+
+                if (otherVer == null || !otherVer.equals(nearVer))
+                    continue;
+
+                return cand;
+            }
+        }
+
+        // Nothing matched: optionally remember the version as removed.
+        if (rmv)
+            addRemoved(nearVer);
+
+        return null;
+    }
+
+    /**
+     * Add local candidate.
+     * <p>
+     * The candidate is registered in this entry's MVCC structure while holding the entry monitor;
+     * linking with other candidates and the owner-changed check are done after the monitor is released.
+     *
+     * @param nearNodeId Near node ID.
+     * @param nearVer Near version.
+     * @param topVer Topology version.
+     * @param threadId Owning thread ID.
+     * @param ver Lock version.
+     * @param timeout Timeout to acquire lock.
+     * @param reenter Reentry flag.
+     * @param tx Tx flag.
+     * @param implicitSingle Implicit flag.
+     * @return New candidate, or {@code null} if the MVCC structure did not produce one.
+     * @throws GridCacheEntryRemovedException If entry has been removed.
+     * @throws GridDistributedLockCancelledException If lock was cancelled.
+     */
+    @Nullable public GridCacheMvccCandidate<K> addDhtLocal(
+        UUID nearNodeId,
+        GridCacheVersion nearVer,
+        long topVer,
+        long threadId,
+        GridCacheVersion ver,
+        long timeout,
+        boolean reenter,
+        boolean tx,
+        boolean implicitSingle) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
+        GridCacheMvccCandidate<K> cand;
+        GridCacheMvccCandidate<K> prev;
+        GridCacheMvccCandidate<K> owner;
+
+        V val;
+
+        synchronized (this) {
+            // Check removed locks prior to obsolete flag.
+            checkRemoved(ver);
+            checkRemoved(nearVer);
+
+            checkObsolete();
+
+            GridCacheMvcc<K> mvcc = mvccExtras();
+
+            // Lazily create the MVCC structure on first lock attempt.
+            if (mvcc == null) {
+                mvcc = new GridCacheMvcc<>(cctx);
+
+                mvccExtras(mvcc);
+            }
+
+            // Remember owner and emptiness before the change to detect transitions afterwards.
+            prev = mvcc.anyOwner();
+
+            boolean emptyBefore = mvcc.isEmpty();
+
+            cand = mvcc.addLocal(
+                this,
+                nearNodeId,
+                nearVer,
+                threadId,
+                ver,
+                timeout,
+                reenter,
+                tx,
+                implicitSingle,
+                /*dht-local*/true
+            );
+
+            // No candidate was created - nothing else to do.
+            if (cand == null)
+                return null;
+
+            cand.topologyVersion(topVer);
+
+            owner = mvcc.anyOwner();
+
+            if (owner != null)
+                cand.ownerVersion(owner.version());
+
+            boolean emptyAfter = mvcc.isEmpty();
+
+            checkCallbacks(emptyBefore, emptyAfter);
+
+            // Capture the value under the monitor for the owner-changed check below.
+            val = this.val;
+
+            // Drop the MVCC structure again if it ended up empty ('mvcc' is always non-null here).
+            if (mvcc != null && mvcc.isEmpty())
+                mvccExtras(null);
+        }
+
+        // Don't link reentries ('cand' is always non-null here because of the early return above).
+        if (cand != null && !cand.reentry())
+            // Link with other candidates in the same thread.
+            cctx.mvcc().addNext(cctx, cand);
+
+        checkOwnerChanged(prev, owner, val);
+
+        return cand;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tmLock(IgniteTxEx<K, V> tx, long timeout)
+        throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
+        // Non-local transactions register a remote candidate.
+        if (!tx.local()) {
+            try {
+                addRemote(
+                    tx.nodeId(),
+                    tx.otherNodeId(),
+                    tx.threadId(),
+                    tx.xidVersion(),
+                    tx.timeout(),
+                    /*tx*/true,
+                    tx.implicit(),
+                    null);
+
+                return true;
+            }
+            catch (GridDistributedLockCancelledException ignored) {
+                if (log.isDebugEnabled())
+                    log.debug("Attempted to enter tx lock for cancelled ID (will ignore): " + tx);
+
+                return false;
+            }
+        }
+
+        // Local transactions register a DHT-local candidate.
+        GridDhtTxLocalAdapter<K, V> dhtTx = (GridDhtTxLocalAdapter<K, V>)tx;
+
+        GridCacheMvccCandidate<K> cand = addDhtLocal(
+            dhtTx.nearNodeId(),
+            dhtTx.nearXidVersion(),
+            tx.topologyVersion(),
+            tx.threadId(),
+            tx.xidVersion(),
+            timeout,
+            /*reenter*/false,
+            /*tx*/true,
+            tx.implicitSingle());
+
+        // Null is returned if timeout is negative and there is other lock owner.
+        return cand != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheMvccCandidate<K> removeLock() {
+        GridCacheMvccCandidate<K> rmvd = super.removeLock();
+
+        // Let the owning partition know that a lock on one of its entries was released.
+        locPart.onUnlock();
+
+        return rmvd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removeLock(GridCacheVersion ver) throws GridCacheEntryRemovedException {
+        boolean rmvd = super.removeLock(ver);
+
+        // The partition is notified regardless of the result returned by the parent.
+        locPart.onUnlock();
+
+        return rmvd;
+    }
+
+    /**
+     * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
+     * No entry state is touched by this method.
+     */
+    public void onUnlock() {
+        locPart.onUnlock();
+    }
+
+    /**
+     * Builds a (version, value, value bytes) tuple describing the current state of this entry.
+     * Only one of the value and the value bytes is populated in the tuple.
+     *
+     * @param topVer Topology version.
+     * @return Tuple with version and value of this entry, or {@code null} if entry is new.
+     * @throws GridCacheEntryRemovedException If entry has been removed.
+     */
+    @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext"})
+    @Nullable public synchronized GridTuple3<GridCacheVersion, V, byte[]> versionedValue(long topVer)
+        throws GridCacheEntryRemovedException {
+        // New, invalid or deleted entries have nothing to report.
+        if (isNew() || !valid(-1) || deletedUnlocked())
+            return null;
+
+        GridCacheValueBytes bytes = valueBytesUnlocked();
+
+        V val0 = null;
+        byte[] valBytes0 = null;
+
+        if (bytes.isNull())
+            val0 = val;
+        else if (bytes.isPlain())
+            val0 = (V)bytes.get();
+        else
+            valBytes0 = bytes.get();
+
+        return F.t(ver, val0, valBytes0);
+    }
+
+    /**
+     * Returns node IDs of the readers that remain after {@link #checkReaders()}, exposed through
+     * the view produced by {@code F.viewReadOnly}.
+     *
+     * @return Readers.
+     * @throws GridCacheEntryRemovedException If removed.
+     */
+    public Collection<UUID> readers() throws GridCacheEntryRemovedException {
+        return F.viewReadOnly(checkReaders(), R2N);
+    }
+
+    /**
+     * Finds the reader registered for the given node.
+     *
+     * @param nodeId Node ID.
+     * @return Reader ID, or {@code null} if the node is not registered as a reader.
+     */
+    @Nullable public ReaderId<K, V> readerId(UUID nodeId) {
+        for (ReaderId<K, V> rdr : rdrs) {
+            if (!rdr.nodeId().equals(nodeId))
+                continue;
+
+            return rdr;
+        }
+
+        return null;
+    }
+
+    /**
+     * Registers the given node as a near reader of this entry.
+     * <p>
+     * The node is not registered if it is the local node, has left the grid, has no near cache
+     * (unless the key is internal) or is an affinity node for this entry's partition.
+     *
+     * @param nodeId Reader to add.
+     * @param msgId Message ID.
+     * @param topVer Topology version.
+     * @return Future for all relevant transactions that were active at the time of adding reader,
+     *      or {@code null} if the reader was not registered or there is nothing to wait for.
+     *      If the reader was already registered, its current transaction future (possibly {@code null})
+     *      is returned.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public IgniteFuture<Boolean> addReader(UUID nodeId, long msgId, long topVer)
+        throws GridCacheEntryRemovedException {
+        // Don't add local node as reader.
+        if (cctx.nodeId().equals(nodeId))
+            return null;
+
+        ClusterNode node = cctx.discovery().node(nodeId);
+
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring near reader because node left the grid: " + nodeId);
+
+            return null;
+        }
+
+        // If remote node has no near cache, don't add it.
+        if (!U.hasNearCache(node, cacheName()) && !(key instanceof GridCacheInternal)) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring near reader because near cache is disabled: " + nodeId);
+
+            return null;
+        }
+
+        // If remote node is (primary?) or back up, don't add it as a reader.
+        if (cctx.affinity().belongs(node, partition(), topVer)) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring near reader because remote node is affinity node [locNodeId=" + cctx.localNodeId()
+                    + ", rmtNodeId=" + nodeId + ", key=" + key + ']');
+
+            return null;
+        }
+
+        boolean ret = false;
+
+        GridCacheMultiTxFuture<K, V> txFut = null;
+
+        Collection<GridCacheMvccCandidate<K>> cands = null;
+
+        ReaderId<K, V> reader;
+
+        synchronized (this) {
+            checkObsolete();
+
+            reader = readerId(nodeId);
+
+            if (reader == null) {
+                reader = new ReaderId<>(nodeId, msgId);
+
+                // Readers list is replaced as a whole, never modified in place.
+                List<ReaderId<K, V>> rdrs = new ArrayList<>(this.rdrs.size() + 1);
+
+                rdrs.addAll(this.rdrs);
+                rdrs.add(reader);
+
+                // Seal.
+                this.rdrs = Collections.unmodifiableList(rdrs);
+
+                // No transactions in ATOMIC cache.
+                if (!cctx.atomic()) {
+                    txFut = reader.getOrCreateTxFuture(cctx);
+
+                    cands = localCandidates();
+
+                    ret = true;
+                }
+            }
+            else {
+                txFut = reader.txFuture();
+
+                long id = reader.messageId();
+
+                // Only ever move the recorded message ID forward.
+                if (id < msgId)
+                    reader.messageId(msgId);
+            }
+        }
+
+        if (ret) {
+            assert txFut != null;
+
+            if (!F.isEmpty(cands)) {
+                for (GridCacheMvccCandidate<K> c : cands) {
+                    IgniteTxEx<K, V> tx = cctx.tm().tx(c.version());
+
+                    if (tx != null) {
+                        assert tx.local();
+
+                        txFut.addTx(tx);
+                    }
+                }
+            }
+
+            txFut.init();
+
+            if (!txFut.isDone()) {
+                final ReaderId<K, V> reader0 = reader;
+
+                txFut.listenAsync(new CI1<IgniteFuture<?>>() {
+                    @Override public void apply(IgniteFuture<?> f) {
+                        // Lock the enclosing entry: a plain 'this' here would refer to the listener
+                        // instance and would not guard the reader state at all.
+                        synchronized (GridDhtCacheEntry.this) {
+                            // Release memory.
+                            reader0.resetTxFuture();
+                        }
+                    }
+                });
+            }
+            else {
+                synchronized (this) {
+                    // Release memory.
+                    reader.resetTxFuture();
+                }
+
+                txFut = null;
+            }
+        }
+
+        return txFut;
+    }
+
+    /**
+     * Unregisters the reader of the given node, unless the request is older than the reader's
+     * recorded message ID.
+     *
+     * @param nodeId Reader to remove.
+     * @param msgId Message ID (a negative value disables the message ID check).
+     * @return {@code True} if reader was removed as a result of this operation.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     */
+    public synchronized boolean removeReader(UUID nodeId, long msgId) throws GridCacheEntryRemovedException {
+        checkObsolete();
+
+        ReaderId<K, V> target = readerId(nodeId);
+
+        if (target == null)
+            return false;
+
+        // A non-negative message ID older than the recorded one must not remove the reader.
+        if (msgId >= 0 && target.messageId() > msgId)
+            return false;
+
+        List<ReaderId<K, V>> remaining = new ArrayList<>(rdrs.size());
+
+        for (ReaderId<K, V> rdr : rdrs) {
+            if (!rdr.equals(target))
+                remaining.add(rdr);
+        }
+
+        // Seal.
+        if (remaining.isEmpty())
+            rdrs = Collections.<ReaderId<K, V>>emptyList();
+        else
+            rdrs = Collections.unmodifiableList(remaining);
+
+        return true;
+    }
+
+    /**
+     * Clears all readers (usually when partition becomes invalid and ready for eviction).
+     * The readers field is simply replaced with an empty list; no obsolete check is performed.
+     */
+    @Override public synchronized void clearReaders() {
+        rdrs = Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void clearReader(UUID nodeId) throws GridCacheEntryRemovedException {
+        // Negative message ID bypasses the message ID comparison in removeReader().
+        removeReader(nodeId, -1);
+    }
+
+    /**
+     * Marks entry as obsolete and, if possible or required, removes it
+     * from swap storage.
+     * <p>
+     * State changes happen under the entry monitor; removal from the cache map is done in the
+     * {@code finally} block after the monitor has been released, and only if the entry was
+     * actually cleared.
+     *
+     * @param ver Obsolete version.
+     * @param swap If {@code true} then remove from swap.
+     * @return {@code True} if entry was not being used, passed the filter and could be removed.
+     * @throws IgniteCheckedException If failed to remove from swap.
+     */
+    public boolean clearInternal(GridCacheVersion ver, boolean swap) throws IgniteCheckedException {
+        // Set only when every step under the monitor has completed.
+        boolean rmv = false;
+
+        try {
+            synchronized (this) {
+                // Value is captured before the entry is changed so that the index can be cleared below.
+                V prev = saveValueForIndexUnlocked();
+
+                // Call markObsolete0 to avoid recursive calls to clear if
+                // we are clearing dht local partition (onMarkedObsolete should not be called).
+                if (!markObsolete0(ver, false)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Entry could not be marked obsolete (it is still used or has readers): " + this);
+
+                    return false;
+                }
+
+                // Obsolete entry keeps no readers.
+                rdrs = Collections.emptyList();
+
+                if (log.isDebugEnabled())
+                    log.debug("Entry has been marked obsolete: " + this);
+
+                clearIndex(prev);
+
+                // Give to GC.
+                update(null, null, 0L, 0L, ver);
+
+                if (swap) {
+                    releaseSwap();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Entry has been cleared from swap storage: " + this);
+                }
+
+                rmv = true;
+
+                return true;
+            }
+        }
+        finally {
+            // Skipped when marking failed or an exception was thrown before 'rmv' was set.
+            if (rmv)
+                cctx.cache().removeIfObsolete(key); // Clear cache.
+        }
+    }
+
+    /**
+     * Drops readers whose nodes are no longer alive according to discovery and returns
+     * the resulting readers list.
+     *
+     * @return Collection of readers after check.
+     * @throws GridCacheEntryRemovedException If removed.
+     */
+    public synchronized Collection<ReaderId<K, V>> checkReaders() throws GridCacheEntryRemovedException {
+        checkObsolete();
+
+        if (!rdrs.isEmpty()) {
+            // Allocated lazily: in the common case every reader is still alive.
+            Collection<ReaderId<K, V>> dead = null;
+
+            for (ReaderId<K, V> rdr : rdrs) {
+                if (cctx.discovery().alive(rdr.nodeId()))
+                    continue;
+
+                if (dead == null)
+                    dead = new HashSet<>();
+
+                dead.add(rdr);
+            }
+
+            if (dead != null) {
+                List<ReaderId<K, V>> alive = new ArrayList<>(rdrs.size() - dead.size());
+
+                for (ReaderId<K, V> rdr : rdrs) {
+                    if (!dead.contains(rdr))
+                        alive.add(rdr);
+                }
+
+                // Seal.
+                if (alive.isEmpty())
+                    rdrs = Collections.<ReaderId<K, V>>emptyList();
+                else
+                    rdrs = Collections.unmodifiableList(alive);
+            }
+        }
+
+        return rdrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected synchronized boolean hasReaders() throws GridCacheEntryRemovedException {
+        // Prune readers of nodes that are no longer alive before checking for emptiness.
+        checkReaders();
+
+        return !rdrs.isEmpty();
+    }
+
+    /**
+     * Stores the mapped node IDs on the MVCC candidate that exists for the given version.
+     *
+     * @param ver Version.
+     * @param mappings Mappings to set.
+     * @return Candidate, if one existed for the version, or {@code null} if candidate was not found.
+     * @throws GridCacheEntryRemovedException If removed.
+     */
+    @Nullable public synchronized GridCacheMvccCandidate<K> mappings(GridCacheVersion ver, Collection<UUID> mappings)
+        throws GridCacheEntryRemovedException {
+        checkObsolete();
+
+        GridCacheMvcc<K> mvcc = mvccExtras();
+
+        // Without MVCC extras there are no candidates at all.
+        if (mvcc == null)
+            return null;
+
+        GridCacheMvccCandidate<K> cand = mvcc.candidate(ver);
+
+        if (cand != null)
+            cand.mappedNodeIds(mappings);
+
+        return cand;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheEntry<K, V> wrap(boolean prjAware) {
+        GridCacheContext<K, V> nearCtx = cctx.dht().near().context();
+
+        GridCacheProjectionImpl<K, V> callPrj = nearCtx.projectionPerCall();
+
+        // The per-call projection is only passed on when the caller asked for a projection-aware wrapper.
+        GridCacheProjectionImpl<K, V> prj = prjAware ? callPrj : null;
+
+        return new GridPartitionedCacheEntryImpl<>(prj, nearCtx, key, this);
+    }
+
+    /**
+     * @return Cache name, as reported by the near cache associated with this entry's DHT cache.
+     */
+    protected String cacheName() {
+        return cctx.dht().near().name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized String toString() {
+        // Parent's string form is included under the "super" name.
+        return S.toString(GridDhtCacheEntry.class, this, "super", super.toString());
+    }
+
+    /**
+     * Reader ID: identifies a reader node together with the latest message ID recorded for it
+     * and an optional multi-transaction future.
+     */
+    private static class ReaderId<K, V> {
+        /** Reader ID size. */
+        private static final int READER_ID_SIZE = 24;
+
+        /** Node ID. */
+        private UUID nodeId;
+
+        /** Message ID. */
+        private long msgId;
+
+        /** Transaction future. */
+        private GridCacheMultiTxFuture<K, V> txFut;
+
+        /**
+         * @param nodeId Node ID.
+         * @param msgId Message ID.
+         */
+        ReaderId(UUID nodeId, long msgId) {
+            this.nodeId = nodeId;
+            this.msgId = msgId;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        UUID nodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Message ID.
+         */
+        long messageId() {
+            return msgId;
+        }
+
+        /**
+         * @param msgId New message ID.
+         */
+        void messageId(long msgId) {
+            this.msgId = msgId;
+        }
+
+        /**
+         * Returns the current transaction future, creating it first if there is none.
+         *
+         * @param cctx Cache context.
+         * @return Transaction future.
+         */
+        GridCacheMultiTxFuture<K, V> getOrCreateTxFuture(GridCacheContext<K, V> cctx) {
+            GridCacheMultiTxFuture<K, V> fut = txFut;
+
+            if (fut == null) {
+                fut = new GridCacheMultiTxFuture<>(cctx);
+
+                txFut = fut;
+            }
+
+            return fut;
+        }
+
+        /**
+         * @return Transaction future, possibly {@code null}.
+         */
+        GridCacheMultiTxFuture<K, V> txFuture() {
+            return txFut;
+        }
+
+        /**
+         * Sets multi-transaction future to {@code null}.
+         *
+         * @return Previous transaction future.
+         */
+        GridCacheMultiTxFuture<K, V> resetTxFuture() {
+            GridCacheMultiTxFuture<K, V> prev = txFut;
+
+            txFut = null;
+
+            return prev;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof ReaderId))
+                return false;
+
+            ReaderId that = (ReaderId)o;
+
+            // Both the message ID and the node ID take part in equality.
+            if (msgId != that.msgId)
+                return false;
+
+            return nodeId.equals(that.nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int msgHash = (int)(msgId ^ (msgId >>> 32));
+
+            return 31 * nodeId.hashCode() + msgHash;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ReaderId.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java
new file mode 100644
index 0000000..0e4102d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntryImpl.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCachePeekMode.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+
+/**
+ * Colocated cache entry public API.
+ */
+public class GridDhtCacheEntryImpl<K, V> extends GridCacheEntryImpl<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Empty constructor required for {@link Externalizable}. Not intended for direct use:
+     * it initializes nothing.
+     */
+    public GridDhtCacheEntryImpl() {
+        // No-op.
+    }
+
+    /**
+     * Creates the public API wrapper; all arguments are passed to the parent constructor.
+     *
+     * @param nearPrj Parent projection or {@code null} if entry belongs to default cache.
+     * @param ctx Near cache context.
+     * @param key key.
+     * @param cached Cached entry (either from near or dht cache map).
+     */
+    @SuppressWarnings({"TypeMayBeWeakened"})
+    public GridDhtCacheEntryImpl(GridCacheProjectionImpl<K, V> nearPrj, GridCacheContext<K, V> ctx, K key,
+        @Nullable GridCacheEntryEx<K, V> cached) {
+        super(nearPrj, ctx, key, cached);
+
+        // Sanity check: a DHT context is acceptable only when near cache is not enabled for the passed context.
+        assert !this.ctx.isDht() || !isNearEnabled(ctx);
+    }
+
+    /**
+     * @return Dht cache obtained from this entry's cache context.
+     */
+    private GridDhtCacheAdapter<K, V> dht() {
+        return ctx.dht();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The parent implementation is consulted first unless {@code PARTITIONED_ONLY} is requested;
+     * if it yields no value, the DHT cache is peeked. A {@code null} collection of modes is accepted
+     * and is treated as "no explicit modes".
+     */
+    @Override public V peek(@Nullable Collection<GridCachePeekMode> modes) throws IgniteCheckedException {
+        // 'modes' is declared nullable, so it must not be dereferenced without a check.
+        if (!ctx.isNear() && modes != null && modes.contains(NEAR_ONLY))
+            return null;
+
+        V val = null;
+
+        if (modes == null || !modes.contains(PARTITIONED_ONLY))
+            val = super.peek(modes);
+
+        // Fall back to DHT peek (which itself handles null/empty modes).
+        if (val == null)
+            val = peekDht0(modes, CU.<K, V>empty());
+
+        return val;
+    }
+
+    /**
+     * Peeks the DHT cache in {@code SMART} mode, converting the checked exception
+     * into an unchecked {@link IgniteException}.
+     *
+     * @param filter Filter.
+     * @return Peeked value.
+     */
+    @Nullable private V peekDht(@Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        try {
+            return peekDht0(SMART, filter);
+        }
+        catch (IgniteCheckedException e) {
+            // Should never happen.
+            throw new IgniteException("Unable to perform entry peek() operation.", e);
+        }
+    }
+
+    /**
+     * Peeks the DHT cache with each of the given modes in iteration order and returns the first
+     * non-null value. Empty or {@code null} modes fall back to {@code SMART}.
+     *
+     * @param modes Peek modes.
+     * @param filter Optional entry filter.
+     * @return Peeked value.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private V peekDht0(@Nullable Collection<GridCachePeekMode> modes,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        if (F.isEmpty(modes))
+            return peekDht0(SMART, filter);
+
+        assert modes != null;
+
+        for (GridCachePeekMode mode : modes) {
+            V peeked = peekDht0(mode, filter);
+
+            if (peeked == null)
+                continue;
+
+            return peeked;
+        }
+
+        return null;
+    }
+
+    /**
+     * Peeks the DHT cache entry for this key with a single mode. The attempt is repeated
+     * whenever the entry turns out to be removed concurrently.
+     *
+     * @param mode Peek mode ({@code null} is treated as {@code SMART}).
+     * @param filter Optional entry filter.
+     * @return Peeked value, or {@code null} if the DHT cache has no entry for the key.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings({"unchecked"})
+    @Nullable private V peekDht0(@Nullable GridCachePeekMode mode,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        if (mode == null)
+            mode = SMART;
+
+        while (true) {
+            GridCacheProjectionImpl<K, V> prjPerCall = proxy.gateProjection();
+
+            // NOTE(review): this runs on every retry, so the projection predicate is combined
+            // into 'filter' again after each GridCacheEntryRemovedException - confirm this is intended.
+            if (prjPerCall != null)
+                filter = ctx.vararg(F.and(ctx.vararg(proxy.predicate()), filter));
+
+            GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prjPerCall);
+
+            try {
+                GridCacheEntryEx<K, V> entry = dht().peekEx(key);
+
+                return entry == null ? null : ctx.cloneOnFlag(entry.peek(mode, filter));
+            }
+            catch (GridCacheEntryRemovedException ignore) {
+                // No-op: fall through and retry with a fresh entry lookup.
+            }
+            finally {
+                // Gate is always left with the value returned by enter().
+                ctx.gate().leave(prev);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocked() {
+        // Check colocated explicit locks.
+        // NOTE(review): thread ID -1 presumably means "any thread" - confirm against isLockedByThread().
+        return ctx.mvcc().isLockedByThread(key, -1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLockedByThread() {
+        // Check colocated explicit locks held for the calling thread's ID.
+        return ctx.mvcc().isLockedByThread(key, Thread.currentThread().getId());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // Parent's string form is passed along to the string builder utility.
+        return S.toString(GridDhtCacheEntryImpl.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
new file mode 100644
index 0000000..15ca4a0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Embedded future that also implements {@link GridDhtFuture}: besides the embedded result it
+ * exposes the collection of invalid partitions it was created with.
+ */
+public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A, B> implements GridDhtFuture<A> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Invalid partitions (retries). Remains {@code null} when the instance is created through the
+     * empty constructor or when {@code null} is passed to the four-argument constructor.
+     */
+    @GridToStringInclude
+    private Collection<Integer> invalidParts;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtEmbeddedFuture() {
+        // No-op.
+    }
+
+    /**
+     * Creates a future with no invalid partitions.
+     *
+     * @param ctx Context.
+     * @param embedded Embedded future.
+     * @param c Closure converting the embedded result (or error) into the result of this future.
+     */
+    public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c) {
+        super(ctx, embedded, c);
+
+        invalidParts = Collections.emptyList();
+    }
+
+    /**
+     * Creates a future with no invalid partitions whose closure itself returns a future.
+     *
+     * @param embedded Future to embed.
+     * @param c Embedding closure that returns a future of the final result.
+     * @param ctx Kernal context.
+     */
+    public GridDhtEmbeddedFuture(IgniteFuture<B> embedded,
+        IgniteBiClosure<B, Exception, IgniteFuture<A>> c, GridKernalContext ctx) {
+        super(embedded, c, ctx);
+
+        invalidParts = Collections.emptyList();
+    }
+
+    /**
+     * Creates a future that reports the given invalid partitions.
+     *
+     * @param ctx Context.
+     * @param embedded Embedded future.
+     * @param c Closure converting the embedded result (or error) into the result of this future.
+     * @param invalidParts Invalid partitions (retries).
+     */
+    public GridDhtEmbeddedFuture(GridKernalContext ctx, IgniteFuture<B> embedded, IgniteBiClosure<B, Exception, A> c,
+        Collection<Integer> invalidParts) {
+        super(ctx, embedded, c);
+
+        this.invalidParts = invalidParts;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return Invalid partitions this future was created with, or an empty collection if none
+     *      were set; never {@code null}.
+     */
+    @Override public Collection<Integer> invalidPartitions() {
+        // The field is null after the empty constructor (or if a caller passed null); report
+        // "nothing to retry" instead of handing null to callers.
+        Collection<Integer> parts = invalidParts;
+
+        return parts == null ? Collections.<Integer>emptyList() : parts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtEmbeddedFuture.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFinishedFuture.java
new file mode 100644
index 0000000..8dd4492
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFinishedFuture.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.future.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Finished DHT future.
+ */
+public class GridDhtFinishedFuture<T> extends GridFinishedFuture<T> implements GridDhtFuture<T> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridDhtFinishedFuture() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Context.
+     * @param t Result.
+     */
+    public GridDhtFinishedFuture(GridKernalContext ctx, T t) {
+        super(ctx, t);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param err Error.
+     */
+    public GridDhtFinishedFuture(GridKernalContext ctx, Throwable err) {
+        super(ctx, err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> invalidPartitions() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtFinishedFuture.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java
new file mode 100644
index 0000000..af494d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtFuture.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Future that, in addition to its result, reports invalid partitions, i.e. the partitions
+ * whose keys have to be retried.
+ */
+public interface GridDhtFuture<T> extends IgniteFuture<T> {
+    /**
+     * Note that future should be able to provide partitions to retry before
+     * it completes, so it's not necessary to wait till future is done
+     * to get them.
+     *
+     * @return Partitions to retry because this node is no longer a primary or backup.
+     */
+    public Collection<Integer> invalidPartitions();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
new file mode 100644
index 0000000..600abed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * DHT get future. After the preloader request for the keys completes, it reserves the local
+ * partitions of the requested keys, registers the reader for the keys that asked for it and
+ * collects entry infos for the keys it was able to map. Partitions that could not be mapped are
+ * reported through {@link #invalidPartitions()}. All partitions reserved by this future are
+ * released when it completes.
+ */
+public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Collection<GridCacheEntryInfo<K, V>>>
+    implements GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Message ID. */
+    private long msgId;
+
+    /** Reader ID passed to {@code addReader} for the keys that request reader registration. */
+    private UUID reader;
+
+    /** Reload flag. */
+    private boolean reload;
+
+    /** Read through flag. */
+    private boolean readThrough;
+
+    /** Context. */
+    private GridCacheContext<K, V> cctx;
+
+    /** Keys mapped to a flag telling whether the reader has to be registered for the key. */
+    private LinkedHashMap<? extends K, Boolean> keys;
+
+    /** Reserved partitions. */
+    private Collection<GridDhtLocalPartition> parts = new GridLeanSet<>(5);
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Version. */
+    private GridCacheVersion ver;
+
+    /** Topology version. */
+    private long topVer;
+
+    /** Transaction. */
+    private IgniteTxLocalEx<K, V> tx;
+
+    /** Filters. */
+    private IgnitePredicate<GridCacheEntry<K, V>>[] filters;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Retries because ownership changed. */
+    private Collection<Integer> retries = new GridLeanSet<>();
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash code. */
+    private int taskNameHash;
+
+    /** Whether to deserialize portable objects. */
+    private boolean deserializePortable;
+
+    /** Expiry policy. */
+    private IgniteCacheExpiryPolicy expiryPlc;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridDhtGetFuture() {
+        // No-op.
+    }
+
+    /**
+     * @param cctx Context.
+     * @param msgId Message ID.
+     * @param reader Reader.
+     * @param keys Keys.
+     * @param readThrough Read through flag.
+     * @param reload Reload flag.
+     * @param tx Transaction.
+     * @param topVer Topology version.
+     * @param filters Filters.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     */
+    public GridDhtGetFuture(
+        GridCacheContext<K, V> cctx,
+        long msgId,
+        UUID reader,
+        LinkedHashMap<? extends K, Boolean> keys,
+        boolean readThrough,
+        boolean reload,
+        @Nullable IgniteTxLocalEx<K, V> tx,
+        long topVer,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc) {
+        super(cctx.kernalContext(), CU.<GridCacheEntryInfo<K, V>>collectionsReducer());
+
+        assert reader != null;
+        assert !F.isEmpty(keys);
+
+        this.reader = reader;
+        this.cctx = cctx;
+        this.msgId = msgId;
+        this.keys = keys;
+        this.readThrough = readThrough;
+        this.reload = reload;
+        this.filters = filters;
+        this.tx = tx;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.deserializePortable = deserializePortable;
+        this.taskNameHash = taskNameHash;
+        this.expiryPlc = expiryPlc;
+
+        futId = IgniteUuid.randomUuid();
+
+        // Outside of a transaction a fresh version is generated, otherwise the transaction's one is used.
+        ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+
+        log = U.logger(ctx, logRef, GridDhtGetFuture.class);
+
+        syncNotify(true);
+    }
+
+    /**
+     * Initializes future: maps all keys and marks this compound future as initialized.
+     */
+    void init() {
+        map(keys);
+
+        markInitialized();
+    }
+
+    /**
+     * @return Keys.
+     */
+    Collection<? extends K> keys() {
+        return keys.keySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> invalidPartitions() {
+        return retries;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Future version.
+     */
+    public GridCacheVersion version() {
+        return ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(Collection<GridCacheEntryInfo<K, V>> res, Throwable err) {
+        if (super.onDone(res, err)) {
+            // Release all partitions reserved by this future.
+            for (GridDhtLocalPartition part : parts)
+                part.release();
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Requests the keys from the preloader and, once that request is finished, reserves partitions
+     * for the keys and adds the local get future for the keys that were mapped.
+     *
+     * @param keys Keys.
+     */
+    private void map(final LinkedHashMap<? extends K, Boolean> keys) {
+        GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
+
+        if (!F.isEmpty(fut.invalidPartitions()))
+            retries.addAll(fut.invalidPartitions());
+
+        add(new GridEmbeddedFuture<>(cctx.kernalContext(), fut,
+            new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo<K, V>>>() {
+                @Override public Collection<GridCacheEntryInfo<K, V>> apply(Object o, Exception e) {
+                    if (e != null) { // Check error first.
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
+
+                        onDone(e);
+
+                        // The future is complete and its reserved partitions have been released.
+                        // Reserving partitions past this point would leave them reserved forever,
+                        // so do not map anything.
+                        return Collections.emptyList();
+                    }
+
+                    LinkedHashMap<K, Boolean> mappedKeys = U.newLinkedHashMap(keys.size());
+
+                    // Assign keys to primary nodes.
+                    for (Map.Entry<? extends K, Boolean> key : keys.entrySet()) {
+                        int part = cctx.affinity().partition(key.getKey());
+
+                        if (!retries.contains(part)) {
+                            if (!map(key.getKey(), parts))
+                                retries.add(part);
+                            else
+                                mappedKeys.put(key.getKey(), key.getValue());
+                        }
+                    }
+
+                    // Add new future.
+                    add(getAsync(mappedKeys));
+
+                    // Finish this one.
+                    return Collections.emptyList();
+                }
+            })
+        );
+    }
+
+    /**
+     * Makes sure the local partition of the key is reserved by this future.
+     *
+     * @param key Key.
+     * @param parts Parts to map.
+     * @return {@code True} if mapped.
+     */
+    private boolean map(K key, Collection<GridDhtLocalPartition> parts) {
+        GridDhtLocalPartition part = topVer > 0 ?
+            cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
+            cache().topology().localPartition(key, false);
+
+        if (part == null)
+            return false;
+
+        // Partition has already been reserved by this future.
+        if (parts.contains(part))
+            return true;
+
+        // By reserving, we make sure that partition won't be unloaded while processed.
+        if (!part.reserve())
+            return false;
+
+        parts.add(part);
+
+        return true;
+    }
+
+    /**
+     * Collects entry infos for the keys, registers the reader where requested and reads the values,
+     * waiting first for the futures returned by reader registration if any of them is not done yet.
+     * Infos for which no value was read are dropped from the result.
+     *
+     * @param keys Keys to get.
+     * @return Future for local get.
+     */
+    @SuppressWarnings( {"unchecked", "IfMayBeConditional"})
+    private IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) {
+        if (F.isEmpty(keys))
+            return new GridFinishedFuture<Collection<GridCacheEntryInfo<K, V>>>(cctx.kernalContext(),
+                Collections.<GridCacheEntryInfo<K, V>>emptyList());
+
+        final Collection<GridCacheEntryInfo<K, V>> infos = new LinkedList<>();
+
+        String taskName0 = ctx.job().currentTaskName();
+
+        if (taskName0 == null)
+            taskName0 = ctx.task().resolveTaskName(taskNameHash);
+
+        final String taskName = taskName0;
+
+        GridCompoundFuture<Boolean, Boolean> txFut = null;
+
+        for (Map.Entry<? extends K, Boolean> k : keys.entrySet()) {
+            // Retry until an entry that is neither removed nor obsolete is obtained.
+            while (true) {
+                GridDhtCacheEntry<K, V> e = cache().entryExx(k.getKey(), topVer);
+
+                try {
+                    GridCacheEntryInfo<K, V> info = e.info();
+
+                    // If entry is obsolete.
+                    if (info == null)
+                        continue;
+
+                    // Register reader. If there are active transactions for this entry,
+                    // then will wait for their completion before proceeding.
+                    // TODO: GG-4003:
+                    // TODO: What if any transaction we wait for actually removes this entry?
+                    // TODO: In this case seems like we will be stuck with untracked near entry.
+                    // TODO: To fix, check that reader is contained in the list of readers once
+                    // TODO: again after the returned future completes - if not, try again.
+                    // TODO: Also, why is info read before transactions are complete, and not after?
+                    IgniteFuture<Boolean> f = (!e.deleted() && k.getValue()) ? e.addReader(reader, msgId, topVer) : null;
+
+                    if (f != null) {
+                        if (txFut == null)
+                            txFut = new GridCompoundFuture<>(cctx.kernalContext(), CU.boolReducer());
+
+                        txFut.add(f);
+                    }
+
+                    infos.add(info);
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry when getting a DHT value: " + e);
+                }
+                finally {
+                    cctx.evicts().touch(e, topVer);
+                }
+            }
+        }
+
+        if (txFut != null)
+            txFut.markInitialized();
+
+        IgniteFuture<Map<K, V>> fut;
+
+        if (txFut == null || txFut.isDone())
+            fut = loadValuesAsync(keys, taskName);
+        else {
+            // If we are here, then there were active transactions for some entries
+            // when we were adding the reader. In that case we must wait for those
+            // transactions to complete.
+            fut = new GridEmbeddedFuture<>(
+                txFut,
+                new C2<Boolean, Exception, IgniteFuture<Map<K, V>>>() {
+                    @Override public IgniteFuture<Map<K, V>> apply(Boolean b, Exception e) {
+                        if (e != null)
+                            throw new GridClosureException(e);
+
+                        return loadValuesAsync(keys, taskName);
+                    }
+                },
+                cctx.kernalContext());
+        }
+
+        return new GridEmbeddedFuture<>(cctx.kernalContext(), fut,
+            new C2<Map<K, V>, Exception, Collection<GridCacheEntryInfo<K, V>>>() {
+                @Override public Collection<GridCacheEntryInfo<K, V>> apply(Map<K, V> map, Exception e) {
+                    if (e != null) {
+                        onDone(e);
+
+                        return Collections.emptyList();
+                    }
+                    else {
+                        // Keep only the infos for which a value was actually read.
+                        for (Iterator<GridCacheEntryInfo<K, V>> it = infos.iterator(); it.hasNext();) {
+                            GridCacheEntryInfo<K, V> info = it.next();
+
+                            V v = map.get(info.key());
+
+                            if (v == null)
+                                it.remove();
+                            else
+                                info.value(v);
+                        }
+
+                        return infos;
+                    }
+                }
+            });
+    }
+
+    /**
+     * Starts reading values for the keys: reloads them when the reload flag is set and the cache
+     * has read-through with a configured store, otherwise reads them through the DHT cache when
+     * there is no transaction, or through the transaction when there is one.
+     *
+     * @param keys Keys to read.
+     * @param taskName Task name.
+     * @return Future for the values.
+     */
+    @SuppressWarnings("unchecked")
+    private IgniteFuture<Map<K, V>> loadValuesAsync(LinkedHashMap<? extends K, Boolean> keys, String taskName) {
+        if (reload && cctx.readThrough() && cctx.store().configured()) {
+            return cache().reloadAllAsync(keys.keySet(),
+                true,
+                subjId,
+                taskName,
+                filters);
+        }
+
+        if (tx == null) {
+            return cache().getDhtAllAsync(keys.keySet(),
+                readThrough,
+                subjId,
+                taskName,
+                deserializePortable,
+                filters,
+                expiryPlc);
+        }
+
+        return tx.getAllAsync(cctx,
+            keys.keySet(),
+            null,
+            deserializePortable,
+            filters);
+    }
+
+    /**
+     * @return DHT cache.
+     */
+    private GridDhtCacheAdapter<K, V> cache() {
+        return (GridDhtCacheAdapter<K, V>)cctx.cache();
+    }
+}


Mime
View raw message