ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 22:04:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
deleted file mode 100644
index 31dde89..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ /dev/null
@@ -1,1492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
-
-/**
- * Base class for transactional DHT caches.
- */
-public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCacheAdapter<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    protected GridDhtTransactionalCacheAdapter() {
-        // No-op.
-    }
-
-    /**
-     * @param ctx Context.
-     */
-    protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx) {
-        super(ctx);
-    }
-
-    /**
-     * Constructor used for near-only cache.
-     *
-     * @param ctx Cache context.
-     * @param map Cache map.
-     */
-    protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap<K, V> map) {
-        super(ctx, map);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        preldr = new GridDhtPreloader<>(ctx);
-
-        preldr.start();
-
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest<K, V>>() {
-            @Override public void apply(UUID nodeId, GridNearGetRequest<K, V> req) {
-                processNearGetRequest(nodeId, req);
-            }
-        });
-
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest<K, V>>() {
-            @Override public void apply(UUID nodeId, GridNearLockRequest<K, V> req) {
-                processNearLockRequest(nodeId, req);
-            }
-        });
-
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest<K, V>>() {
-            @Override public void apply(UUID nodeId, GridDhtLockRequest<K, V> req) {
-                processDhtLockRequest(nodeId, req);
-            }
-        });
-
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse<K, V>>() {
-            @Override public void apply(UUID nodeId, GridDhtLockResponse<K, V> req) {
-                processDhtLockResponse(nodeId, req);
-            }
-        });
-
-        ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest<K, V>>() {
-            @Override public void apply(UUID nodeId, GridNearUnlockRequest<K, V> req) {
-                processNearUnlockRequest(nodeId, req);
-            }
-        });
-
-        ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest<K, V>>() {
-            @Override public void apply(UUID nodeId, GridDhtUnlockRequest<K, V> req) {
-                processDhtUnlockRequest(nodeId, req);
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public abstract GridNearTransactionalCache<K, V> near();
-
-    /**
-     * @param nodeId Primary node ID.
-     * @param req Request.
-     * @param res Response.
-     * @return Remote transaction.
-     * @throws IgniteCheckedException If failed.
-     * @throws GridDistributedLockCancelledException If lock has been cancelled.
-     */
-    @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId,
-        GridDhtLockRequest<K, V> req,
-        GridDhtLockResponse<K, V> res)
-        throws IgniteCheckedException, GridDistributedLockCancelledException {
-        List<K> keys = req.keys();
-        List<IgniteTxEntry<K, V>> writes = req.writeEntries();
-
-        GridDhtTxRemote<K, V> tx = null;
-
-        int size = F.size(keys);
-
-        for (int i = 0; i < size; i++) {
-            K key = keys.get(i);
-
-            if (key == null)
-                continue;
-
-            IgniteTxKey<K> txKey = ctx.txKey(key);
-
-            IgniteTxEntry<K, V> writeEntry = writes == null ? null : writes.get(i);
-
-            assert F.isEmpty(req.candidatesByIndex(i));
-
-            GridCacheVersion drVer = req.drVersionByIndex(i);
-
-            if (log.isDebugEnabled())
-                log.debug("Unmarshalled key: " + key);
-
-            GridDistributedCacheEntry<K, V> entry = null;
-
-            while (true) {
-                try {
-                    int part = ctx.affinity().partition(key);
-
-                    GridDhtLocalPartition<K, V> locPart = ctx.topology().localPartition(part, req.topologyVersion(),
-                        false);
-
-                    if (locPart == null || !locPart.reserve()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Local partition for given key is already evicted (will add to invalid " +
-                                "partition list) [key=" + key + ", part=" + part + ", locPart=" + locPart + ']');
-
-                        res.addInvalidPartition(part);
-
-                        // Invalidate key in near cache, if any.
-                        if (isNearEnabled(cacheCfg))
-                            obsoleteNearEntry(key, req.version());
-
-                        break;
-                    }
-
-                    try {
-                        // Handle implicit locks for pessimistic transactions.
-                        if (req.inTx()) {
-                            if (tx == null)
-                                tx = ctx.tm().tx(req.version());
-
-                            if (tx == null) {
-                                tx = new GridDhtTxRemote<>(
-                                    ctx.shared(),
-                                    req.nodeId(),
-                                    req.futureId(),
-                                    nodeId,
-                                    req.nearXidVersion(),
-                                    req.threadId(),
-                                    req.topologyVersion(),
-                                    req.version(),
-                                    /*commitVer*/null,
-                                    ctx.system(),
-                                    PESSIMISTIC,
-                                    req.isolation(),
-                                    req.isInvalidate(),
-                                    req.timeout(),
-                                    req.txSize(),
-                                    req.groupLockKey(),
-                                    req.subjectId(),
-                                    req.taskNameHash());
-
-                                tx = ctx.tm().onCreated(tx);
-
-                                if (tx == null || !ctx.tm().onStarted(tx))
-                                    throw new IgniteTxRollbackException("Failed to acquire lock (transaction " +
-                                        "has been completed) [ver=" + req.version() + ", tx=" + tx + ']');
-                            }
-
-                            tx.addWrite(
-                                ctx,
-                                writeEntry == null ? NOOP : writeEntry.op(),
-                                txKey,
-                                req.keyBytes() != null ? req.keyBytes().get(i) : null,
-                                writeEntry == null ? null : writeEntry.value(),
-                                writeEntry == null ? null : writeEntry.valueBytes(),
-                                writeEntry == null ? null : writeEntry.entryProcessors(),
-                                drVer,
-                                req.accessTtl());
-
-                            if (req.groupLock())
-                                tx.groupLockKey(txKey);
-                        }
-
-                        entry = entryExx(key, req.topologyVersion());
-
-                        // Add remote candidate before reordering.
-                        entry.addRemote(
-                            req.nodeId(),
-                            nodeId,
-                            req.threadId(),
-                            req.version(),
-                            req.timeout(),
-                            tx != null,
-                            tx != null && tx.implicitSingle(),
-                            null
-                        );
-
-                        // Invalidate key in near cache, if any.
-                        if (isNearEnabled(cacheCfg) && req.invalidateNearEntry(i))
-                            invalidateNearEntry(key, req.version());
-
-                        // Get entry info after candidate is added.
-                        if (req.needPreloadKey(i)) {
-                            entry.unswap();
-
-                            GridCacheEntryInfo<K, V> info = entry.info();
-
-                            if (info != null && !info.isNew() && !info.isDeleted())
-                                res.addPreloadEntry(info);
-                        }
-
-                        // Double-check in case if sender node left the grid.
-                        if (ctx.discovery().node(req.nodeId()) == null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
-
-                            entry.removeLock(req.version());
-
-                            if (tx != null) {
-                                tx.clearEntry(txKey);
-
-                                // If there is a concurrent salvage, there could be a case when tx is moved to
-                                // COMMITTING state, but this lock is never acquired.
-                                if (tx.state() == COMMITTING)
-                                    tx.forceCommit();
-                                else
-                                    tx.rollback();
-                            }
-
-                            return null;
-                        }
-
-                        // Entry is legit.
-                        break;
-                    }
-                    finally {
-                        locPart.release();
-                    }
-                }
-                catch (GridDhtInvalidPartitionException e) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received invalid partition exception [e=" + e + ", req=" + req + ']');
-
-                    res.addInvalidPartition(e.partition());
-
-                    // Invalidate key in near cache, if any.
-                    if (isNearEnabled(cacheCfg))
-                        obsoleteNearEntry(key, req.version());
-
-                    if (tx != null) {
-                        tx.clearEntry(txKey);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Cleared invalid entry from remote transaction (will skip) [entry=" +
-                                entry + ", tx=" + tx + ']');
-                    }
-
-                    break;
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
-                        entry;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
-
-                    if (tx != null) {
-                        tx.clearEntry(txKey);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
-                                entry + ", tx=" + tx + ']');
-                    }
-                }
-            }
-        }
-
-        if (tx != null && tx.empty()) {
-            if (log.isDebugEnabled())
-                log.debug("Rolling back remote DHT transaction because it is empty [req=" + req + ", res=" + res + ']');
-
-            tx.rollback();
-
-            tx = null;
-        }
-
-        return tx;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest<K, V> req) {
-        IgniteFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
-            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
-
-        if (keyFut == null || keyFut.isDone())
-            processDhtLockRequest0(nodeId, req);
-        else {
-            keyFut.listenAsync(new CI1<IgniteFuture<Object>>() {
-                @Override public void apply(IgniteFuture<Object> t) {
-                    processDhtLockRequest0(nodeId, req);
-                }
-            });
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest<K, V> req) {
-        assert nodeId != null;
-        assert req != null;
-        assert !nodeId.equals(locNodeId);
-
-        if (log.isDebugEnabled())
-            log.debug("Processing dht lock request [locNodeId=" + locNodeId + ", nodeId=" + nodeId + ", req=" + req +
-                ']');
-
-        int cnt = F.size(req.keys());
-
-        GridDhtLockResponse<K, V> res;
-
-        GridDhtTxRemote<K, V> dhtTx = null;
-        GridNearTxRemote<K, V> nearTx = null;
-
-        boolean fail = false;
-        boolean cancelled = false;
-
-        try {
-            res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), cnt);
-
-            dhtTx = startRemoteTx(nodeId, req, res);
-            nearTx = isNearEnabled(cacheCfg) ? near().startRemoteTx(nodeId, req) : null;
-
-            if (nearTx != null && !nearTx.empty())
-                res.nearEvicted(nearTx.evicted());
-            else {
-                if (!F.isEmpty(req.nearKeys())) {
-                    Collection<IgniteTxKey<K>> nearEvicted = new ArrayList<>(req.nearKeys().size());
-
-                    nearEvicted.addAll(F.viewReadOnly(req.nearKeys(), new C1<K, IgniteTxKey<K>>() {
-                        @Override public IgniteTxKey<K> apply(K k) {
-                            return ctx.txKey(k);
-                        }
-                    }));
-
-                    res.nearEvicted(nearEvicted);
-                }
-            }
-        }
-        catch (IgniteTxRollbackException e) {
-            String err = "Failed processing DHT lock request (transaction has been completed): " + req;
-
-            U.error(log, err, e);
-
-            res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(),
-                new IgniteTxRollbackException(err, e));
-
-            fail = true;
-        }
-        catch (IgniteCheckedException e) {
-            String err = "Failed processing DHT lock request: " + req;
-
-            U.error(log, err, e);
-
-            res = new GridDhtLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), new IgniteCheckedException(err, e));
-
-            fail = true;
-        }
-        catch (GridDistributedLockCancelledException ignored) {
-            // Received lock request for cancelled lock.
-            if (log.isDebugEnabled())
-                log.debug("Received lock request for canceled lock (will ignore): " + req);
-
-            res = null;
-
-            fail = true;
-            cancelled = true;
-        }
-
-        boolean releaseAll = false;
-
-        if (res != null) {
-            try {
-                // Reply back to sender.
-                ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-            }
-            catch (ClusterTopologyException ignored) {
-                U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId);
-
-                fail = true;
-                releaseAll = true;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send lock reply to node (lock will not be acquired): " + nodeId, e);
-
-                fail = true;
-            }
-        }
-
-        if (fail) {
-            if (dhtTx != null)
-                dhtTx.rollback();
-
-            if (nearTx != null) // Even though this should never happen, we leave this check for consistency.
-                nearTx.rollback();
-
-            List<K> keys = req.keys();
-
-            if (keys != null) {
-                for (K key : keys) {
-                    while (true) {
-                        GridDistributedCacheEntry<K, V> entry = peekExx(key);
-
-                        try {
-                            if (entry != null) {
-                                // Release all locks because sender node left grid.
-                                if (releaseAll)
-                                    entry.removeExplicitNodeLocks(req.nodeId());
-                                else
-                                    entry.removeLock(req.version());
-                            }
-
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            if (log.isDebugEnabled())
-                                log.debug("Attempted to remove lock on removed entity during during failure " +
-                                    "handling for dht lock request (will retry): " + entry);
-                        }
-                    }
-                }
-            }
-
-            if (releaseAll && !cancelled)
-                U.warn(log, "Sender node left grid in the midst of lock acquisition (locks have been released).");
-        }
-    }
-
-    /** {@inheritDoc} */
-    protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest<K, V> req) {
-        clearLocks(nodeId, req);
-
-        if (isNearEnabled(cacheCfg))
-            near().clearLocks(nodeId, req);
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    private void processNearLockRequest(UUID nodeId, GridNearLockRequest<K, V> req) {
-        assert isAffinityNode(cacheCfg);
-        assert nodeId != null;
-        assert req != null;
-
-        if (log.isDebugEnabled())
-            log.debug("Processing near lock request [locNodeId=" + locNodeId + ", nodeId=" + nodeId + ", req=" + req +
-                ']');
-
-        ClusterNode nearNode = ctx.discovery().node(nodeId);
-
-        if (nearNode == null) {
-            U.warn(log, "Received lock request from unknown node (will ignore): " + nodeId);
-
-            return;
-        }
-
-        // Group lock can be only started from local node, so we never start group lock transaction on remote node.
-        IgniteFuture<?> f = lockAllAsync(ctx, nearNode, req, null);
-
-        // Register listener just so we print out errors.
-        // Exclude lock timeout exception since it's not a fatal exception.
-        f.listenAsync(CU.errorLogger(log, GridCacheLockTimeoutException.class,
-            GridDistributedLockCancelledException.class));
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
-    private void processDhtLockResponse(UUID nodeId, GridDhtLockResponse<K, V> res) {
-        assert nodeId != null;
-        assert res != null;
-        GridDhtLockFuture<K, V> fut = (GridDhtLockFuture<K, V>)ctx.mvcc().<Boolean>future(res.version(),
-            res.futureId());
-
-        if (fut == null) {
-            if (log.isDebugEnabled())
-                log.debug("Received response for unknown future (will ignore): " + res);
-
-            return;
-        }
-
-        fut.onResult(nodeId, res);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys,
-        long timeout,
-        IgniteTxLocalEx<K, V> txx,
-        boolean isInvalidate,
-        boolean isRead,
-        boolean retval,
-        IgniteTxIsolation isolation,
-        long accessTtl,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        return lockAllAsyncInternal(keys,
-            timeout,
-            txx,
-            isInvalidate,
-            isRead,
-            retval,
-            isolation,
-            accessTtl,
-            filter);
-    }
-
-    /**
-     * Acquires locks in partitioned cache.
-     *
-     * @param keys Keys to lock.
-     * @param timeout Lock timeout.
-     * @param txx Transaction.
-     * @param isInvalidate Invalidate flag.
-     * @param isRead Read flag.
-     * @param retval Return value flag.
-     * @param isolation Transaction isolation.
-     * @param accessTtl TTL for read operation.
-     * @param filter Optional filter.
-     * @return Lock future.
-     */
-    public GridDhtFuture<Boolean> lockAllAsyncInternal(@Nullable Collection<? extends K> keys,
-        long timeout,
-        IgniteTxLocalEx<K, V> txx,
-        boolean isInvalidate,
-        boolean isRead,
-        boolean retval,
-        IgniteTxIsolation isolation,
-        long accessTtl,
-        IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
-        if (keys == null || keys.isEmpty())
-            return new GridDhtFinishedFuture<>(ctx.kernalContext(), true);
-
-        GridDhtTxLocalAdapter<K, V> tx = (GridDhtTxLocalAdapter<K, V>)txx;
-
-        assert tx != null;
-
-        GridDhtLockFuture<K, V> fut = new GridDhtLockFuture<>(
-            ctx,
-            tx.nearNodeId(),
-            tx.nearXidVersion(),
-            tx.topologyVersion(),
-            keys.size(),
-            isRead,
-            timeout,
-            tx,
-            tx.threadId(),
-            accessTtl,
-            filter);
-
-        for (K key : keys) {
-            if (key == null)
-                continue;
-
-            try {
-                while (true) {
-                    GridDhtCacheEntry<K, V> entry = entryExx(key, tx.topologyVersion());
-
-                    try {
-                        fut.addEntry(entry);
-
-                        // Possible in case of cancellation or time out.
-                        if (fut.isDone())
-                            return fut;
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry when adding lock (will retry): " + entry);
-                    }
-                    catch (GridDistributedLockCancelledException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got lock request for cancelled lock (will fail): " + entry);
-
-                        return new GridDhtFinishedFuture<>(ctx.kernalContext(), e);
-                    }
-                }
-            }
-            catch (GridDhtInvalidPartitionException e) {
-                fut.addInvalidPartition(ctx, e.partition());
-
-                if (log.isDebugEnabled())
-                    log.debug("Added invalid partition to DHT lock future [part=" + e.partition() + ", fut=" +
-                        fut + ']');
-            }
-        }
-
-        ctx.mvcc().addFuture(fut);
-
-        fut.map();
-
-        return fut;
-    }
-
-    /**
-     * @param cacheCtx Cache context.
-     * @param nearNode Near node.
-     * @param req Request.
-     * @param filter0 Filter.
-     * @return Future.
-     */
-    public IgniteFuture<GridNearLockResponse<K, V>> lockAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
-        final ClusterNode nearNode,
-        final GridNearLockRequest<K, V> req,
-        @Nullable final IgnitePredicate<GridCacheEntry<K, V>>[] filter0) {
-        final List<K> keys = req.keys();
-
-        IgniteFuture<Object> keyFut = null;
-
-        if (req.onePhaseCommit()) {
-            boolean forceKeys = req.hasTransforms() || req.filter() != null;
-
-            if (!forceKeys) {
-                for (int i = 0; i < req.keysCount() && !forceKeys; i++)
-                    forceKeys |= req.returnValue(i);
-            }
-
-            if (forceKeys)
-                keyFut = ctx.dht().dhtPreloader().request(keys, req.topologyVersion());
-        }
-
-        if (keyFut == null)
-            keyFut = new GridFinishedFutureEx<>();
-
-        return new GridEmbeddedFuture<>(true, keyFut,
-            new C2<Object, Exception, IgniteFuture<GridNearLockResponse<K,V>>>() {
-                @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(Object o, Exception exx) {
-                    if (exx != null)
-                        return new GridDhtFinishedFuture<>(ctx.kernalContext(), exx);
-
-                    IgnitePredicate<GridCacheEntry<K, V>>[] filter = filter0;
-
-                    // Set message into thread context.
-                    GridDhtTxLocal<K, V> tx = null;
-
-                    try {
-                        int cnt = keys.size();
-
-                        if (req.inTx()) {
-                            GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
-
-                            if (dhtVer != null)
-                                tx = ctx.tm().tx(dhtVer);
-                        }
-
-                        final List<GridCacheEntryEx<K, V>> entries = new ArrayList<>(cnt);
-
-                        // Unmarshal filter first.
-                        if (filter == null)
-                            filter = req.filter();
-
-                        GridDhtLockFuture<K, V> fut = null;
-
-                        if (!req.inTx()) {
-                            fut = new GridDhtLockFuture<>(ctx,
-                                nearNode.id(),
-                                req.version(),
-                                req.topologyVersion(),
-                                cnt,
-                                req.txRead(),
-                                req.timeout(),
-                                tx,
-                                req.threadId(),
-                                req.accessTtl(),
-                                filter);
-
-                            // Add before mapping.
-                            if (!ctx.mvcc().addFuture(fut))
-                                throw new IllegalStateException("Duplicate future ID: " + fut);
-                        }
-
-                        boolean timedout = false;
-
-                        for (K key : keys) {
-                            if (timedout)
-                                break;
-
-                            while (true) {
-                                // Specify topology version to make sure containment is checked
-                                // based on the requested version, not the latest.
-                                GridDhtCacheEntry<K, V> entry = entryExx(key, req.topologyVersion());
-
-                                try {
-                                    if (fut != null) {
-                                        // This method will add local candidate.
-                                        // Entry cannot become obsolete after this method succeeded.
-                                        fut.addEntry(key == null ? null : entry);
-
-                                        if (fut.isDone()) {
-                                            timedout = true;
-
-                                            break;
-                                        }
-                                    }
-
-                                    entries.add(entry);
-
-                                    break;
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed entry when adding lock (will retry): " + entry);
-                                }
-                                catch (GridDistributedLockCancelledException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got lock request for cancelled lock (will ignore): " +
-                                            entry);
-
-                                    fut.onError(e);
-
-                                    return new GridDhtFinishedFuture<>(ctx.kernalContext(), e);
-                                }
-                            }
-                        }
-
-                        // Handle implicit locks for pessimistic transactions.
-                        if (req.inTx()) {
-                            if (tx == null) {
-                                tx = new GridDhtTxLocal<>(
-                                    ctx.shared(),
-                                    nearNode.id(),
-                                    req.version(),
-                                    req.futureId(),
-                                    req.miniId(),
-                                    req.threadId(),
-                                    req.implicitTx(),
-                                    req.implicitSingleTx(),
-                                    ctx.system(),
-                                    PESSIMISTIC,
-                                    req.isolation(),
-                                    req.timeout(),
-                                    req.isInvalidate(),
-                                    false,
-                                    req.txSize(),
-                                    req.groupLockKey(),
-                                    req.partitionLock(),
-                                    null,
-                                    req.subjectId(),
-                                    req.taskNameHash());
-
-                                tx.syncCommit(req.syncCommit());
-
-                                tx = ctx.tm().onCreated(tx);
-
-                                if (tx == null || !tx.init()) {
-                                    String msg = "Failed to acquire lock (transaction has been completed): " +
-                                        req.version();
-
-                                    U.warn(log, msg);
-
-                                    if (tx != null)
-                                        tx.rollback();
-
-                                    return new GridDhtFinishedFuture<>(ctx.kernalContext(), new IgniteCheckedException(msg));
-                                }
-
-                                tx.topologyVersion(req.topologyVersion());
-                            }
-
-                            ctx.tm().txContext(tx);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']');
-
-                            assert req.writeEntries() == null || req.writeEntries().size() == entries.size();
-
-                            IgniteFuture<GridCacheReturn<V>> txFut = tx.lockAllAsync(
-                                cacheCtx,
-                                entries,
-                                req.writeEntries(),
-                                req.onePhaseCommit(),
-                                req.drVersions(),
-                                req.messageId(),
-                                req.implicitTx(),
-                                req.txRead(),
-                                req.accessTtl());
-
-                            final GridDhtTxLocal<K, V> t = tx;
-
-                            return new GridDhtEmbeddedFuture<>(
-                                txFut,
-                                new C2<GridCacheReturn<V>, Exception, IgniteFuture<GridNearLockResponse<K, V>>>() {
-                                    @Override public IgniteFuture<GridNearLockResponse<K, V>> apply(
-                                        GridCacheReturn<V> o, Exception e) {
-                                        if (e != null)
-                                            e = U.unwrap(e);
-
-                                        assert !t.empty();
-
-                                        // Create response while holding locks.
-                                        final GridNearLockResponse<K, V> resp = createLockReply(nearNode,
-                                            entries,
-                                            req,
-                                            t,
-                                            t.xidVersion(),
-                                            e);
-
-                                        if (resp.error() == null && t.onePhaseCommit()) {
-                                            assert t.implicit();
-
-                                            return t.commitAsync().chain(
-                                                new C1<IgniteFuture<IgniteTx>, GridNearLockResponse<K, V>>() {
-                                                    @Override public GridNearLockResponse<K, V> apply(IgniteFuture<IgniteTx> f) {
-                                                        try {
-                                                            // Check for error.
-                                                            f.get();
-                                                        }
-                                                        catch (IgniteCheckedException e1) {
-                                                            resp.error(e1);
-                                                        }
-
-                                                        sendLockReply(nearNode, t, req, resp);
-
-                                                        return resp;
-                                                    }
-                                                });
-                                        }
-                                        else {
-                                            sendLockReply(nearNode, t, req, resp);
-
-                                            return new GridFinishedFutureEx<>(resp);
-                                        }
-                                    }
-                                },
-                                ctx.kernalContext());
-                        }
-                        else {
-                            assert fut != null;
-
-                            // This will send remote messages.
-                            fut.map();
-
-                            final GridCacheVersion mappedVer = fut.version();
-
-                            return new GridDhtEmbeddedFuture<>(
-                                ctx.kernalContext(),
-                                fut,
-                                new C2<Boolean, Exception, GridNearLockResponse<K, V>>() {
-                                    @Override public GridNearLockResponse<K, V> apply(Boolean b, Exception e) {
-                                        if (e != null)
-                                            e = U.unwrap(e);
-                                        else if (!b)
-                                            e = new GridCacheLockTimeoutException(req.version());
-
-                                        GridNearLockResponse<K, V> res = createLockReply(nearNode,
-                                            entries,
-                                            req,
-                                            null,
-                                            mappedVer,
-                                            e);
-
-                                        sendLockReply(nearNode, null, req, res);
-
-                                        return res;
-                                    }
-                                });
-                        }
-                    }
-                    catch (IgniteCheckedException e) {
-                        String err = "Failed to unmarshal at least one of the keys for lock request message: " + req;
-
-                        U.error(log, err, e);
-
-                        if (tx != null) {
-                            try {
-                                tx.rollback();
-                            }
-                            catch (IgniteCheckedException ex) {
-                                U.error(log, "Failed to rollback the transaction: " + tx, ex);
-                            }
-                        }
-
-                        return new GridDhtFinishedFuture<>(ctx.kernalContext(),
-                            new IgniteCheckedException(err, e));
-                    }
-                }
-            },
-            ctx.kernalContext());
-    }
-
-    /**
-     * @param nearNode Near node.
-     * @param entries Entries.
-     * @param req Lock request.
-     * @param tx Transaction.
-     * @param mappedVer Mapped version.
-     * @param err Error.
-     * @return Response.
-     */
-    private GridNearLockResponse<K, V> createLockReply(
-        ClusterNode nearNode,
-        List<GridCacheEntryEx<K, V>> entries,
-        GridNearLockRequest<K, V> req,
-        @Nullable GridDhtTxLocalAdapter<K,V> tx,
-        GridCacheVersion mappedVer,
-        Throwable err) {
-        assert mappedVer != null;
-        assert tx == null || tx.xidVersion().equals(mappedVer);
-
-        try {
-            // Send reply back to originating near node.
-            GridNearLockResponse<K, V> res = new GridNearLockResponse<>(ctx.cacheId(),
-                req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err);
-
-            if (err == null) {
-                res.pending(localDhtPendingVersions(entries, mappedVer));
-
-                // We have to add completed versions for cases when nearLocal and remote transactions
-                // execute concurrently.
-                res.completedVersions(ctx.tm().committedVersions(req.version()),
-                    ctx.tm().rolledbackVersions(req.version()));
-
-                int i = 0;
-
-                for (ListIterator<GridCacheEntryEx<K, V>> it = entries.listIterator(); it.hasNext();) {
-                    GridCacheEntryEx<K, V> e = it.next();
-
-                    assert e != null;
-
-                    while (true) {
-                        try {
-                            // Don't return anything for invalid partitions.
-                            if (tx == null || !tx.isRollbackOnly()) {
-                                GridCacheVersion dhtVer = req.dhtVersion(i);
-
-                                try {
-                                    GridCacheVersion ver = e.version();
-
-                                    boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver);
-
-                                    V val = null;
-
-                                    if (ret)
-                                        val = e.innerGet(tx,
-                                            /*swap*/true,
-                                            /*read-through*/ctx.loadPreviousValue(),
-                                            /*fail-fast.*/false,
-                                            /*unmarshal*/false,
-                                            /*update-metrics*/true,
-                                            /*event notification*/req.returnValue(i),
-                                            /*temporary*/false,
-                                            CU.subjectId(tx, ctx.shared()),
-                                            null,
-                                            tx != null ? tx.resolveTaskName() : null,
-                                            CU.<K, V>empty(),
-                                            null);
-
-                                    assert e.lockedBy(mappedVer) ||
-                                        (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) :
-                                        "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() +
-                                            ", entry=" + e +
-                                            ", mappedVer=" + mappedVer + ", ver=" + ver +
-                                            ", tx=" + tx + ", req=" + req +
-                                            ", err=" + err + ']';
-
-                                    boolean filterPassed = false;
-
-                                    if (tx != null && tx.onePhaseCommit()) {
-                                        IgniteTxEntry<K, V> writeEntry = tx.entry(ctx.txKey(e.key()));
-
-                                        assert writeEntry != null :
-                                            "Missing tx entry for locked cache entry: " + e;
-
-                                        filterPassed = writeEntry.filtersPassed();
-                                    }
-
-                                    GridCacheValueBytes valBytes = ret ? e.valueBytes(null) : GridCacheValueBytes.nil();
-
-                                    // We include values into response since they are required for local
-                                    // calls and won't be serialized. We are also including DHT version.
-                                    res.addValueBytes(
-                                        val != null ? val : (V)valBytes.getIfPlain(),
-                                        ret ? valBytes.getIfMarshaled() : null,
-                                        filterPassed,
-                                        ver,
-                                        mappedVer,
-                                        ctx);
-                                }
-                                catch (GridCacheFilterFailedException ex) {
-                                    assert false : "Filter should never fail if fail-fast is false.";
-
-                                    ex.printStackTrace();
-                                }
-                            }
-                            else {
-                                // We include values into response since they are required for local
-                                // calls and won't be serialized. We are also including DHT version.
-                                res.addValueBytes(null, null, false, e.version(), mappedVer, ctx);
-                            }
-
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            if (log.isDebugEnabled())
-                                log.debug("Got removed entry when sending reply to DHT lock request " +
-                                    "(will retry): " + e);
-
-                            e = entryExx(e.key());
-
-                            it.set(e);
-                        }
-                    }
-
-                    i++;
-                }
-            }
-
-            return res;
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to get value for lock reply message for node [node=" +
-                U.toShortString(nearNode) + ", req=" + req + ']', e);
-
-            return new GridNearLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false,
-                entries.size(), e);
-        }
-    }
-
-    /**
-     * Send lock reply back to near node.
-     *
-     * @param nearNode Near node.
-     * @param tx Transaction.
-     * @param req Lock request.
-     * @param res Lock response.
-     */
-    private void sendLockReply(
-        ClusterNode nearNode,
-        @Nullable IgniteTxEx<K,V> tx,
-        GridNearLockRequest<K, V> req,
-        GridNearLockResponse<K, V> res
-    ) {
-        Throwable err = res.error();
-
-        // Log error before sending reply.
-        if (err != null && !(err instanceof GridCacheLockTimeoutException))
-            U.error(log, "Failed to acquire lock for request: " + req, err);
-
-        try {
-            // Don't send reply message to this node or if lock was cancelled.
-            if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class))
-                ctx.io().send(nearNode, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send lock reply to originating node (will rollback transaction) [node=" +
-                U.toShortString(nearNode) + ", req=" + req + ']', e);
-
-            if (tx != null)
-                tx.rollbackAsync();
-
-            // Convert to closure exception as this method is only called form closures.
-            throw new GridClosureException(e);
-        }
-    }
-
-    /**
-     * Collects versions of pending candidates versions less then base.
-     *
-     * @param entries Tx entries to process.
-     * @param baseVer Base version.
-     * @return Collection of pending candidates versions.
-     */
-    private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<GridCacheEntryEx<K, V>> entries,
-        GridCacheVersion baseVer) {
-        Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5);
-
-        for (GridCacheEntryEx<K, V> entry : entries) {
-            // Since entries were collected before locks are added, some of them may become obsolete.
-            while (true) {
-                try {
-                    for (GridCacheMvccCandidate cand : entry.localCandidates()) {
-                        if (cand.version().isLess(baseVer))
-                            lessPending.add(cand.version());
-                    }
-
-                    break; // While.
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Got removed entry is localDhtPendingVersions (will retry): " + entry);
-
-                    entry = entryExx(entry.key());
-                }
-            }
-        }
-
-        return lessPending;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    @SuppressWarnings({"RedundantTypeArguments"})
-    private void clearLocks(UUID nodeId, GridDistributedUnlockRequest<K, V> req) {
-        assert nodeId != null;
-
-        List<K> keys = req.keys();
-
-        if (keys != null) {
-            for (K key : keys) {
-                while (true) {
-                    GridDistributedCacheEntry<K, V> entry = peekExx(key);
-
-                    boolean created = false;
-
-                    if (entry == null) {
-                        entry = entryExx(key);
-
-                        created = true;
-                    }
-
-                    try {
-                        entry.doneRemote(
-                            req.version(),
-                            req.version(),
-                            null,
-                            null,
-                            null,
-                            /*system invalidate*/false);
-
-                        // Note that we don't reorder completed versions here,
-                        // as there is no point to reorder relative to the version
-                        // we are about to remove.
-                        if (entry.removeLock(req.version())) {
-                            if (log.isDebugEnabled())
-                                log.debug("Removed lock [lockId=" + req.version() + ", key=" + key + ']');
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Received unlock request for unknown candidate " +
-                                    "(added to cancelled locks set): " + req);
-                        }
-
-                        if (created && entry.markObsolete(req.version()))
-                            removeEntry(entry);
-
-                        ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        if (log.isDebugEnabled())
-                            log.debug("Received remove lock request for removed entry (will retry) [entry=" +
-                                entry + ", req=" + req + ']');
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param req Request.
-     */
-    @SuppressWarnings({"RedundantTypeArguments", "TypeMayBeWeakened"})
-    private void processNearUnlockRequest(UUID nodeId, GridNearUnlockRequest<K, V> req) {
-        assert isAffinityNode(cacheCfg);
-        assert nodeId != null;
-
-        removeLocks(nodeId, req.version(), req.keys(), true);
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param topVer Topology version.
-     * @param cached Entry.
-     * @param readers Readers for this entry.
-     * @param dhtMap DHT map.
-     * @param nearMap Near map.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void map(UUID nodeId,
-        long topVer,
-        GridCacheEntryEx<K,V> cached,
-        Collection<UUID> readers,
-        Map<ClusterNode, List<T2<K, byte[]>>> dhtMap,
-        Map<ClusterNode, List<T2<K, byte[]>>> nearMap)
-        throws IgniteCheckedException {
-        Collection<ClusterNode> dhtNodes = ctx.dht().topology().nodes(cached.partition(), topVer);
-
-        ClusterNode primary = F.first(dhtNodes);
-
-        assert primary != null;
-
-        if (!primary.id().equals(ctx.nodeId())) {
-            if (log.isDebugEnabled())
-                log.debug("Primary node mismatch for unlock [entry=" + cached + ", expected=" + ctx.nodeId() +
-                    ", actual=" + U.toShortString(primary) + ']');
-
-            return;
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) + ", entry=" + cached + ']');
-
-        Collection<ClusterNode> nearNodes = null;
-
-        if (!F.isEmpty(readers)) {
-            nearNodes = ctx.discovery().nodes(readers, F0.not(F.idForNodeId(nodeId)));
-
-            if (log.isDebugEnabled())
-                log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) + ", entry=" + cached +
-                    ']');
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Entry has no near readers: " + cached);
-        }
-
-        map(cached, F.view(dhtNodes, F.remoteNodes(ctx.nodeId())), dhtMap); // Exclude local node.
-        map(cached, nearNodes, nearMap);
-    }
-
-    /**
-     * @param entry Entry.
-     * @param nodes Nodes.
-     * @param map Map.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings( {"MismatchedQueryAndUpdateOfCollection"})
-    private void map(GridCacheEntryEx<K, V> entry,
-        @Nullable Iterable<? extends ClusterNode> nodes,
-        Map<ClusterNode, List<T2<K, byte[]>>> map) throws IgniteCheckedException {
-        if (nodes != null) {
-            for (ClusterNode n : nodes) {
-                List<T2<K, byte[]>> keys = map.get(n);
-
-                if (keys == null)
-                    map.put(n, keys = new LinkedList<>());
-
-                keys.add(new T2<>(entry.key(), entry.getOrMarshalKeyBytes()));
-            }
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param ver Version.
-     * @param keys Keys.
-     * @param unmap Flag for un-mapping version.
-     */
-    public void removeLocks(UUID nodeId, GridCacheVersion ver, Iterable<? extends K> keys, boolean unmap) {
-        assert nodeId != null;
-        assert ver != null;
-
-        if (F.isEmpty(keys))
-            return;
-
-        // Remove mapped versions.
-        GridCacheVersion dhtVer = unmap ? ctx.mvcc().unmapVersion(ver) : ver;
-
-        Map<ClusterNode, List<T2<K, byte[]>>> dhtMap = new HashMap<>();
-        Map<ClusterNode, List<T2<K, byte[]>>> nearMap = new HashMap<>();
-
-        GridCacheVersion obsoleteVer = null;
-
-        for (K key : keys) {
-            while (true) {
-                boolean created = false;
-
-                GridDhtCacheEntry<K, V> entry = peekExx(key);
-
-                if (entry == null) {
-                    entry = entryExx(key);
-
-                    created = true;
-                }
-
-                try {
-                    GridCacheMvccCandidate<K> cand = null;
-
-                    if (dhtVer == null) {
-                        cand = entry.localCandidateByNearVersion(ver, true);
-
-                        if (cand != null)
-                            dhtVer = cand.version();
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to locate lock candidate based on dht or near versions [nodeId=" +
-                                    nodeId + ", ver=" + ver + ", unmap=" + unmap + ", keys=" + keys + ']');
-
-                            entry.removeLock(ver);
-
-                            if (created) {
-                                if (obsoleteVer == null)
-                                    obsoleteVer = ctx.versions().next();
-
-                                if (entry.markObsolete(obsoleteVer))
-                                    removeEntry(entry);
-                            }
-
-                            break;
-                        }
-                    }
-
-                    if (cand == null)
-                        cand = entry.candidate(dhtVer);
-
-                    long topVer = cand == null ? -1 : cand.topologyVersion();
-
-                    // Note that we obtain readers before lock is removed.
-                    // Even in case if entry would be removed just after lock is removed,
-                    // we must send release messages to backups and readers.
-                    Collection<UUID> readers = entry.readers();
-
-                    // Note that we don't reorder completed versions here,
-                    // as there is no point to reorder relative to the version
-                    // we are about to remove.
-                    if (entry.removeLock(dhtVer)) {
-                        // Map to backups and near readers.
-                        map(nodeId, topVer, entry, readers, dhtMap, nearMap);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Removed lock [lockId=" + ver + ", key=" + key + ']');
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Received unlock request for unknown candidate " +
-                            "(added to cancelled locks set) [ver=" + ver + ", entry=" + entry + ']');
-
-                    if (created && entry.markObsolete(dhtVer))
-                        removeEntry(entry);
-
-                    ctx.evicts().touch(entry, topVer);
-
-                    break;
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received remove lock request for removed entry (will retry): " + entry);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to remove locks for keys: " + keys, e);
-                }
-            }
-        }
-
-        Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
-        Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
-
-        // Backups.
-        for (Map.Entry<ClusterNode, List<T2<K, byte[]>>> entry : dhtMap.entrySet()) {
-            ClusterNode n = entry.getKey();
-
-            List<T2<K, byte[]>> keyBytes = entry.getValue();
-
-            GridDhtUnlockRequest<K, V> req = new GridDhtUnlockRequest<>(ctx.cacheId(), keyBytes.size());
-
-            req.version(dhtVer);
-
-            try {
-                for (T2<K, byte[]> key : keyBytes)
-                    req.addKey(key.get1(), key.get2(), ctx);
-
-                keyBytes = nearMap.get(n);
-
-                if (keyBytes != null)
-                    for (T2<K, byte[]> key : keyBytes)
-                        req.addNearKey(key.get1(), key.get2(), ctx.shared());
-
-                req.completedVersions(committed, rolledback);
-
-                ctx.io().send(n, req);
-            }
-            catch (ClusterTopologyException ignore) {
-                if (log.isDebugEnabled())
-                    log.debug("Node left while sending unlock request: " + n);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send unlock request to node (will make best effort to complete): " + n, e);
-            }
-        }
-
-        // Readers.
-        for (Map.Entry<ClusterNode, List<T2<K, byte[]>>> entry : nearMap.entrySet()) {
-            ClusterNode n = entry.getKey();
-
-            if (!dhtMap.containsKey(n)) {
-                List<T2<K, byte[]>> keyBytes = entry.getValue();
-
-                GridDhtUnlockRequest<K, V> req = new GridDhtUnlockRequest<>(ctx.cacheId(), keyBytes.size());
-
-                req.version(dhtVer);
-
-                try {
-                    for (T2<K, byte[]> key : keyBytes)
-                        req.addNearKey(key.get1(), key.get2(), ctx.shared());
-
-                    req.completedVersions(committed, rolledback);
-
-                    ctx.io().send(n, req);
-                }
-                catch (ClusterTopologyException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Node left while sending unlock request: " + n);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send unlock request to node (will make best effort to complete): " + n, e);
-                }
-            }
-        }
-    }
-
-    /**
-     * @param key Key
-     * @param ver Version.
-     * @throws IgniteCheckedException If invalidate failed.
-     */
-    private void invalidateNearEntry(K key, GridCacheVersion ver) throws IgniteCheckedException {
-        GridCacheEntryEx<K, V> nearEntry = near().peekEx(key);
-
-        if (nearEntry != null)
-            nearEntry.invalidate(null, ver);
-    }
-
-    /**
-     * @param key Key
-     * @param ver Version.
-     */
-    private void obsoleteNearEntry(K key, GridCacheVersion ver) {
-        GridCacheEntryEx<K, V> nearEntry = near().peekEx(key);
-
-        if (nearEntry != null)
-            nearEntry.markObsolete(ver);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
deleted file mode 100644
index cb3ceb8..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- *
- */
-public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteTx>
-    implements GridCacheFuture<IgniteTx> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Context. */
-    private GridCacheSharedContext<K, V> cctx;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Transaction. */
-    @GridToStringExclude
-    private GridDhtTxLocalAdapter<K, V> tx;
-
-    /** Commit flag. */
-    private boolean commit;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Error. */
-    @GridToStringExclude
-    private AtomicReference<Throwable> err = new AtomicReference<>(null);
-
-    /** DHT mappings. */
-    private Map<UUID, GridDistributedTxMapping<K, V>> dhtMap;
-
-    /** Near mappings. */
-    private Map<UUID, GridDistributedTxMapping<K, V>> nearMap;
-
-    /** Trackable flag. */
-    private boolean trackable = true;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridDhtTxFinishFuture() {
-        // No-op.
-    }
-
-    /**
-     * @param cctx Context.
-     * @param tx Transaction.
-     * @param commit Commit flag.
-     */
-    public GridDhtTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridDhtTxLocalAdapter<K, V> tx, boolean commit) {
-        super(cctx.kernalContext(), F.<IgniteTx>identityReducer(tx));
-
-        assert cctx != null;
-
-        this.cctx = cctx;
-        this.tx = tx;
-        this.commit = commit;
-
-        dhtMap = tx.dhtMap();
-        nearMap = tx.nearMap();
-
-        futId = IgniteUuid.randomUuid();
-
-        log = U.logger(ctx, logRef, GridDhtTxFinishFuture.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return tx.xidVersion();
-    }
-
-    /**
-     * @return Involved nodes.
-     */
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<?>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteFuture<?> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
-
-                    return cctx.discovery().localNode();
-                }
-            });
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteFuture<?> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.node().id().equals(nodeId)) {
-                    f.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId));
-
-                    return true;
-                }
-            }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return trackable;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        trackable = false;
-    }
-
-    /**
-     * @param e Error.
-     */
-    public void onError(Throwable e) {
-        if (err.compareAndSet(null, e)) {
-            boolean marked = tx.setRollbackOnly();
-
-            if (e instanceof IgniteTxRollbackException) {
-                if (marked) {
-                    try {
-                        tx.rollback();
-                    }
-                    catch (IgniteCheckedException ex) {
-                        U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
-                    }
-                }
-            }
-            else if (tx.isSystemInvalidate()) { // Invalidate remote transactions on heuristic error.
-                finish();
-
-                try {
-                    get();
-                }
-                catch (IgniteTxHeuristicException ignore) {
-                    // Future should complete with GridCacheTxHeuristicException.
-                }
-                catch (IgniteCheckedException err) {
-                    U.error(log, "Failed to invalidate transaction: " + tx, err);
-                }
-            }
-
-            onComplete();
-        }
-    }
-
-    /**
-     * @param nodeId Sender.
-     * @param res Result.
-     */
-    public void onResult(UUID nodeId, GridDhtTxFinishResponse<K, V> res) {
-        if (!isDone()) {
-            for (IgniteFuture<IgniteTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
-
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.node().id().equals(nodeId);
-
-                        f.onResult(res);
-                    }
-                }
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(IgniteTx tx, Throwable err) {
-        if (initialized() || err != null) {
-            if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING))
-                this.tx.tmCommit();
-
-            Throwable e = this.err.get();
-
-            if (super.onDone(tx, e != null ? e : err)) {
-                // Always send finish reply.
-                this.tx.sendFinishReply(commit, error());
-
-                // Don't forget to clean up.
-                cctx.mvcc().removeFuture(this);
-
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
-    }
-
-    /**
-     * Completeness callback.
-     */
-    private void onComplete() {
-        onDone(tx, err.get());
-    }
-
-    /**
-     * Completes this future.
-     */
-    void complete() {
-        onComplete();
-    }
-
-    /**
-     * Initializes future.
-     */
-    public void finish() {
-        if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap)) {
-            boolean sync = finish(dhtMap, nearMap);
-
-            markInitialized();
-
-            if (!sync)
-                onComplete();
-        }
-        else {
-            markInitialized();
-
-            // No backup or near nodes to send commit message to (just complete then).
-            onComplete();
-        }
-    }
-
-    /**
-     * @param dhtMap DHT map.
-     * @param nearMap Near map.
-     * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for.
-     */
-    private boolean finish(Map<UUID, GridDistributedTxMapping<K, V>> dhtMap,
-        Map<UUID, GridDistributedTxMapping<K, V>> nearMap) {
-        boolean res = false;
-
-        boolean sync = commit ? tx.syncCommit() : tx.syncRollback();
-
-        // Create mini futures.
-        for (GridDistributedTxMapping<K, V> dhtMapping : dhtMap.values()) {
-            ClusterNode n = dhtMapping.node();
-
-            assert !n.isLocal();
-
-            GridDistributedTxMapping<K, V> nearMapping = nearMap.get(n.id());
-
-            if (dhtMapping.empty() && nearMapping != null && nearMapping.empty())
-                // Nothing to send.
-                continue;
-
-            MiniFuture fut = new MiniFuture(dhtMapping, nearMapping);
-
-            add(fut); // Append new future.
-
-            GridDhtTxFinishRequest<K, V> req = new GridDhtTxFinishRequest<>(
-                tx.nearNodeId(),
-                futId,
-                fut.futureId(),
-                tx.topologyVersion(),
-                tx.xidVersion(),
-                tx.commitVersion(),
-                tx.threadId(),
-                tx.isolation(),
-                commit,
-                tx.isInvalidate(),
-                tx.system(),
-                tx.isSystemInvalidate(),
-                tx.syncCommit(),
-                tx.syncRollback(),
-                tx.completedBase(),
-                tx.committedVersions(),
-                tx.rolledbackVersions(),
-                tx.pendingVersions(),
-                tx.size(),
-                tx.pessimistic() ? dhtMapping.writes() : null,
-                tx.pessimistic() && nearMapping != null ? nearMapping.writes() : null,
-                tx.recoveryWrites(),
-                tx.onePhaseCommit(),
-                tx.groupLockKey(),
-                tx.subjectId(),
-                tx.taskNameHash());
-
-            if (!tx.pessimistic()) {
-                int idx = 0;
-
-                for (IgniteTxEntry<K, V> e : dhtMapping.writes())
-                    req.ttl(idx++, e.ttl());
-
-                if (nearMapping != null) {
-                    idx = 0;
-
-                    for (IgniteTxEntry<K, V> e : nearMapping.writes())
-                        req.nearTtl(idx++, e.ttl());
-                }
-            }
-
-            if (tx.onePhaseCommit())
-                req.writeVersion(tx.writeVersion());
-
-            try {
-                cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-
-                if (sync)
-                    res = true;
-                else
-                    fut.onDone();
-            }
-            catch (IgniteCheckedException e) {
-                // Fail the whole thing.
-                if (e instanceof ClusterTopologyException)
-                    fut.onResult((ClusterTopologyException)e);
-                else
-                    fut.onResult(e);
-            }
-        }
-
-        for (GridDistributedTxMapping<K, V> nearMapping : nearMap.values()) {
-            if (!dhtMap.containsKey(nearMapping.node().id())) {
-                if (nearMapping.empty())
-                    // Nothing to send.
-                    continue;
-
-                MiniFuture fut = new MiniFuture(null, nearMapping);
-
-                add(fut); // Append new future.
-
-                GridDhtTxFinishRequest<K, V> req = new GridDhtTxFinishRequest<>(
-                    tx.nearNodeId(),
-                    futId,
-                    fut.futureId(),
-                    tx.topologyVersion(),
-                    tx.xidVersion(),
-                    tx.commitVersion(),
-                    tx.threadId(),
-                    tx.isolation(),
-                    commit,
-                    tx.isInvalidate(),
-                    tx.system(),
-                    tx.isSystemInvalidate(),
-                    tx.syncCommit(),
-                    tx.syncRollback(),
-                    tx.completedBase(),
-                    tx.committedVersions(),
-                    tx.rolledbackVersions(),
-                    tx.pendingVersions(),
-                    tx.size(),
-                    null,
-                    tx.pessimistic() ? nearMapping.writes() : null,
-                    tx.recoveryWrites(),
-                    tx.onePhaseCommit(),
-                    tx.groupLockKey(),
-                    tx.subjectId(),
-                    tx.taskNameHash());
-
-                if (!tx.pessimistic()) {
-                    int idx = 0;
-
-                    for (IgniteTxEntry<K, V> e : nearMapping.writes())
-                        req.nearTtl(idx++, e.ttl());
-                }
-
-                if (tx.onePhaseCommit())
-                    req.writeVersion(tx.writeVersion());
-
-                try {
-                    cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-
-                    if (sync)
-                        res = true;
-                    else
-                        fut.onDone();
-                }
-                catch (IgniteCheckedException e) {
-                    // Fail the whole thing.
-                    if (e instanceof ClusterTopologyException)
-                        fut.onResult((ClusterTopologyException)e);
-                    else
-                        fut.onResult(e);
-                }
-            }
-        }
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtTxFinishFuture.class, this, super.toString());
-    }
-
-    /**
-     * Mini-future for get operations. Mini-futures are only waiting on a single
-     * node as opposed to multiple nodes.
-     */
-    private class MiniFuture extends GridFutureAdapter<IgniteTx> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-        /** DHT mapping. */
-        @GridToStringInclude
-        private GridDistributedTxMapping<K, V> dhtMapping;
-
-        /** Near mapping. */
-        @GridToStringInclude
-        private GridDistributedTxMapping<K, V> nearMapping;
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public MiniFuture() {
-            // No-op.
-        }
-
-        /**
-         * @param dhtMapping Mapping.
-         * @param nearMapping nearMapping.
-         */
-        MiniFuture(GridDistributedTxMapping<K, V> dhtMapping, GridDistributedTxMapping<K, V> nearMapping) {
-            super(cctx.kernalContext());
-
-            assert dhtMapping == null || nearMapping == null || dhtMapping.node() == nearMapping.node();
-
-            this.dhtMapping = dhtMapping;
-            this.nearMapping = nearMapping;
-        }
-
-        /**
-         * @return Future ID.
-         */
-        IgniteUuid futureId() {
-            return futId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        public ClusterNode node() {
-            return dhtMapping != null ? dhtMapping.node() : nearMapping.node();
-        }
-
-        /**
-         * @param e Error.
-         */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-            // Fail.
-            onDone(e);
-        }
-
-        /**
-         * @param e Node failure.
-         */
-        void onResult(ClusterTopologyException e) {
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this);
-
-            // If node left, then there is nothing to commit on it.
-            onDone(tx);
-        }
-
-        /**
-         * @param res Result callback.
-         */
-        void onResult(GridDhtTxFinishResponse<K, V> res) {
-            if (log.isDebugEnabled())
-                log.debug("Transaction synchronously completed on node [node=" + node() + ", res=" + res + ']');
-
-            onDone();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
-        }
-    }
-}


Mime
View raw message