ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [34/46] incubator-ignite git commit: GG-9141 - Renaming.
Date Sun, 21 Dec 2014 23:04:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
new file mode 100644
index 0000000..4fc7140
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -0,0 +1,3179 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.transactions;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.portables.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.grid.kernal.processors.cache.dr.*;
+import org.gridgain.grid.kernal.processors.dr.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
+import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
+
+/**
+ * Transaction adapter for cache transactions.
+ */
+public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
+    implements IgniteTxLocalEx<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Per-transaction read map. */
+    @GridToStringExclude
+    protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> txMap;
+
+    /** Read view on transaction map. */
+    @GridToStringExclude
+    protected IgniteTxMap<K, V> readView;
+
+    /** Write view on transaction map. */
+    @GridToStringExclude
+    protected IgniteTxMap<K, V> writeView;
+
+    /** Minimal version encountered (either explicit lock or XID of this transaction). */
+    protected GridCacheVersion minVer;
+
+    /** Flag indicating with TM commit happened. */
+    protected AtomicBoolean doneFlag = new AtomicBoolean(false);
+
+    /** Committed versions, relative to base. */
+    private Collection<GridCacheVersion> committedVers = Collections.emptyList();
+
+    /** Rolled back versions, relative to base. */
+    private Collection<GridCacheVersion> rolledbackVers = Collections.emptyList();
+
+    /** Base for completed versions. */
+    private GridCacheVersion completedBase;
+
+    /** Flag indicating partition lock in group lock transaction. */
+    private boolean partLock;
+
+    /** Flag indicating that transformed values should be sent to remote nodes. */
+    private boolean sndTransformedVals;
+
+    /** Commit error. */
+    protected AtomicReference<Throwable> commitErr = new AtomicReference<>();
+
+    /** Active cache IDs. */
+    protected Set<Integer> activeCacheIds = new HashSet<>();
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    protected IgniteTxLocalAdapter() {
+        // No-op.
+    }
+
+    /**
+     * @param cctx Cache registry.
+     * @param xidVer Transaction ID.
+     * @param implicit {@code True} if transaction was implicitly started by the system,
+     *      {@code false} if it was started explicitly by user.
+     * @param implicitSingle {@code True} if transaction is implicit with only one key.
+     * @param sys System flag.
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @param timeout Timeout.
+     * @param txSize Expected transaction size.
+     * @param grpLockKey Group lock key if this is a group-lock transaction.
+     * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
+     */
+    protected IgniteTxLocalAdapter(
+        GridCacheSharedContext<K, V> cctx,
+        GridCacheVersion xidVer,
+        boolean implicit,
+        boolean implicitSingle,
+        boolean sys,
+        IgniteTxConcurrency concurrency,
+        IgniteTxIsolation isolation,
+        long timeout,
+        boolean invalidate,
+        boolean storeEnabled,
+        int txSize,
+        @Nullable IgniteTxKey grpLockKey,
+        boolean partLock,
+        @Nullable UUID subjId,
+        int taskNameHash
+    ) {
+        super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
+            storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
+
+        assert !partLock || grpLockKey != null;
+
+        this.partLock = partLock;
+
+        minVer = xidVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID eventNodeId() {
+        return cctx.localNodeId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID originatingNodeId() {
+        return cctx.localNodeId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean empty() {
+        return txMap.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<UUID> masterNodeIds() {
+        return Collections.singleton(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean partitionLock() {
+        return partLock;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Throwable commitError() {
+        return commitErr.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commitError(Throwable e) {
+        commitErr.compareAndSet(null, e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
+        assert false;
+        return false;
+    }
+
+    /**
+     * Gets collection of active cache IDs for this transaction.
+     *
+     * @return Collection of active cache IDs.
+     */
+    @Override public Collection<Integer> activeCacheIds() {
+        return activeCacheIds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStarted() {
+        return txMap != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasWriteKey(IgniteTxKey<K> key) {
+        return writeView.containsKey(key);
+    }
+
+    /**
+     * @return Transaction read set.
+     */
+    @Override public Set<IgniteTxKey<K>> readSet() {
+        return txMap == null ? Collections.<IgniteTxKey<K>>emptySet() : readView.keySet();
+    }
+
+    /**
+     * @return Transaction write set.
+     */
+    @Override public Set<IgniteTxKey<K>> writeSet() {
+        return txMap == null ? Collections.<IgniteTxKey<K>>emptySet() : writeView.keySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed(IgniteTxKey<K> key) {
+        if (txMap == null)
+            return false;
+
+        IgniteTxEntry<K, V> e = txMap.get(key);
+
+        return e != null && e.op() == DELETE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap() {
+        return readView == null ? Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap() : readView;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap() {
+        return writeView == null ? Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap() : writeView;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteTxEntry<K, V>> allEntries() {
+        return txMap == null ? Collections.<IgniteTxEntry<K, V>>emptySet() : txMap.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteTxEntry<K, V>> readEntries() {
+        return readView == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : readView.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgniteTxEntry<K, V>> writeEntries() {
+        return writeView == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : writeView.values();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key) {
+        return txMap == null ? null : txMap.get(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void seal() {
+        if (readView != null)
+            readView.seal();
+
+        if (writeView != null)
+            writeView.seal();
+    }
+
+    /**
+     * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
+     * to remote nodes.
+     */
+    public void sendTransformedValues(boolean snd) {
+        sndTransformedVals = snd;
+    }
+
+    /**
+     * @return {@code True} if should be committed after lock is acquired.
+     */
+    protected boolean commitAfterLock() {
+        return implicit() && (!dht() || colocated());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"RedundantTypeArguments"})
+    @Nullable @Override public GridTuple<V> peek(
+        GridCacheContext<K, V> cacheCtx,
+        boolean failFast,
+        K key,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter
+    ) throws GridCacheFilterFailedException {
+        IgniteTxEntry<K, V> e = txMap == null ? null : txMap.get(cacheCtx.txKey(key));
+
+        if (e != null) {
+            // We should look at tx entry previous value. If this is a user peek then previous
+            // value is the same as value. If this is a filter evaluation peek then previous value holds
+            // value visible to filter while value contains value enlisted for write.
+            if (!F.isAll(e.cached().wrap(false), filter))
+                return e.hasPreviousValue() ? F.t(CU.<V>failed(failFast, e.previousValue())) : null;
+
+            return e.hasPreviousValue() ? F.t(e.previousValue()) : null;
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> loadMissing(
+        final GridCacheContext<K, V> cacheCtx,
+        boolean async,
+        final Collection<? extends K> keys,
+        boolean deserializePortable,
+        final IgniteBiInClosure<K, V> c
+    ) {
+        if (!async) {
+            try {
+                return new GridFinishedFuture<>(cctx.kernalContext(),
+                    cacheCtx.store().loadAllFromStore(this, keys, c));
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(cctx.kernalContext(), e);
+            }
+        }
+        else
+            return cctx.kernalContext().closure().callLocalSafe(
+                new GPC<Boolean>() {
+                    @Override public Boolean call() throws Exception {
+                        return cacheCtx.store().loadAllFromStore(IgniteTxLocalAdapter.this, keys, c);
+                    }
+                },
+                true);
+    }
+
+    /**
+     * Gets minimum version present in transaction.
+     *
+     * @return Minimum versions.
+     */
+    @Override public GridCacheVersion minVersion() {
+        return minVer;
+    }
+
+    /**
+     * @throws IgniteCheckedException If prepare step failed.
+     */
+    @SuppressWarnings({"CatchGenericClass"})
+    public void userPrepare() throws IgniteCheckedException {
+        if (state() != PREPARING) {
+            if (timedOut())
+                throw new IgniteTxTimeoutException("Transaction timed out: " + this);
+
+            IgniteTxState state = state();
+
+            setRollbackOnly();
+
+            throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']');
+        }
+
+        checkValid();
+
+        try {
+            cctx.tm().prepareTx(this);
+        }
+        catch (IgniteCheckedException e) {
+            throw e;
+        }
+        catch (Throwable e) {
+            setRollbackOnly();
+
+            throw new IgniteCheckedException("Transaction validation produced a runtime exception: " + this, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit() throws IgniteCheckedException {
+        try {
+            commitAsync().get();
+        }
+        finally {
+            cctx.tm().txContextReset();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepare() throws IgniteCheckedException {
+        prepareAsync().get();
+    }
+
+    /**
+     * Checks that locks are in proper state for commit.
+     *
+     * @param entry Cache entry to check.
+     */
+    private void checkCommitLocks(GridCacheEntryEx<K, V> entry) {
+        assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry +
+            ", tx=" + this + ']';
+    }
+
+    /**
+     * Uncommits transaction by invalidating all of its entries.
+     */
+    @SuppressWarnings({"CatchGenericClass"})
+    private void uncommit() {
+        for (IgniteTxEntry<K, V> e : writeMap().values()) {
+            try {
+                GridCacheEntryEx<K, V> cacheEntry = e.cached();
+
+                if (e.op() != NOOP)
+                    cacheEntry.invalidate(null, xidVer);
+            }
+            catch (Throwable t) {
+                U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t);
+
+                break;
+            }
+        }
+
+        cctx.tm().uncommitTx(this);
+    }
+
+    /**
+     * Gets cache entry for given key.
+     *
+     * @param key Key.
+     * @return Cache entry.
+     */
+    protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key) {
+        return cacheCtx.cache().entryEx(key.key());
+    }
+
+    /**
+     * Gets cache entry for given key and topology version.
+     *
+     * @param key Key.
+     * @param topVer Topology version.
+     * @return Cache entry.
+     */
+    protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, long topVer) {
+        return cacheCtx.cache().entryEx(key.key(), topVer);
+    }
+
+    /**
+     * Performs batch database operations. This commit must be called
+     * before {@link #userCommit()}. This way if there is a DB failure,
+     * cache transaction can still be rolled back.
+     *
+     * @param writeEntries Transaction write set.
+     * @throws IgniteCheckedException If batch update failed.
+     */
+    @SuppressWarnings({"CatchGenericClass"})
+    protected void batchStoreCommit(Iterable<IgniteTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
+        GridCacheStoreManager<K, V> store = store();
+
+        if (store != null && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
+            try {
+                if (writeEntries != null) {
+                    Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
+                    List<K> rmvCol = null;
+
+                    boolean skipNear = near() && store.writeToStoreFromDht();
+
+                    for (IgniteTxEntry<K, V> e : writeEntries) {
+                        if (skipNear && e.cached().isNear())
+                            continue;
+
+                        boolean intercept = e.context().config().getInterceptor() != null;
+
+                        if (intercept || !F.isEmpty(e.transformClosures()))
+                            e.cached().unswap(true, false);
+
+                        GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
+
+                        GridCacheContext<K, V> cacheCtx = e.context();
+
+                        GridCacheOperation op = res.get1();
+                        K key = e.key();
+                        V val = res.get2();
+                        GridCacheVersion ver = writeVersion();
+
+                        if (op == CREATE || op == UPDATE) {
+                            // Batch-process all removes if needed.
+                            if (rmvCol != null && !rmvCol.isEmpty()) {
+                                store.removeAllFromStore(this, rmvCol);
+
+                                // Reset.
+                                rmvCol.clear();
+                            }
+
+                            if (intercept) {
+                                V old = e.cached().rawGetOrUnmarshal(true);
+
+                                val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val);
+
+                                if (val == null)
+                                    continue;
+
+                                val = cacheCtx.unwrapTemporary(val);
+                            }
+
+                            if (putMap == null)
+                                putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
+
+                            putMap.put(key, F.t(val, ver));
+                        }
+                        else if (op == DELETE) {
+                            // Batch-process all puts if needed.
+                            if (putMap != null && !putMap.isEmpty()) {
+                                store.putAllToStore(this, putMap);
+
+                                // Reset.
+                                putMap.clear();
+                            }
+
+                            if (intercept) {
+                                V old = e.cached().rawGetOrUnmarshal(true);
+
+                                IgniteBiTuple<Boolean, V> t = cacheCtx.config().<K, V>getInterceptor()
+                                    .onBeforeRemove(key, old);
+
+                                if (cacheCtx.cancelRemove(t))
+                                    continue;
+                            }
+
+                            if (rmvCol == null)
+                                rmvCol = new LinkedList<>();
+
+                            rmvCol.add(key);
+                        }
+                        else if (log.isDebugEnabled())
+                            log.debug("Ignoring NOOP entry for batch store commit: " + e);
+                    }
+
+                    if (putMap != null && !putMap.isEmpty()) {
+                        assert rmvCol == null || rmvCol.isEmpty();
+
+                        // Batch put at the end of transaction.
+                        store.putAllToStore(this, putMap);
+                    }
+
+                    if (rmvCol != null && !rmvCol.isEmpty()) {
+                        assert putMap == null || putMap.isEmpty();
+
+                        // Batch remove at the end of transaction.
+                        store.removeAllFromStore(this, rmvCol);
+                    }
+                }
+
+                // Commit while locks are held.
+                store.txEnd(this, true);
+            }
+            catch (IgniteCheckedException ex) {
+                commitError(ex);
+
+                setRollbackOnly();
+
+                // Safe to remove transaction from committed tx list because nothing was committed yet.
+                cctx.tm().removeCommittedTx(this);
+
+                throw ex;
+            }
+            catch (Throwable ex) {
+                commitError(ex);
+
+                setRollbackOnly();
+
+                // Safe to remove transaction from committed tx list because nothing was committed yet.
+                cctx.tm().removeCommittedTx(this);
+
+                throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CatchGenericClass"})
+    @Override public void userCommit() throws IgniteCheckedException {
+        IgniteTxState state = state();
+
+        if (state != COMMITTING) {
+            if (timedOut())
+                throw new IgniteTxTimeoutException("Transaction timed out: " + this);
+
+            setRollbackOnly();
+
+            throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
+        }
+
+        checkValid();
+
+        boolean empty = F.isEmpty(near() ? txMap : writeMap());
+
+        // Register this transaction as completed prior to write-phase to
+        // ensure proper lock ordering for removed entries.
+        // We add colocated transaction to committed set even if it is empty to correctly order
+        // locks on backup nodes.
+        if (!empty || colocated())
+            cctx.tm().addCommittedTx(this);
+
+        if (groupLock())
+            addGroupTxMapping(writeSet());
+
+        if (!empty) {
+            // We are holding transaction-level locks for entries here, so we can get next write version.
+            writeVersion(cctx.versions().next(topologyVersion()));
+
+            batchStoreCommit(writeMap().values());
+
+            try {
+                cctx.tm().txContext(this);
+
+                long topVer = topologyVersion();
+
+                /*
+                 * Commit to cache. Note that for 'near' transaction we loop through all the entries.
+                 */
+                for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) {
+                    GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+                    GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE;
+
+                    UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId();
+
+                    try {
+                        while (true) {
+                            try {
+                                GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+                                // Must try to evict near entries before committing from
+                                // transaction manager to make sure locks are held.
+                                if (!evictNearEntry(txEntry, false)) {
+                                    if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
+                                        cached.markObsolete(xidVer);
+
+                                        break;
+                                    }
+
+                                    if (cached.detached())
+                                        break;
+
+                                    GridCacheEntryEx<K, V> nearCached = null;
+
+                                    boolean metrics = true;
+
+                                    if (updateNearCache(cacheCtx, txEntry.key(), topVer))
+                                        nearCached = cacheCtx.dht().near().peekEx(txEntry.key());
+                                    else if (cacheCtx.isNear() && txEntry.locallyMapped())
+                                        metrics = false;
+
+                                    boolean evt = !isNearLocallyMapped(txEntry, false);
+
+                                    // For near local transactions we must record DHT version
+                                    // in order to keep near entries on backup nodes until
+                                    // backup remote transaction completes.
+                                    if (cacheCtx.isNear())
+                                        ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());
+
+                                    if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
+                                        txEntry.cached().unswap(true, false);
+
+                                    GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
+                                        true);
+
+                                    GridCacheOperation op = res.get1();
+                                    V val = res.get2();
+                                    byte[] valBytes = res.get3();
+
+                                    // Preserve TTL if needed.
+                                    if (txEntry.ttl() < 0)
+                                        txEntry.ttl(cached.ttl());
+
+                                    // Deal with DR conflicts.
+                                    GridCacheVersion explicitVer = txEntry.drVersion() != null ?
+                                        txEntry.drVersion() : writeVersion();
+
+                                    GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached,
+                                        txEntry,
+                                        explicitVer,
+                                        op,
+                                        val,
+                                        valBytes,
+                                        txEntry.ttl(),
+                                        txEntry.drExpireTime());
+
+                                    if (drRes != null) {
+                                        op = drRes.operation();
+                                        val = drRes.value();
+                                        valBytes = drRes.valueBytes();
+
+                                        if (drRes.isMerge())
+                                            explicitVer = writeVersion();
+                                    }
+                                    else
+                                        // Nullify explicit version so that innerSet/innerRemove will work as usual.
+                                        explicitVer = null;
+
+                                    if (sndTransformedVals || (drRes != null)) {
+                                        assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null);
+
+                                        txEntry.value(val, true, false);
+                                        txEntry.valueBytes(valBytes);
+                                        txEntry.op(op);
+                                        txEntry.transformClosures(null);
+                                        txEntry.drVersion(explicitVer);
+                                    }
+
+                                    if (op == CREATE || op == UPDATE) {
+                                        GridCacheUpdateTxResult<V> updRes = cached.innerSet(
+                                            this,
+                                            eventNodeId(),
+                                            txEntry.nodeId(),
+                                            val,
+                                            valBytes,
+                                            false,
+                                            false,
+                                            txEntry.ttl(),
+                                            evt,
+                                            metrics,
+                                            topVer,
+                                            txEntry.filters(),
+                                            cached.detached() ? DR_NONE : drType,
+                                            txEntry.drExpireTime(),
+                                            cached.isNear() ? null : explicitVer,
+                                            CU.subjectId(this, cctx),
+                                            resolveTaskName());
+
+                                        if (nearCached != null && updRes.success())
+                                            nearCached.innerSet(
+                                                null,
+                                                eventNodeId(),
+                                                nodeId,
+                                                val,
+                                                valBytes,
+                                                false,
+                                                false,
+                                                txEntry.ttl(),
+                                                false,
+                                                metrics,
+                                                topVer,
+                                                CU.<K, V>empty(),
+                                                DR_NONE,
+                                                txEntry.drExpireTime(),
+                                                null,
+                                                CU.subjectId(this, cctx),
+                                                resolveTaskName());
+                                    }
+                                    else if (op == DELETE) {
+                                        GridCacheUpdateTxResult<V> updRes = cached.innerRemove(
+                                            this,
+                                            eventNodeId(),
+                                            txEntry.nodeId(),
+                                            false,
+                                            false,
+                                            evt,
+                                            metrics,
+                                            topVer,
+                                            txEntry.filters(),
+                                            cached.detached()  ? DR_NONE : drType,
+                                            cached.isNear() ? null : explicitVer,
+                                            CU.subjectId(this, cctx),
+                                            resolveTaskName());
+
+                                        if (nearCached != null && updRes.success())
+                                            nearCached.innerRemove(
+                                                null,
+                                                eventNodeId(),
+                                                nodeId,
+                                                false,
+                                                false,
+                                                false,
+                                                metrics,
+                                                topVer,
+                                                CU.<K, V>empty(),
+                                                DR_NONE,
+                                                null,
+                                                CU.subjectId(this, cctx),
+                                                resolveTaskName());
+                                    }
+                                    else if (op == RELOAD) {
+                                        cached.innerReload(CU.<K, V>empty());
+
+                                        if (nearCached != null)
+                                            nearCached.innerReload(CU.<K, V>empty());
+                                    }
+                                    else if (op == READ) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Ignoring READ entry when committing: " + txEntry);
+                                    }
+                                    else {
+                                        assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()):
+                                            "Transaction does not own lock for group lock entry during  commit [tx=" +
+                                                this + ", txEntry=" + txEntry + ']';
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Ignoring NOOP entry when committing: " + txEntry);
+                                    }
+                                }
+
+                                // Check commit locks after set, to make sure that
+                                // we are not changing obsolete entries.
+                                // (innerSet and innerRemove will throw an exception
+                                // if an entry is obsolete).
+                                if (txEntry.op() != READ && !txEntry.groupLockEntry())
+                                    checkCommitLocks(cached);
+
+                                // Break out of while loop.
+                                break;
+                            }
+                            // If entry cached within transaction got removed.
+                            catch (GridCacheEntryRemovedException ignored) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
+
+                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), txEntry.keyBytes());
+                            }
+                        }
+                    }
+                    catch (Throwable ex) {
+                        // We are about to initiate transaction rollback when tx has started to committing.
+                        // Need to remove version from committed list.
+                        cctx.tm().removeCommittedTx(this);
+
+                        if (X.hasCause(ex, GridCacheIndexUpdateException.class) && cacheCtx.cache().isMongoDataCache()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to update mongo document index (transaction entry will " +
+                                    "be ignored): " + txEntry);
+
+                            // Set operation to NOOP.
+                            txEntry.op(NOOP);
+
+                            setRollbackOnly();
+
+                            throw ex;
+                        }
+                        else {
+                            IgniteCheckedException err = new IgniteTxHeuristicException("Failed to locally write to cache " +
+                                "(all transaction entries will be invalidated, however there was a window when " +
+                                "entries for this transaction were visible to others): " + this, ex);
+
+                            U.error(log, "Heuristic transaction failure.", err);
+
+                            commitErr.compareAndSet(null, err);
+
+                            state(UNKNOWN);
+
+                            try {
+                                // Courtesy to minimize damage.
+                                uncommit();
+                            }
+                            catch (Throwable ex1) {
+                                U.error(log, "Failed to uncommit transaction: " + this, ex1);
+                            }
+
+                            throw err;
+                        }
+                    }
+                }
+            }
+            finally {
+                cctx.tm().txContextReset();
+            }
+        }
+        else {
+            GridCacheStoreManager<K, V> store = store();
+
+            if (store != null && (!internal() || groupLock())) {
+                try {
+                    store.txEnd(this, true);
+                }
+                catch (IgniteCheckedException e) {
+                    commitError(e);
+
+                    setRollbackOnly();
+
+                    cctx.tm().removeCommittedTx(this);
+
+                    throw e;
+                }
+            }
+        }
+
+        // Do not unlock transaction entries if one-phase commit.
+        if (!onePhaseCommit()) {
+            if (doneFlag.compareAndSet(false, true)) {
+                // Unlock all locks.
+                cctx.tm().commitTx(this);
+
+                boolean needsCompletedVersions = needsCompletedVersions();
+
+                assert !needsCompletedVersions || completedBase != null;
+                assert !needsCompletedVersions || committedVers != null;
+                assert !needsCompletedVersions || rolledbackVers != null;
+            }
+        }
+    }
+
+    /**
+     * Commits transaction to transaction manager. Used for one-phase commit transactions only.
+     */
+    public void tmCommit() {
+        assert onePhaseCommit();
+
+        if (doneFlag.compareAndSet(false, true)) {
+            // Unlock all locks.
+            cctx.tm().commitTx(this);
+
+            state(COMMITTED);
+
+            boolean needsCompletedVersions = needsCompletedVersions();
+
+            assert !needsCompletedVersions || completedBase != null;
+            assert !needsCompletedVersions || committedVers != null;
+            assert !needsCompletedVersions || rolledbackVers != null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void completedVersions(
+        GridCacheVersion completedBase,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers) {
+        this.completedBase = completedBase;
+        this.committedVers = committedVers;
+        this.rolledbackVers = rolledbackVers;
+    }
+
+    /**
+     * @return Completed base for ordering.
+     */
+    public GridCacheVersion completedBase() {
+        return completedBase;
+    }
+
+    /**
+     * @return Committed versions.
+     */
+    public Collection<GridCacheVersion> committedVersions() {
+        return committedVers;
+    }
+
+    /**
+     * @return Rolledback versions.
+     */
+    public Collection<GridCacheVersion> rolledbackVersions() {
+        return rolledbackVers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void userRollback() throws IgniteCheckedException {
+        IgniteTxState state = state();
+
+        if (state != ROLLING_BACK && state != ROLLED_BACK) {
+            setRollbackOnly();
+
+            throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']',
+                commitErr.get());
+        }
+
+        if (doneFlag.compareAndSet(false, true)) {
+            try {
+                if (near())
+                    // Must evict near entries before rolling back from
+                    // transaction manager, so they will be removed from cache.
+                    for (IgniteTxEntry<K, V> e : allEntries())
+                        evictNearEntry(e, false);
+
+                cctx.tm().rollbackTx(this);
+
+                GridCacheStoreManager<K, V> store = store();
+
+                if (store != null && (near() || store.writeToStoreFromDht())) {
+                    if (!internal() || groupLock())
+                        store.txEnd(this, false);
+                }
+            }
+            catch (Error | IgniteCheckedException | RuntimeException e) {
+                U.addLastCause(e, commitErr.get(), log);
+
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Checks if there is a cached or swapped value for
+     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
+     *
+     *
+     * @param keys Key to enlist.
+     * @param cached Cached entry, if called from entry wrapper.
+     * @param map Return map.
+     * @param missed Map of missed keys.
+     * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+     * @param deserializePortable Deserialize portable flag.
+     * @param filter Filter to test.
+     * @throws IgniteCheckedException If failed.
+     * @return Enlisted keys.
+     */
+    @SuppressWarnings({"RedundantTypeArguments"})
+    private Collection<K> enlistRead(
+        final GridCacheContext<K, V> cacheCtx,
+        Collection<? extends K> keys,
+        @Nullable GridCacheEntryEx<K, V> cached,
+        Map<K, V> map,
+        Map<K, GridCacheVersion> missed,
+        int keysCnt,
+        boolean deserializePortable,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        assert !F.isEmpty(keys);
+        assert keysCnt == keys.size();
+        assert cached == null || F.first(keys).equals(cached.key());
+
+        cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+        groupLockSanityCheck(cacheCtx, keys);
+
+        boolean single = keysCnt == 1;
+
+        Collection<K> lockKeys = null;
+
+        long topVer = topologyVersion();
+
+        // In this loop we cover only read-committed or optimistic transactions.
+        // Transactions that are pessimistic and not read-committed are covered
+        // outside of this loop.
+        for (K key : keys) {
+            if (key == null)
+                continue;
+
+            if (pessimistic() && !readCommitted())
+                addActiveCache(cacheCtx);
+
+            IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+            // Check write map (always check writes first).
+            IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+            // Either non-read-committed or there was a previous write.
+            if (txEntry != null) {
+                if (cacheCtx.isAll(txEntry.cached(), filter)) {
+                    V val = txEntry.value();
+
+                    // Read value from locked entry in group-lock transaction as well.
+                    if (txEntry.hasValue()) {
+                        if (!F.isEmpty(txEntry.transformClosures())) {
+                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                val = clos.apply(val);
+                        }
+
+                        if (val != null) {
+                            V val0 = val;
+
+                            if (cacheCtx.portableEnabled())
+                                val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+                            map.put(key, val0);
+                        }
+                    }
+                    else {
+                        assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
+
+                        while (true) {
+                            try {
+                                Object transformClo =
+                                    (txEntry.op() == TRANSFORM  && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                        F.first(txEntry.transformClosures()) : null;
+
+                                val = txEntry.cached().innerGet(this,
+                                    /*swap*/true,
+                                    /*read-through*/false,
+                                    /*fail fast*/true,
+                                    /*unmarshal*/true,
+                                    /*metrics*/true,
+                                    /*event*/true,
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    transformClo,
+                                    resolveTaskName(),
+                                    filter);
+
+                                if (val != null) {
+                                    if (!readCommitted())
+                                        txEntry.readValue(val);
+
+                                    if (!F.isEmpty(txEntry.transformClosures())) {
+                                        for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                            val = clos.apply(val);
+                                    }
+
+                                    V val0 = val;
+
+                                    if (cacheCtx.portableEnabled())
+                                        val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+                                    map.put(key, val0);
+                                }
+                                else
+                                    missed.put(key, txEntry.cached().version());
+
+                                break;
+                            }
+                            catch (GridCacheFilterFailedException e) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Filter validation failed for entry: " + txEntry);
+
+                                if (!readCommitted())
+                                    txEntry.readValue(e.<V>value());
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), txEntry.keyBytes());
+                            }
+                        }
+                    }
+                }
+            }
+            // First time access within transaction.
+            else {
+                if (lockKeys == null)
+                    lockKeys = single ? (Collection<K>)keys : new ArrayList<K>(keysCnt);
+
+                if (!single)
+                    lockKeys.add(key);
+
+                while (true) {
+                    GridCacheEntryEx<K, V> entry;
+
+                    if (cached != null) {
+                        entry = cached;
+
+                        cached = null;
+                    }
+                    else
+                        entry = entryEx(cacheCtx, txKey, topVer);
+
+                    try {
+                        GridCacheVersion ver = entry.version();
+
+                        V val = null;
+
+                        if (!pessimistic() || readCommitted() || groupLock()) {
+                            // This call will check for filter.
+                            val = entry.innerGet(this,
+                                /*swap*/true,
+                                /*no read-through*/false,
+                                /*fail-fast*/true,
+                                /*unmarshal*/true,
+                                /*metrics*/true,
+                                /*event*/true,
+                                /*temporary*/false,
+                                CU.subjectId(this, cctx),
+                                null,
+                                resolveTaskName(),
+                                filter);
+
+                            if (val != null) {
+                                V val0 = val;
+
+                                if (cacheCtx.portableEnabled())
+                                    val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+                                map.put(key, val0);
+                            }
+                            else
+                                missed.put(key, ver);
+                        }
+                        else
+                            // We must wait for the lock in pessimistic mode.
+                            missed.put(key, ver);
+
+                        if (!readCommitted()) {
+                            txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null);
+
+                            if (groupLock())
+                                txEntry.groupLockEntry(true);
+
+                            // As optimization, mark as checked immediately
+                            // for non-pessimistic if value is not null.
+                            if (val != null && !pessimistic())
+                                txEntry.markValid();
+                        }
+
+                        break; // While.
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+                    }
+                    catch (GridCacheFilterFailedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Filter validation failed for entry: " + entry);
+
+                        if (!readCommitted()) {
+                            // Value for which failure occurred.
+                            V val = e.<V>value();
+
+                            txEntry = addEntry(READ, val, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, null);
+
+                            // Mark as checked immediately for non-pessimistic.
+                            if (val != null && !pessimistic())
+                                txEntry.markValid();
+                        }
+
+                        break; // While loop.
+                    }
+                }
+            }
+        }
+
+        return lockKeys != null ? lockKeys : Collections.<K>emptyList();
+    }
+
+    /**
+     * Adds skipped key.
+     *
+     * @param skipped Skipped set (possibly {@code null}).
+     * @param key Key to add.
+     * @return Skipped set.
+     */
+    private Set<K> skip(Set<K> skipped, K key) {
+        if (skipped == null)
+            skipped = new GridLeanSet<>();
+
+        skipped.add(key);
+
+        if (log.isDebugEnabled())
+            log.debug("Added key to skipped set: " + key);
+
+        return skipped;
+    }
+
+    /**
+     * Loads all missed keys for
+     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
+     *
+     * @param map Return map.
+     * @param missedMap Missed keys.
+     * @param redos Keys to retry.
+     * @param deserializePortable Deserialize portable flag.
+     * @param filter Filter.
+     * @return Loaded key-value pairs.
+     */
+    private IgniteFuture<Map<K, V>> checkMissed(
+        final GridCacheContext<K, V> cacheCtx,
+        final Map<K, V> map,
+        final Map<K, GridCacheVersion> missedMap,
+        @Nullable final Collection<K> redos,
+        final boolean deserializePortable,
+        final IgnitePredicate<GridCacheEntry<K, V>>[] filter
+    ) {
+        assert redos != null || pessimistic();
+
+        if (log.isDebugEnabled())
+            log.debug("Loading missed values for missed map: " + missedMap);
+
+        final Collection<K> loaded = new HashSet<>();
+
+        return new GridEmbeddedFuture<>(cctx.kernalContext(),
+            loadMissing(
+                cacheCtx,
+                false, missedMap.keySet(), deserializePortable, new CI2<K, V>() {
+                /** */
+                private GridCacheVersion nextVer;
+
+                @Override public void apply(K key, V val) {
+                    if (isRollbackOnly()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Ignoring loaded value for read because transaction was rolled back: " +
+                                IgniteTxLocalAdapter.this);
+
+                        return;
+                    }
+
+                    GridCacheVersion ver = missedMap.get(key);
+
+                    if (ver == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
+
+                        return;
+                    }
+
+                    V visibleVal = val;
+
+                    IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+                    IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+                    if (txEntry != null) {
+                        if (!readCommitted())
+                            txEntry.readValue(val);
+
+                        if (!F.isEmpty(txEntry.transformClosures())) {
+                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                visibleVal = clos.apply(visibleVal);
+                        }
+                    }
+
+                    // In pessimistic mode we hold the lock, so filter validation
+                    // should always be valid.
+                    if (pessimistic())
+                        ver = null;
+
+                    // Initialize next version.
+                    if (nextVer == null)
+                        nextVer = cctx.versions().next(topologyVersion());
+
+                    while (true) {
+                        assert txEntry != null || readCommitted() || groupLock();
+
+                        GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+
+                        try {
+                            boolean pass = cacheCtx.isAll(e, filter);
+
+                            // Must initialize to true since even if filter didn't pass,
+                            // we still record the transaction value.
+                            boolean set = true;
+
+                            if (pass) {
+                                try {
+                                    set = e.versionedValue(val, ver, nextVer);
+                                }
+                                catch (GridCacheEntryRemovedException ignore) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got removed entry in transaction getAll method " +
+                                            "(will try again): " + e);
+
+                                    if (pessimistic() && !readCommitted() && !isRollbackOnly() &&
+                                        (!groupLock() || F.eq(e.key(), groupLockKey()))) {
+                                        U.error(log, "Inconsistent transaction state (entry got removed while " +
+                                            "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
+
+                                        setRollbackOnly();
+
+                                        return;
+                                    }
+
+                                    if (txEntry != null)
+                                        txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+
+                                    continue; // While loop.
+                                }
+                            }
+
+                            // In pessimistic mode, we should always be able to set.
+                            assert set || !pessimistic();
+
+                            if (readCommitted() || groupLock()) {
+                                cacheCtx.evicts().touch(e, topologyVersion());
+
+                                if (pass && visibleVal != null)
+                                    map.put(key, visibleVal);
+                            }
+                            else {
+                                assert txEntry != null;
+
+                                if (set || F.isEmptyOrNulls(filter)) {
+                                    txEntry.setAndMarkValid(val);
+
+                                    if (pass && visibleVal != null)
+                                        map.put(key, visibleVal);
+                                }
+                                else {
+                                    assert !pessimistic() : "Pessimistic transaction should not have to redo gets: " +
+                                        this;
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to set versioned value for entry (will redo): " + e);
+
+                                    redos.add(key);
+                                }
+                            }
+
+                            loaded.add(key);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Set value loaded from store into entry from transaction [set=" + set +
+                                    ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+
+                            break; // While loop.
+                        }
+                        catch (IgniteCheckedException ex) {
+                            throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+                        }
+                    }
+                }
+            }),
+            new C2<Boolean, Exception, Map<K, V>>() {
+                @Override public Map<K, V> apply(Boolean b, Exception e) {
+                    if (e != null) {
+                        setRollbackOnly();
+
+                        throw new GridClosureException(e);
+                    }
+
+                    if (!b && !readCommitted()) {
+                        // There is no store - we must mark the entries.
+                        for (K key : missedMap.keySet()) {
+                            IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+
+                            if (txEntry != null)
+                                txEntry.markValid();
+                        }
+                    }
+
+                    if (readCommitted()) {
+                        Collection<K> notFound = new HashSet<>(missedMap.keySet());
+
+                        notFound.removeAll(loaded);
+
+                        // In read-committed mode touch entries that have just been read.
+                        for (K key : notFound) {
+                            IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+
+                            GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
+                                txEntry.cached();
+
+                            if (entry != null)
+                                cacheCtx.evicts().touch(entry, topologyVersion());
+                        }
+                    }
+
+                    return map;
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Map<K, V>> getAllAsync(
+        final GridCacheContext<K, V> cacheCtx,
+        Collection<? extends K> keys,
+        @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable,
+        final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+        if (F.isEmpty(keys))
+            return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
+
+        init();
+
+        int keysCnt = keys.size();
+
+        boolean single = keysCnt == 1;
+
+        try {
+            checkValid();
+
+            final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+
+            final Map<K, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+
+            final Collection<K> lockKeys = enlistRead(cacheCtx, keys, cached, retMap, missed, keysCnt,
+                deserializePortable, filter);
+
+            if (single && missed.isEmpty())
+                return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+
+            // Handle locks.
+            if (pessimistic() && !readCommitted() && !groupLock()) {
+                IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true,
+                    isolation, isInvalidate(), CU.<K, V>empty());
+
+                PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+                    @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+                        if (log.isDebugEnabled())
+                            log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+
+                        // Load keys only after the locks have been acquired.
+                        for (K key : lockKeys) {
+                            if (retMap.containsKey(key))
+                                // We already have a return value.
+                                continue;
+
+                            IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+                            IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+                            assert txEntry != null;
+
+                            // Check if there is cached value.
+                            while (true) {
+                                GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+                                try {
+                                    Object transformClo =
+                                        (!F.isEmpty(txEntry.transformClosures()) &&
+                                            cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+                                            F.first(txEntry.transformClosures()) : null;
+
+                                    V val = cached.innerGet(IgniteTxLocalAdapter.this,
+                                        cacheCtx.isSwapOrOffheapEnabled(),
+                                        /*read-through*/false,
+                                        /*fail-fast*/true,
+                                        /*unmarshal*/true,
+                                        /*metrics*/true,
+                                        /*events*/true,
+                                        /*temporary*/true,
+                                        CU.subjectId(IgniteTxLocalAdapter.this, cctx),
+                                        transformClo,
+                                        resolveTaskName(),
+                                        filter);
+
+                                    // If value is in cache and passed the filter.
+                                    if (val != null) {
+                                        missed.remove(key);
+
+                                        txEntry.setAndMarkValid(val);
+
+                                        if (!F.isEmpty(txEntry.transformClosures())) {
+                                            for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                                val = clos.apply(val);
+                                        }
+
+                                        if (cacheCtx.portableEnabled())
+                                            val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+                                        retMap.put(key, val);
+                                    }
+
+                                    // Even though we bring the value back from lock acquisition,
+                                    // we still need to recheck primary node for consistent values
+                                    // in case of concurrent transactional locks.
+
+                                    break; // While.
+                                }
+                                catch (GridCacheEntryRemovedException ignore) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got removed exception in get postLock (will retry): " +
+                                            cached);
+
+                                    txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+                                }
+                                catch (GridCacheFilterFailedException e) {
+                                    // Failed value for the filter.
+                                    V val = e.value();
+
+                                    if (val != null) {
+                                        // If filter fails after lock is acquired, we don't reload,
+                                        // regardless if value is null or not.
+                                        missed.remove(key);
+
+                                        txEntry.setAndMarkValid(val);
+                                    }
+
+                                    break; // While.
+                                }
+                            }
+                        }
+
+                        if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal()))
+                            return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, filter);
+
+                        return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
+                    }
+                };
+
+                FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+                    @Override Map<K, V> finish(Map<K, V> loaded) {
+                        retMap.putAll(loaded);
+
+                        return retMap;
+                    }
+                };
+
+                if (fut.isDone()) {
+                    try {
+                        IgniteFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+
+                        return fut1.isDone() ?
+                            new GridFinishedFutureEx<>(finClos.apply(fut1.get(), null)) :
+                            new GridEmbeddedFuture<>(cctx.kernalContext(), fut1, finClos);
+                    }
+                    catch (GridClosureException e) {
+                        return new GridFinishedFuture<>(cctx.kernalContext(), e.unwrap());
+                    }
+                    catch (IgniteCheckedException e) {
+                        try {
+                            return plc2.apply(false, e);
+                        }
+                        catch (Exception e1) {
+                            return new GridFinishedFuture<>(cctx.kernalContext(), e1);
+                        }
+                    }
+                }
+                else {
+                    return new GridEmbeddedFuture<>(
+                        cctx.kernalContext(),
+                        fut,
+                        plc2,
+                        finClos);
+                }
+            }
+            else {
+                assert optimistic() || readCommitted() || groupLock();
+
+                final Collection<K> redos = new LinkedList<>();
+
+                if (!missed.isEmpty()) {
+                    if (!readCommitted())
+                        for (Iterator<K> it = missed.keySet().iterator(); it.hasNext(); )
+                            if (retMap.containsKey(it.next()))
+                                it.remove();
+
+                    if (missed.isEmpty())
+                        return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+
+                    return new GridEmbeddedFuture<>(
+                        cctx.kernalContext(),
+                        // First future.
+                        checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, filter),
+                        // Closure that returns another future, based on result from first.
+                        new PMC<Map<K, V>>() {
+                            @Override public IgniteFuture<Map<K, V>> postMiss(Map<K, V> map) {
+                                if (redos.isEmpty())
+                                    return new GridFinishedFuture<>(cctx.kernalContext(),
+                                        Collections.<K, V>emptyMap());
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Starting to future-recursively get values for keys: " + redos);
+
+                                // Future recursion.
+                                return getAllAsync(cacheCtx, redos, null, deserializePortable, filter);
+                            }
+                        },
+                        // Finalize.
+                        new FinishClosure<Map<K, V>>() {
+                            @Override Map<K, V> finish(Map<K, V> loaded) {
+                                for (Map.Entry<K, V> entry : loaded.entrySet()) {
+                                    IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(entry.getKey()));
+
+                                    V val = entry.getValue();
+
+                                    if (!readCommitted())
+                                        txEntry.readValue(val);
+
+                                    if (!F.isEmpty(txEntry.transformClosures())) {
+                                        for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+                                            val = clos.apply(val);
+                                    }
+
+                                    retMap.put(entry.getKey(), val);
+                                }
+
+                                return retMap;
+                            }
+                        }
+                    );
+                }
+
+                return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            setRollbackOnly();
+
+            return new GridFinishedFuture<>(cctx.kernalContext(), e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync(
+        GridCacheContext<K, V> cacheCtx,
+        Map<? extends K, ? extends V> map,
+        boolean retval,
+        @Nullable GridCacheEntryEx<K, V> cached,
+        long ttl,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter
+    ) {
+        return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> putAllDrAsync(
+        GridCacheContext<K, V> cacheCtx,
+        Map<? extends K, GridCacheDrInfo<V>> drMap
+    ) {
+        return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync(
+        GridCacheContext<K, V> cacheCtx,
+        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map,
+        boolean retval,
+        @Nullable GridCacheEntryEx<K, V> cached,
+        long ttl
+    ) {
+        return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> removeAllDrAsync(
+        GridCacheContext<K, V> cacheCtx,
+        Map<? extends K, GridCacheVersion> drMap
+    ) {
+        return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
+    }
+
+    /**
+     * Checks filter for non-pessimistic transactions.
+     *
+     * @param cached Cached entry.
+     * @param filter Filter to check.
+     * @return {@code True} if passed or pessimistic.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean filter(GridCacheEntryEx<K, V> cached,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+        return pessimistic() || cached.context().isAll(cached, filter);
+    }
+
+    /**
+     * Internal routine for <tt>putAll(..)</tt>
+     *
+     * @param keys Keys to enlist.
+     * @param cached Cached entry.
+     * @param ttl Time to live for entry. If negative, leave unchanged.
+     * @param implicit Implicit flag.
+     * @param lookup Value lookup map ({@code null} for remove).
+     * @param transformMap Map with transform closures if this is a transform operation.
+     * @param retval Flag indicating whether a value should be returned.
+     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+     * @param filter User filters.
+     * @param ret Return value.
+     * @param enlisted Collection of keys enlisted into this transaction.
+     * @param drPutMap DR put map (optional).
+     * @param drRmvMap DR remove map (optional).
+     * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
+     */
+    protected IgniteFuture<Set<K>> enlistWrite(
+        GridCacheContext<K, V> cacheCtx,
+        Collection<? extends K> keys,
+        @Nullable GridCacheEntryEx<K, V> cached,
+        long ttl,
+        boolean implicit,
+        @Nullable Map<? extends K, ? extends V> lookup,
+        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+        boolean retval,
+        boolean lockOnly,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+        final GridCacheReturn<V> ret,
+        Collection<K> enlisted,
+        @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap,
+        @Nullable Map<? extends K, GridCacheVersion> drRmvMap
+    ) {
+        assert cached == null || keys.size() == 1;
+        assert cached == null || F.first(keys).equals(cached.key());
+
+        try {
+            addActiveCache(cacheCtx);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(cctx.kernalContext(), e);
+        }
+
+        Set<K> skipped = null;
+
+        boolean rmv = lookup == null && transformMap == null;
+
+        try {
+            // Set transform flag for transaction.
+            if (transformMap != null)
+                transform = true;
+
+            groupLockSanityCheck(cacheCtx, keys);
+
+            for (K key : keys) {
+                V val = rmv || lookup == null ? null : lookup.get(key);
+                IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key);
+
+                GridCacheVersion drVer;
+                long drTtl;
+                long drExpireTime;
+
+                if (drPutMap != null) {
+                    GridCacheDrInfo<V> info = drPutMap.get(key);
+
+                    assert info != null;
+
+                    drVer = info.version();
+                    drTtl = info.ttl();
+                    drExpireTime = info.expireTime();
+                }
+                else if (drRmvMap != null) {
+                    assert drRmvMap.get(key) != null;
+
+                    drVer = drRmvMap.get(key);
+                    drTtl = -1L;
+                    drExpireTime = -1L;
+                }
+                else {
+                    drVer = null;
+                    drTtl = -1L;
+                    drExpireTime = -1L;
+                }
+
+                if (key == null)
+                    continue;
+
+                if (!rmv && val == null && transformClo == null) {
+                    skipped = skip(skipped, key);
+
+                    continue;
+                }
+
+                if (cacheCtx.portableEnabled())
+                    key = (K)cacheCtx.marshalToPortable(key);
+
+                IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+                IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+                // First time access.
+                if (txEntry == null) {
+                    while (true) {
+                        GridCacheEntryEx<K, V> entry;
+
+                        if (cached != null) {
+                            entry = cached;
+
+                            cached = null;
+                        }
+                        else {
+                            entry = entryEx(cacheCtx, txKey, topologyVersion());
+
+                            entry.unswap(true, false);
+                        }
+
+                        try {
+                            // Check if lock is being explicitly acquired by the same thread.
+                            if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+                                entry.lockedByThread(threadId, xidVer))
+                                throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+                                    "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer +
+                                    ", threadId=" + threadId +
+                                    ", locNodeId=" + cctx.localNodeId() + ']');
+
+                            V old = null;
+
+                            boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+
+                            if (optimistic()) {
+                                try {
+                                    //Should read through if filter is specified.
+                                    old = entry.innerGet(this,
+                                        /*swap*/false,
+                                        /*read-through*/readThrough,
+                                        /*fail-fast*/false,
+                                        /*unmarshal*/retval,
+                                        /*metrics*/retval,
+                                        /*events*/retval,
+                                        /*temporary*/false,
+                                        CU.subjectId(this, cctx),
+                                        transformClo,
+                                        resolveTaskName(),
+                                        CU.<K, V>empty());
+                                }
+                                catch (GridCacheFilterFailedException e) {
+                                    e.printStackTrace();
+
+                                    assert false : "Empty filter failed: " + e;
+                                }
+                            }
+                            else
+                                old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+
+                            if (!filter(entry, filter)) {
+                                skipped = skip(skipped, key);
+
+                                ret.set(old, false);
+
+                                if (!readCommitted() && old != null) {
+                                    // Enlist failed filters as reads for non-read-committed mode,
+                                    // so future ops will get the same values.
+                                    txEntry = addEntry(READ, old, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L,
+                                        null);
+
+                                    txEntry.markValid();
+                                }
+
+                                if (readCommitted() || old == null)
+                                    cacheCtx.evicts().touch(entry, topologyVersion());
+
+                                break; // While.
+                            }
+
+                            txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
+                                old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+                                drExpireTime, drVer);
+
+                            if (!implicit() && readCommitted())
+                                cacheCtx.evicts().touch(entry, topologyVersion());
+
+                            if (groupLock() && !lockOnly)
+                                txEntry.groupLockEntry(true);
+
+                            enlisted.add(key);
+
+                            if (!pessimistic() || (groupLock() && !lockOnly)) {
+                                txEntry.markValid();
+
+                                if (old == null) {
+                                    if (retval && !readThrough) {
+                                        // If return value is required, then we know for sure that there is only
+                                        // one key in the keys collection.
+                                        assert keys.size() == 1;
+
+                                        IgniteFuture<Boolean> fut = loadMissing(
+                                            cacheCtx,
+                                            true,
+                                            F.asList(key),
+                                            deserializePortables(cacheCtx),
+                                            new CI2<K, V>() {
+                                                @Override public void apply(K k, V v) {
+                                                    if (log.isDebugEnabled())
+                                                        log.debug("Loaded value from remote node [key=" + k + ", val=" +
+                                                            v + ']');
+
+                                                    ret.set(v, true);
+                                                }
+                                            });
+
+                                        return new GridEmbeddedFuture<>(
+                                            cctx.kernalContext(),
+                                            fut,
+                                            new C2<Boolean, Exception, Set<K>>() {
+                                                @Override public Set<K> apply(Boolean b, Exception e) {
+                                                    if (e != null)
+                                                        throw new GridClosureException(e);
+
+                                                    return Collections.emptySet();
+                                                }
+                                            }
+                                        );
+                                    }
+                                    else
+                                        ret.set(null, true);
+                                }
+                                else
+                                    ret.set(old, true);
+                            }
+                            // Pessimistic.
+                            else
+                                ret.set(old, true);
+
+                            break; // While.
+                        }
+                        catch (GridCacheEntryRemovedException ignore) {
+                            if (log.isDebugEnabled())
+                                log.debug("Got removed entry in transaction putAll0 method: " + entry);
+                        }
+                    }
+                }
+                else {
+                    if (transformClo == null && txEntry.op() == TRANSFORM)
+                        throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+                            "transaction after transform closure is applied): " + key);
+
+                    GridCacheEntryEx<K, V> entry = txEntry.cached();
+
+                    V v = txEntry.value();
+
+                    boolean del = txEntry.op() == DELETE && rmv;
+
+                    if (!del) {
+                        if (!filter(entry, filter)) {
+                            skipped = skip(skipped, key);
+
+                            ret.set(v, false);
+
+                            continue;
+                        }
+
+                        txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
+                            v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+                            drExpireTime, drVer);
+
+                        enlisted.add(key);
+                    }
+
+                    if (!pessimistic()) {
+                        txEntry.markValid();
+
+                        // Set tx entry and return values.
+                        ret.set(v, true);
+                    }
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(cctx.kernalContext(), e);
+        }
+
+        return new GridFinishedFuture<>(cctx.kernalContext(), skipped);
+    }
+
+    /**
+     * Post lock processing for put or remove.
+     *
+     * @param keys Keys.
+     * @param failed Collection of potentially failed keys (need to populate in this method).
+     * @param transformed Output map where transformed values will be placed.
+     * @param transformMap Transform map.
+     * @param ret Return value.
+     * @param rmv {@code True} if remove.
+     * @param retval Flag to return value or not.
+     * @param filter Filter to check entries.
+     * @return Failed keys.
+     * @throws IgniteCheckedException If error.
+     */
+    protected Set<K> postLockWrite(
+        GridCacheContext<K, V> cacheCtx,
+        Iterable<? extends K> keys,
+        Set<K> failed,
+        @Nullable Map<K, V> transformed,
+        @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+        GridCacheReturn<V> ret,
+        boolean rmv,
+        boolean retval,
+        IgnitePredicate<GridCacheEntry<K, V>>[] filter
+    ) throws IgniteCheckedException {
+        for (K k : keys) {
+            IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k));
+
+            if (txEntry == null)
+                throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
+                    "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
+
+            while (true) {
+                GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+                try {
+                    assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
+                        "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
+                            ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
+
+                    if (log.isDebugEnabled())
+                        log.debug("Post lock write entry: " + cached);
+
+                    V v = txEntry.previousValue();
+                    boolean hasPrevVal = txEntry.hasPreviousValue();
+
+                    if (onePhaseCommit())
+                        filter = txEntry.filters();
+
+                    // If we have user-passed filter, we must read value into entry for peek().
+                    if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
+                        retval = true;
+
+                    if (retval) {
+                        if (!cacheCtx.isNear()) {
+                            try {
+                                if (!hasPrevVal)
+                                    v = cached.innerGet(this,
+                                        /*swap*/retval,
+                                        /*read-through*/retval,
+                                        /*failFast*/false,
+                                        /*unmarshal*/retval,
+                                        /*metrics*/true,
+                                        /*event*/!dht(),
+                                        /*temporary*/false,
+                                        CU.subjectId(this, cctx),
+                                        null,
+                                        resolveTaskName(),
+                                        CU.<K, V>empty());
+                            }
+                            catch (GridCacheFilterFailedException e) {
+                                e.printStackTrace();
+
+                                assert false : "Empty filter failed: " + e;
+                            }
+                        }
+                        else {
+                            if (!hasPrevVal)
+                                v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet();
+                        }
+
+                        ret.value(v);
+                    }
+
+                    boolean pass = cacheCtx.isAll(cached, filter);
+
+                    // For remove operation we return true only if we are removing s/t,
+                    // i.e. cached value is not null.
+                    ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
+
+                    if (onePhaseCommit())
+                        txEntry.filtersPassed(pass);
+
+                    if (pass) {
+                        txEntry.markValid();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Filter passed in post lock for key: " + k);
+                    }
+                    else {
+                        failed = skip(failed, k);
+
+                        // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
+                        txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value());
+                        txEntry.filters(CU.<K, V>empty());
+                        txEntry.filtersSet(false);
+                    }
+
+                    break; // While.
+                }
+                // If entry cached within transaction got removed before lock.
+                catch (GridCacheEntryRemovedException ignore) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
+
+                    txEntry.cached(entryEx(cached.context(), txEntry.txKey()), txEntry.keyBytes());
+                }
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Entries that failed after lock filter check: " + failed);
+
+        return failed;
+    }
+
+    /**
+     * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
+     * maps must be non-null.
+     *
+     * @param map Key-value map to store.
+     * @param transformMap Transform map.
+     * @param drMap DR map.
+     * @param retval Key-transform value map to store.
+     * @param cached Cached entry, if any.
+     * @param ttl Time to live.
+     * @param filter Filter.
+     * @return Operation future.
+     */
+    private IgniteFuture<GridCacheReturn<V>> putAllAsync0(
+        final GridCacheContext<K, V> cacheCtx,
+        @Nullable Map<? extends K, ? extends V> ma

<TRUNCATED>

Mime
View raw message