ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [43/50] [abbrv] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode
Date Wed, 28 Oct 2015 12:47:43 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f22e753..82e5f2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -31,12 +31,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
 import javax.cache.CacheException;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -57,7 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -85,10 +86,8 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -105,6 +104,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -424,46 +425,126 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> loadMissing(
+    @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
         final boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
-        boolean deserializePortable,
         boolean skipVals,
-        final IgniteBiInClosure<KeyCacheObject, Object> c
+        boolean needVer,
+        final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
     ) {
-        if (!async) {
-            try {
-                if (!readThrough || !cacheCtx.readThrough()) {
-                    for (KeyCacheObject key : keys)
-                        c.apply(key, null);
+        assert cacheCtx.isLocal() : cacheCtx.name();
 
-                    return new GridFinishedFuture<>(false);
-                }
+        if (!readThrough || !cacheCtx.readThrough()) {
+            for (KeyCacheObject key : keys)
+                c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
 
-                return new GridFinishedFuture<>(
-                    cacheCtx.store().loadAll(this, keys, c));
-            }
-            catch (IgniteCheckedException e) {
-                return new GridFinishedFuture<>(e);
-            }
+            return new GridFinishedFuture<>();
         }
-        else {
-            return cctx.kernalContext().closure().callLocalSafe(
-                new GPC<Boolean>() {
-                    @Override public Boolean call() throws Exception {
-                        if (!readThrough || !cacheCtx.readThrough()) {
-                            for (KeyCacheObject key : keys)
-                                c.apply(key, null);
-
-                            return false;
+
+        try {
+            IgniteCacheExpiryPolicy expiryPlc = accessPolicy(cacheCtx, keys);
+
+            Map<KeyCacheObject, GridCacheVersion> misses = null;
+
+            for (KeyCacheObject key : keys) {
+                while (true) {
+                    IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
+
+                    GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) :
+                        txEntry.cached();
+
+                    if (entry == null)
+                        continue;
+
+                    try {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
+                            /*readSwap*/true,
+                            /*unmarshal*/true,
+                            /*update-metrics*/!skipVals,
+                            /*event*/!skipVals,
+                            CU.subjectId(this, cctx),
+                            null,
+                            resolveTaskName(),
+                            expiryPlc);
+
+                        if (res == null) {
+                            if (misses == null)
+                                misses = new LinkedHashMap<>();
+
+                            misses.put(key, entry.version());
                         }
+                        else
+                            c.apply(key, skipVals ? true : res.get1(), res.get2());
 
-                        return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c);
+                        break;
                     }
-                },
-                true);
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry, will retry: " + key);
+
+                        if (txEntry != null)
+                            txEntry.cached(cacheCtx.cache().entryEx(key));
+                    }
+                }
+            }
+
+            if (misses != null) {
+                final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
+
+                cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
+                    @Override public void apply(KeyCacheObject key, Object val) {
+                        GridCacheVersion ver = misses0.remove(key);
+
+                        assert ver != null : key;
+
+                        if (val != null) {
+                            CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+                            while (true) {
+                                GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+
+                                try {
+                                    GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null);
+
+                                    boolean set = setVer != null;
+
+                                    if (set)
+                                        ver = setVer;
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Set value loaded from store into entry [set=" + set +
+                                            ", curVer=" + ver + ", newVer=" + setVer + ", " +
+                                            "entry=" + entry + ']');
+
+                                    break;
+                                }
+                                catch (GridCacheEntryRemovedException ignore) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got removed entry, (will retry): " + entry);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    // Wrap errors (will be unwrapped).
+                                    throw new GridClosureException(e);
+                                }
+                            }
+                        }
+                        else
+                            ver = SER_READ_EMPTY_ENTRY_VER;
+
+                        c.apply(key, val, ver);
+                    }
+                });
+
+                for (KeyCacheObject key : misses0.keySet())
+                    c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
+            }
+
+            return new GridFinishedFuture<>();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
         }
     }
 
@@ -834,13 +915,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
                                         true);
 
+                                    GridCacheVersion dhtVer = null;
+
                                     // For near local transactions we must record DHT version
                                     // in order to keep near entries on backup nodes until
                                     // backup remote transaction completes.
                                     if (cacheCtx.isNear()) {
                                         if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
                                             txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
-                                            ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
+                                            dhtVer = txEntry.dhtVersion();
 
                                         if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
                                             txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
@@ -921,6 +1004,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                         txEntry.conflictVersion(explicitVer);
                                     }
 
+                                    if (dhtVer == null)
+                                        dhtVer = explicitVer != null ? explicitVer : writeVersion();
+
                                     if (op == CREATE || op == UPDATE) {
                                         GridCacheUpdateTxResult updRes = cached.innerSet(
                                             this,
@@ -938,9 +1024,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             txEntry.conflictExpireTime(),
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
-                                            resolveTaskName());
+                                            resolveTaskName(),
+                                            dhtVer);
 
-                                        if (nearCached != null && updRes.success())
+                                        if (nearCached != null && updRes.success()) {
                                             nearCached.innerSet(
                                                 null,
                                                 eventNodeId(),
@@ -957,7 +1044,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 txEntry.conflictExpireTime(),
                                                 null,
                                                 CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                dhtVer);
+                                        }
                                     }
                                     else if (op == DELETE) {
                                         GridCacheUpdateTxResult updRes = cached.innerRemove(
@@ -973,9 +1062,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             cached.detached()  ? DR_NONE : drType,
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
-                                            resolveTaskName());
+                                            resolveTaskName(),
+                                            dhtVer);
 
-                                        if (nearCached != null && updRes.success())
+                                        if (nearCached != null && updRes.success()) {
                                             nearCached.innerRemove(
                                                 null,
                                                 eventNodeId(),
@@ -989,7 +1079,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                                 DR_NONE,
                                                 null,
                                                 CU.subjectId(this, cctx),
-                                                resolveTaskName());
+                                                resolveTaskName(),
+                                                dhtVer);
+                                        }
                                     }
                                     else if (op == RELOAD) {
                                         cached.innerReload();
@@ -1180,14 +1272,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 commitErr.get());
         }
 
+        if (near()) {
+            // Must evict near entries before rolling back from
+            // transaction manager, so they will be removed from cache.
+            for (IgniteTxEntry e : allEntries())
+                evictNearEntry(e, false);
+        }
+
         if (doneFlag.compareAndSet(false, true)) {
             try {
-                if (near())
-                    // Must evict near entries before rolling back from
-                    // transaction manager, so they will be removed from cache.
-                    for (IgniteTxEntry e : allEntries())
-                        evictNearEntry(e, false);
-
                 cctx.tm().rollbackTx(this);
 
                 if (!internal()) {
@@ -1228,12 +1321,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param entry Entry.
+     * @return {@code True} if local node is current primary for given entry.
+     */
+    private boolean primaryLocal(GridCacheEntryEx entry) {
+        return entry.context().affinity().primary(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE);
+    }
+
+    /**
      * Checks if there is a cached or swapped value for
-     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method.
+     * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
      *
      * @param cacheCtx Cache context.
      * @param keys Key to enlist.
-     * @param cached Cached entry, if called from entry wrapper.
      * @param expiryPlc Explicitly specified expiry policy for entry.
      * @param map Return map.
      * @param missed Map of missed keys.
@@ -1249,7 +1349,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     private <K, V> Collection<KeyCacheObject> enlistRead(
         final GridCacheContext cacheCtx,
         Collection<KeyCacheObject> keys,
-        @Nullable GridCacheEntryEx cached,
         @Nullable ExpiryPolicy expiryPlc,
         Map<K, V> map,
         Map<KeyCacheObject, GridCacheVersion> missed,
@@ -1261,7 +1360,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     ) throws IgniteCheckedException {
         assert !F.isEmpty(keys);
         assert keysCnt == keys.size();
-        assert cached == null || F.first(keys).equals(cached.key());
 
         cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -1271,11 +1369,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         AffinityTopologyVersion topVer = topologyVersion();
 
+        boolean needReadVer = serializable() && optimistic();
+
         // In this loop we cover only read-committed or optimistic transactions.
         // Transactions that are pessimistic and not read-committed are covered
         // outside of this loop.
         for (KeyCacheObject key : keys) {
-            if (pessimistic() && !readCommitted() && !skipVals)
+            if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
                 addActiveCache(cacheCtx);
 
             IgniteTxKey txKey = cacheCtx.txKey(key);
@@ -1337,13 +1437,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                             break;
                         }
-                        catch (GridCacheFilterFailedException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Filter validation failed for entry: " + txEntry);
-
-                            if (!readCommitted())
-                                txEntry.readValue(e.<V>value());
-                        }
                         catch (GridCacheEntryRemovedException ignored) {
                             txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
                         }
@@ -1359,38 +1452,49 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     lockKeys.add(key);
 
                 while (true) {
-                    GridCacheEntryEx entry;
-
-                    if (cached != null) {
-                        entry = cached;
-
-                        cached = null;
-                    }
-                    else
-                        entry = entryEx(cacheCtx, txKey, topVer);
+                    GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
 
                     try {
                         GridCacheVersion ver = entry.version();
 
                         CacheObject val = null;
+                        GridCacheVersion readVer = null;
 
                         if (!pessimistic() || readCommitted() && !skipVals) {
                             IgniteCacheExpiryPolicy accessPlc =
                                 optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
 
-                            // This call will check for filter.
-                            val = entry.innerGet(this,
-                                /*swap*/true,
-                                /*no read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /*metrics*/true,
-                                /*event*/true,
-                                /*temporary*/false,
-                                CU.subjectId(this, cctx),
-                                null,
-                                resolveTaskName(),
-                                accessPlc);
+                            if (needReadVer) {
+                                T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+                                    entry.innerGetVersioned(this,
+                                        /*swap*/true,
+                                        /*unmarshal*/true,
+                                        /*metrics*/true,
+                                        /*event*/true,
+                                        CU.subjectId(this, cctx),
+                                        null,
+                                        resolveTaskName(),
+                                        accessPlc) : null;
+
+                                if (res != null) {
+                                    val = res.get1();
+                                    readVer = res.get2();
+                                }
+                            }
+                            else {
+                                val = entry.innerGet(this,
+                                    /*swap*/true,
+                                    /*no read-through*/false,
+                                    /*fail-fast*/true,
+                                    /*unmarshal*/true,
+                                    /*metrics*/true,
+                                    /*event*/true,
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    null,
+                                    resolveTaskName(),
+                                    accessPlc);
+                            }
 
                             if (val != null) {
                                 cacheCtx.addResult(map,
@@ -1424,8 +1528,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                             // As optimization, mark as checked immediately
                             // for non-pessimistic if value is not null.
-                            if (val != null && !pessimistic())
+                            if (val != null && !pessimistic()) {
                                 txEntry.markValid();
+
+                                if (needReadVer) {
+                                    assert readVer != null;
+
+                                    txEntry.serializableReadVersion(readVer);
+                                }
+                            }
                         }
 
                         break; // While.
@@ -1434,34 +1545,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
                     }
-                    catch (GridCacheFilterFailedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Filter validation failed for entry: " + entry);
-
-                        if (!readCommitted()) {
-                            // Value for which failure occurred.
-                            CacheObject val = e.value();
-
-                            txEntry = addEntry(READ,
-                                val,
-                                null,
-                                null,
-                                entry,
-                                expiryPlc,
-                                CU.empty0(),
-                                false,
-                                -1L,
-                                -1L,
-                                null,
-                                skipStore);
-
-                            // Mark as checked immediately for non-pessimistic.
-                            if (val != null && !pessimistic())
-                                txEntry.markValid();
-                        }
-
-                        break; // While loop.
-                    }
                     finally {
                         if (cacheCtx.isNear() && entry != null && readCommitted()) {
                             if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) {
@@ -1492,6 +1575,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys.
+     * @return Expiry policy.
+     */
+    protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+        return null;
+    }
+
+    /**
      * Adds skipped key.
      *
      * @param skipped Skipped set (possibly {@code null}).
@@ -1512,12 +1604,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
     /**
      * Loads all missed keys for
-     * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method.
+     * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
      *
      * @param cacheCtx Cache context.
      * @param map Return map.
      * @param missedMap Missed keys.
-     * @param redos Keys to retry.
      * @param deserializePortable Deserialize portable flag.
      * @param skipVals Skip values flag.
      * @param keepCacheObjects Keep cache objects flag.
@@ -1528,55 +1619,25 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         final GridCacheContext cacheCtx,
         final Map<K, V> map,
         final Map<KeyCacheObject, GridCacheVersion> missedMap,
-        @Nullable final Collection<KeyCacheObject> redos,
         final boolean deserializePortable,
         final boolean skipVals,
         final boolean keepCacheObjects,
         final boolean skipStore
     ) {
-        assert redos != null || pessimistic();
-
         if (log.isDebugEnabled())
             log.debug("Loading missed values for missed map: " + missedMap);
 
-        final Collection<KeyCacheObject> loaded = new HashSet<>();
+        final boolean needReadVer = serializable() && optimistic();
 
         return new GridEmbeddedFuture<>(
-            new C2<Boolean, Exception, Map<K, V>>() {
-                @Override public Map<K, V> apply(Boolean b, Exception e) {
+            new C2<Void, Exception, Map<K, V>>() {
+                @Override public Map<K, V> apply(Void v, Exception e) {
                     if (e != null) {
                         setRollbackOnly();
 
                         throw new GridClosureException(e);
                     }
 
-                    if (!b && !readCommitted()) {
-                        // There is no store - we must mark the entries.
-                        for (KeyCacheObject key : missedMap.keySet()) {
-                            IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
-                            if (txEntry != null)
-                                txEntry.markValid();
-                        }
-                    }
-
-                    if (readCommitted()) {
-                        Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet());
-
-                        notFound.removeAll(loaded);
-
-                        // In read-committed mode touch entries that have just been read.
-                        for (KeyCacheObject key : notFound) {
-                            IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
-                            GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
-                                txEntry.cached();
-
-                            if (entry != null)
-                                cacheCtx.evicts().touch(entry, topologyVersion());
-                        }
-                    }
-
                     return map;
                 }
             },
@@ -1585,13 +1646,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 !skipStore,
                 false,
                 missedMap.keySet(),
-                deserializePortable,
                 skipVals,
-                new CI2<KeyCacheObject, Object>() {
-                    /** */
-                    private GridCacheVersion nextVer;
-
-                    @Override public void apply(KeyCacheObject key, Object val) {
+                needReadVer,
+                new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+                    @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
                         if (isRollbackOnly()) {
                             if (log.isDebugEnabled())
                                 log.debug("Ignoring loaded value for read because transaction was rolled back: " +
@@ -1600,15 +1658,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             return;
                         }
 
-                        GridCacheVersion ver = missedMap.get(key);
-
-                        if (ver == null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
-
-                            return;
-                        }
-
                         CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
                         CacheObject visibleVal = cacheVal;
@@ -1625,90 +1674,42 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 visibleVal = txEntry.applyEntryProcessors(visibleVal);
                         }
 
-                        // In pessimistic mode we hold the lock, so filter validation
-                        // should always be valid.
-                        if (pessimistic())
-                            ver = null;
-
-                        // Initialize next version.
-                        if (nextVer == null)
-                            nextVer = cctx.versions().next(topologyVersion());
-
-                        while (true) {
-                            assert txEntry != null || readCommitted() || skipVals;
-
-                            GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
-
-                            try {
-                                // Must initialize to true since even if filter didn't pass,
-                                // we still record the transaction value.
-                                boolean set;
-
-                                try {
-                                    set = e.versionedValue(cacheVal, ver, nextVer);
-                                }
-                                catch (GridCacheEntryRemovedException ignore) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Got removed entry in transaction getAll method " +
-                                            "(will try again): " + e);
-
-                                    if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
-                                        U.error(log, "Inconsistent transaction state (entry got removed while " +
-                                            "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
-
-                                        setRollbackOnly();
-
-                                        return;
-                                    }
-
-                                    if (txEntry != null)
-                                        txEntry.cached(entryEx(cacheCtx, txKey));
-
-                                    continue; // While loop.
-                                }
-
-                                // In pessimistic mode, we should always be able to set.
-                                assert set || !pessimistic();
-
-                                if (readCommitted() || skipVals) {
-                                    cacheCtx.evicts().touch(e, topologyVersion());
+                        assert txEntry != null || readCommitted() || skipVals;
 
-                                    if (visibleVal != null) {
-                                        cacheCtx.addResult(map,
-                                            key,
-                                            visibleVal,
-                                            skipVals,
-                                            keepCacheObjects,
-                                            deserializePortable,
-                                            false);
-                                    }
-                                }
-                                else {
-                                    assert txEntry != null;
+                        GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
 
-                                    txEntry.setAndMarkValid(cacheVal);
+                        if (readCommitted() || skipVals) {
+                            cacheCtx.evicts().touch(e, topologyVersion());
 
-                                    if (visibleVal != null) {
-                                        cacheCtx.addResult(map,
-                                            key,
-                                            visibleVal,
-                                            skipVals,
-                                            keepCacheObjects,
-                                            deserializePortable,
-                                            false);
-                                    }
-                                }
+                            if (visibleVal != null) {
+                                cacheCtx.addResult(map,
+                                    key,
+                                    visibleVal,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
+                            }
+                        }
+                        else {
+                            assert txEntry != null;
 
-                                loaded.add(key);
+                            txEntry.setAndMarkValid(cacheVal);
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Set value loaded from store into entry from transaction [set=" + set +
-                                        ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+                            if (needReadVer) {
+                                assert loadVer != null;
 
-                                break; // While loop.
+                                txEntry.serializableReadVersion(loadVer);
                             }
-                            catch (IgniteCheckedException ex) {
-                                throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+
+                            if (visibleVal != null) {
+                                cacheCtx.addResult(map,
+                                    key,
+                                    visibleVal,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
                             }
                         }
                     }
@@ -1720,7 +1721,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         final GridCacheContext cacheCtx,
         Collection<KeyCacheObject> keys,
-        @Nullable GridCacheEntryEx cached,
         final boolean deserializePortable,
         final boolean skipVals,
         final boolean keepCacheObjects,
@@ -1747,7 +1747,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
             final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
                 keys,
-                cached,
                 expiryPlc,
                 retMap,
                 missed,
@@ -1850,20 +1849,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                                     txEntry.cached(entryEx(cacheCtx, txKey));
                                 }
-                                catch (GridCacheFilterFailedException e) {
-                                    // Failed value for the filter.
-                                    CacheObject val = e.value();
-
-                                    if (val != null) {
-                                        // If filter fails after lock is acquired, we don't reload,
-                                        // regardless if value is null or not.
-                                        missed.remove(cacheKey);
-
-                                        txEntry.setAndMarkValid(val);
-                                    }
-
-                                    break; // While.
-                                }
                             }
                         }
 
@@ -1871,7 +1856,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             return checkMissed(cacheCtx,
                                 retMap,
                                 missed,
-                                null,
                                 deserializePortable,
                                 skipVals,
                                 keepCacheObjects,
@@ -1920,8 +1904,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             else {
                 assert optimistic() || readCommitted() || skipVals;
 
-                final Collection<KeyCacheObject> redos = new ArrayList<>();
-
                 if (!missed.isEmpty()) {
                     if (!readCommitted())
                         for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
@@ -1937,67 +1919,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     if (missed.isEmpty())
                         return new GridFinishedFuture<>(retMap);
 
-                    IgniteInternalFuture<Map<K, V>> fut0 = checkMissed(cacheCtx,
+                    return checkMissed(cacheCtx,
                         retMap,
                         missed,
-                        redos,
                         deserializePortable,
                         skipVals,
                         keepCacheObjects,
                         skipStore);
-
-                    return new GridEmbeddedFuture<>(
-                        // First future.
-                        fut0,
-                        // Closure that returns another future, based on result from first.
-                        new PMC<Map<K, V>>() {
-                            @Override public IgniteInternalFuture<Map<K, V>> postMiss(Map<K, V> map) {
-                                if (redos.isEmpty())
-                                    return new GridFinishedFuture<>(
-                                        Collections.<K, V>emptyMap());
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Starting to future-recursively get values for keys: " + redos);
-
-                                // Future recursion.
-                                return getAllAsync(cacheCtx,
-                                    redos,
-                                    null,
-                                    deserializePortable,
-                                    skipVals,
-                                    true,
-                                    skipStore);
-                            }
-                        },
-                        // Finalize.
-                        new FinishClosure<Map<K, V>>() {
-                            @Override Map<K, V> finish(Map<K, V> loaded) {
-                                for (Map.Entry<K, V> entry : loaded.entrySet()) {
-                                    KeyCacheObject cacheKey = (KeyCacheObject)entry.getKey();
-
-                                    IgniteTxEntry txEntry = entry(cacheCtx.txKey(cacheKey));
-
-                                    CacheObject val = (CacheObject)entry.getValue();
-
-                                    if (!readCommitted())
-                                        txEntry.readValue(val);
-
-                                    if (!F.isEmpty(txEntry.entryProcessors()))
-                                        val = txEntry.applyEntryProcessors(val);
-
-                                    cacheCtx.addResult(retMap,
-                                        cacheKey,
-                                        val,
-                                        skipVals,
-                                        keepCacheObjects,
-                                        deserializePortable,
-                                        false);
-                                }
-
-                                return retMap;
-                            }
-                        }
-                    );
                 }
 
                 return new GridFinishedFuture<>(retMap);
@@ -2016,8 +1944,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         GridCacheContext cacheCtx,
         Map<? extends K, ? extends V> map,
         boolean retval,
-        @Nullable GridCacheEntryEx cached,
-        long ttl,
         CacheEntryPredicate[] filter
     ) {
         return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
@@ -2026,7 +1952,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             null,
             null,
             retval,
-            cached,
             filter);
     }
 
@@ -2041,7 +1966,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             null,
             drMap,
             false,
-            null,
             null);
     }
 
@@ -2058,7 +1982,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             invokeArgs,
             null,
             true,
-            null,
             null);
     }
 
@@ -2067,20 +1990,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         GridCacheContext cacheCtx,
         Map<KeyCacheObject, GridCacheVersion> drMap
     ) {
-        return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
+        return removeAllAsync0(cacheCtx, null, drMap, false, null, false);
     }
 
     /**
      * Checks filter for non-pessimistic transactions.
      *
-     * @param cached Cached entry.
+     * @param cctx Cache context.
+     * @param key Key.
+     * @param val Value.
      * @param filter Filter to check.
      * @return {@code True} if passed or pessimistic.
-     * @throws IgniteCheckedException If failed.
      */
-    private <K, V> boolean filter(GridCacheEntryEx cached,
-        CacheEntryPredicate[] filter) throws IgniteCheckedException {
-        return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter);
+    private boolean filter(
+        GridCacheContext cctx,
+        KeyCacheObject key,
+        CacheObject val,
+        CacheEntryPredicate[] filter) {
+        return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter);
     }
 
     /**
@@ -2088,7 +2015,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      *
      * @param cacheCtx Cache context.
      * @param keys Keys to enlist.
-     * @param cached Cached entry.
      * @param expiryPlc Explicitly specified expiry policy for entry.
      * @param implicit Implicit flag.
      * @param lookup Value lookup map ({@code null} for remove).
@@ -2102,28 +2028,28 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param drPutMap DR put map (optional).
      * @param drRmvMap DR remove map (optional).
      * @param skipStore Skip store flag.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
      * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
      */
-    protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
+    private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
         final GridCacheContext cacheCtx,
         Collection<?> keys,
-        @Nullable GridCacheEntryEx cached,
         @Nullable ExpiryPolicy expiryPlc,
         boolean implicit,
         @Nullable Map<?, ?> lookup,
         @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
         @Nullable Object[] invokeArgs,
-        boolean retval,
+        final boolean retval,
         boolean lockOnly,
-        CacheEntryPredicate[] filter,
+        final CacheEntryPredicate[] filter,
         final GridCacheReturn ret,
         Collection<KeyCacheObject> enlisted,
         @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
         @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
-        boolean skipStore
+        boolean skipStore,
+        final boolean singleRmv
     ) {
-        assert cached == null || keys.size() == 1;
-        assert cached == null || F.first(keys).equals(cached.key());
+        assert retval || invokeMap == null;
 
         try {
             addActiveCache(cacheCtx);
@@ -2138,6 +2064,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         Set<KeyCacheObject> missedForLoad = null;
 
+        final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+        final boolean needVal = singleRmv || retval || hasFilters;
+        final boolean needReadVer = needVal && (serializable() && optimistic());
+
         try {
             // Set transform flag for transaction.
             if (invokeMap != null)
@@ -2194,19 +2124,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 // First time access.
                 if (txEntry == null) {
                     while (true) {
-                        GridCacheEntryEx entry = null;
+                        GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
 
                         try {
-                            if (cached != null) {
-                                entry = cached;
-
-                                cached = null;
-                            }
-                            else {
-                                entry = entryEx(cacheCtx, txKey, topologyVersion());
-
-                                entry.unswap(false);
-                            }
+                            entry.unswap(false);
 
                             // Check if lock is being explicitly acquired by the same thread.
                             if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
@@ -2217,45 +2138,57 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     ", locNodeId=" + cctx.localNodeId() + ']');
 
                             CacheObject old = null;
-
-                            boolean readThrough = !skipStore && !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+                            GridCacheVersion readVer = null;
 
                             if (optimistic() && !implicit()) {
                                 try {
-                                    // Should read through if filter is specified.
-                                    old = entry.innerGet(this,
-                                        /*swap*/false,
-                                        /*read-through*/readThrough && cacheCtx.loadPreviousValue(),
-                                        /*fail-fast*/false,
-                                        /*unmarshal*/retval,
-                                        /*metrics*/retval,
-                                        /*events*/retval,
-                                        /*temporary*/false,
-                                        CU.subjectId(this, cctx),
-                                        entryProcessor,
-                                        resolveTaskName(),
-                                        null);
+                                    if (needReadVer) {
+                                        T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+                                            entry.innerGetVersioned(this,
+                                                /*swap*/false,
+                                                /*unmarshal*/retval,
+                                                /*metrics*/retval,
+                                                /*events*/retval,
+                                                CU.subjectId(this, cctx),
+                                                entryProcessor,
+                                                resolveTaskName(),
+                                                null) : null;
+
+                                        if (res != null) {
+                                            old = res.get1();
+                                            readVer = res.get2();
+                                        }
+                                    }
+                                    else {
+                                        old = entry.innerGet(this,
+                                            /*swap*/false,
+                                            /*read-through*/false,
+                                            /*fail-fast*/false,
+                                            /*unmarshal*/retval,
+                                            /*metrics*/retval,
+                                            /*events*/retval,
+                                            /*temporary*/false,
+                                            CU.subjectId(this, cctx),
+                                            entryProcessor,
+                                            resolveTaskName(),
+                                            null);
+                                    }
                                 }
                                 catch (ClusterTopologyCheckedException e) {
                                     entry.context().evicts().touch(entry, topologyVersion());
 
                                     throw e;
                                 }
-                                catch (GridCacheFilterFailedException e) {
-                                    e.printStackTrace();
-
-                                    assert false : "Empty filter failed: " + e;
-                                }
                             }
                             else
                                 old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
 
-                            if (!filter(entry, filter)) {
+                            if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
                                 skipped = skip(skipped, cacheKey);
 
                                 ret.set(cacheCtx, old, false);
 
-                                if (!readCommitted() && old != null) {
+                                if (!readCommitted()) {
                                     // Enlist failed filters as reads for non-read-committed mode,
                                     // so future ops will get the same values.
                                     txEntry = addEntry(READ,
@@ -2272,9 +2205,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                         skipStore);
 
                                     txEntry.markValid();
+
+                                    if (needReadVer) {
+                                        assert readVer != null;
+
+                                        txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                                    }
                                 }
 
-                                if (readCommitted() || old == null)
+                                if (readCommitted())
                                     cacheCtx.evicts().touch(entry, topologyVersion());
 
                                 break; // While.
@@ -2305,9 +2244,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                 txEntry.markValid();
 
                                 if (old == null) {
-                                    boolean load = retval && !readThrough;
-
-                                    if (load) {
+                                    if (needVal) {
                                         if (missedForLoad == null)
                                             missedForLoad = new HashSet<>();
 
@@ -2324,6 +2261,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     }
                                 }
                                 else {
+                                    if (needReadVer) {
+                                        assert readVer != null;
+
+                                        txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                                    }
+
                                     if (retval && !transform)
                                         ret.set(cacheCtx, old, true);
                                     else {
@@ -2369,7 +2312,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 else {
                     if (entryProcessor == null && txEntry.op() == TRANSFORM)
                         throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
-                            "transaction after transform closure is applied): " + key);
+                            "transaction after EntryProcessor is applied): " + key);
 
                     GridCacheEntryEx entry = txEntry.cached();
 
@@ -2378,7 +2321,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     boolean del = txEntry.op() == DELETE && rmv;
 
                     if (!del) {
-                        if (!filter(entry, filter)) {
+                        if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
                             skipped = skip(skipped, cacheKey);
 
                             ret.set(cacheCtx, v, false);
@@ -2439,15 +2382,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         }
 
         if (missedForLoad != null) {
-            IgniteInternalFuture<Boolean> fut = loadMissing(
+            final boolean skipVals = singleRmv;
+
+            IgniteInternalFuture<Void> fut = loadMissing(
                 cacheCtx,
                 /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
                 /*async*/true,
                 missedForLoad,
-                deserializePortables(cacheCtx),
-                /*skip values*/false,
-                new CI2<KeyCacheObject, Object>() {
-                    @Override public void apply(KeyCacheObject key, Object val) {
+                skipVals,
+                needReadVer,
+                new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+                    @Override public void apply(KeyCacheObject key,
+                        @Nullable Object val,
+                        @Nullable GridCacheVersion loadVer) {
                         if (log.isDebugEnabled())
                             log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
 
@@ -2455,33 +2402,50 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                         assert e != null;
 
-                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
+                        if (needReadVer) {
+                            assert loadVer != null;
 
-                        if (e.op() == TRANSFORM) {
-                            GridCacheVersion ver;
+                            e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+                        }
 
-                            try {
-                                ver = e.cached().version();
-                            }
-                            catch (GridCacheEntryRemovedException ex) {
-                                assert optimistic() : e;
+                        if (singleRmv) {
+                            assert !hasFilters && !retval;
+                            assert val == null || Boolean.TRUE.equals(val) : val;
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+                            ret.set(cacheCtx, null, val != null);
+                        }
+                        else {
+                            CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
-                                ver = null;
+                            if (e.op() == TRANSFORM) {
+                                GridCacheVersion ver;
+
+                                try {
+                                    ver = e.cached().version();
+                                }
+                                catch (GridCacheEntryRemovedException ex) {
+                                    assert optimistic() : e;
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+
+                                    ver = null;
+                                }
+
+                                addInvokeResult(e, cacheVal, ret, ver);
                             }
+                            else {
+                                boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter);
 
-                            addInvokeResult(e, cacheVal, ret, ver);
+                                ret.set(cacheCtx, cacheVal, success);
+                            }
                         }
-                        else
-                            ret.set(cacheCtx, cacheVal, true);
                     }
                 });
 
             return new GridEmbeddedFuture<>(
-                new C2<Boolean, Exception, Set<KeyCacheObject>>() {
-                    @Override public Set<KeyCacheObject> apply(Boolean b, Exception e) {
+                new C2<Void, Exception, Set<KeyCacheObject>>() {
+                    @Override public Set<KeyCacheObject> apply(Void b, Exception e) {
                         if (e != null)
                             throw new GridClosureException(e);
 
@@ -2495,6 +2459,31 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cctx Cache context.
+     * @param key Key.
+     * @param val Value.
+     * @param filter Filter.
+     * @return {@code True} if filter passed.
+     */
+    private boolean isAll(GridCacheContext cctx,
+        KeyCacheObject key,
+        CacheObject val,
+        CacheEntryPredicate[] filter) {
+        GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
+            @Nullable @Override public CacheObject peekVisibleValue() {
+                return rawGet();
+            }
+        };
+
+        for (CacheEntryPredicate p0 : filter) {
+            if (p0 != null && !p0.apply(e))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * Post lock processing for put or remove.
      *
      * @param cacheCtx Context.
@@ -2555,29 +2544,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                     if (retval || invoke) {
                         if (!cacheCtx.isNear()) {
-                            try {
-                                if (!hasPrevVal) {
-                                    boolean readThrough =
-                                        (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
-
-                                    v = cached.innerGet(this,
-                                        /*swap*/true,
-                                        readThrough,
-                                        /*failFast*/false,
-                                        /*unmarshal*/true,
-                                        /*metrics*/!invoke,
-                                        /*event*/!invoke && !dht(),
-                                        /*temporary*/false,
-                                        CU.subjectId(this, cctx),
-                                        null,
-                                        resolveTaskName(),
-                                        null);
-                                }
-                            }
-                            catch (GridCacheFilterFailedException e) {
-                                e.printStackTrace();
-
-                                assert false : "Empty filter failed: " + e;
+                            if (!hasPrevVal) {
+                                boolean readThrough =
+                                    (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
+
+                                v = cached.innerGet(this,
+                                    /*swap*/true,
+                                    readThrough,
+                                    /*failFast*/false,
+                                    /*unmarshal*/true,
+                                    /*metrics*/!invoke,
+                                    /*event*/!invoke && !dht(),
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    null,
+                                    resolveTaskName(),
+                                    null);
                             }
                         }
                         else {
@@ -2725,7 +2707,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param invokeArgs Optional arguments for EntryProcessor.
      * @param drMap DR map.
      * @param retval Key-transform value map to store.
-     * @param cached Cached entry, if any.
      * @param filter Filter.
      * @return Operation future.
      */
@@ -2737,7 +2718,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         @Nullable final Object[] invokeArgs,
         @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap,
         final boolean retval,
-        @Nullable GridCacheEntryEx cached,
         @Nullable final CacheEntryPredicate[] filter
     ) {
         assert filter == null || invokeMap == null;
@@ -2778,8 +2758,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
 
         assert map0 != null || invokeMap0 != null;
-        assert cached == null ||
-            (map0 != null && map0.size() == 1) || (invokeMap0 != null && invokeMap0.size() == 1);
 
         try {
             checkValid();
@@ -2814,7 +2792,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
                 cacheCtx,
                 keySet,
-                cached,
                 opCtx != null ? opCtx.expiry() : null,
                 implicit,
                 map0,
@@ -2827,7 +2804,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 enlisted,
                 drMap,
                 null,
-                opCtx != null && opCtx.skipStore());
+                opCtx != null && opCtx.skipStore(),
+                false);
 
             if (pessimistic()) {
                 // Loose all skipped.
@@ -2921,8 +2899,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 }
                 else
                     return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
-                        @Override
-                        public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
+                        @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
                             f.get();
 
                             return ret;
@@ -2951,11 +2928,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     @Override public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
         GridCacheContext cacheCtx,
         Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx cached,
         boolean retval,
-        CacheEntryPredicate[] filter
+        CacheEntryPredicate[] filter,
+        boolean singleRmv
     ) {
-        return removeAllAsync0(cacheCtx, keys, null, cached, retval, filter);
+        return removeAllAsync0(cacheCtx, keys, null, retval, filter, singleRmv);
     }
 
     /**
@@ -2963,8 +2940,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param keys Keys to remove.
      * @param drMap DR map.
      * @param retval Flag indicating whether a value should be returned.
-     * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1.
      * @param filter Filter.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
      * @return Future for asynchronous remove.
      */
     @SuppressWarnings("unchecked")
@@ -2972,9 +2949,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         final GridCacheContext cacheCtx,
         @Nullable final Collection<? extends K> keys,
         @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
-        @Nullable GridCacheEntryEx cached,
         final boolean retval,
-        @Nullable final CacheEntryPredicate[] filter) {
+        @Nullable final CacheEntryPredicate[] filter,
+        boolean singleRmv) {
         try {
             checkUpdatesAllowed(cacheCtx);
         }
@@ -2998,7 +2975,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             keys0 = keys;
 
         assert keys0 != null;
-        assert cached == null || keys0.size() == 1;
 
         if (log.isDebugEnabled())
             log.debug("Called removeAllAsync(...) [tx=" + this + ", keys=" + keys0 + ", implicit=" + implicit +
@@ -3043,7 +3019,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
                 cacheCtx,
                 keys0,
-                /** cached entry */null,
                 plc,
                 implicit,
                 /** lookup map */null,
@@ -3056,7 +3031,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 enlisted,
                 null,
                 drMap,
-                opCtx != null && opCtx.skipStore()
+                opCtx != null && opCtx.skipStore(),
+                singleRmv
             );
 
             if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 6f72290..0d83338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -19,17 +19,17 @@ package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
 import java.util.Map;
+import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -64,8 +64,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
     /**
      * @param cacheCtx Cache context.
      * @param keys Keys to get.
-     * @param cached Cached entry if this method is called from entry wrapper
-     *      Cached entry is passed if and only if there is only one key in collection of keys.
      * @param deserializePortable Deserialize portable flag.
      * @param skipVals Skip values flag.
      * @param keepCacheObjects Keep cache objects
@@ -75,7 +73,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
     public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         GridCacheContext cacheCtx,
         Collection<KeyCacheObject> keys,
-        @Nullable GridCacheEntryEx cached,
         boolean deserializePortable,
         boolean skipVals,
         boolean keepCacheObjects,
@@ -85,17 +82,13 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @param cacheCtx Cache context.
      * @param map Map to put.
      * @param retval Flag indicating whether a value should be returned.
-     * @param cached Cached entry, if any. Will be provided only if map has size 1.
      * @param filter Filter.
-     * @param ttl Time to live for entry. If negative, leave unchanged.
      * @return Future for put operation.
      */
     public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
         GridCacheContext cacheCtx,
         Map<? extends K, ? extends V> map,
         boolean retval,
-        @Nullable GridCacheEntryEx cached,
-        long ttl,
         CacheEntryPredicate[] filter);
 
     /**
@@ -113,16 +106,16 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @param cacheCtx Cache context.
      * @param keys Keys to remove.
      * @param retval Flag indicating whether a value should be returned.
-     * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1.
      * @param filter Filter.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
      * @return Future for asynchronous remove.
      */
     public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
         GridCacheContext cacheCtx,
         Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx cached,
         boolean retval,
-        CacheEntryPredicate[] filter);
+        CacheEntryPredicate[] filter,
+        boolean singleRmv);
 
     /**
      * @param cacheCtx Cache context.
@@ -161,17 +154,17 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @param readThrough Read through flag.
      * @param async if {@code True}, then loading will happen in a separate thread.
      * @param keys Keys.
-     * @param c Closure.
-     * @param deserializePortable Deserialize portable flag.
      * @param skipVals Skip values flag.
+     * @param needVer If {@code true} version is required for loaded values.
+     * @param c Closure to be applied for loaded values.
      * @return Future with {@code True} value if loading took place.
      */
-    public IgniteInternalFuture<Boolean> loadMissing(
+    public IgniteInternalFuture<Void> loadMissing(
         GridCacheContext cacheCtx,
         boolean readThrough,
         boolean async,
         Collection<KeyCacheObject> keys,
-        boolean deserializePortable,
         boolean skipVals,
-        IgniteBiInClosure<KeyCacheObject, Object> c);
+        boolean needVer,
+        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
 }
\ No newline at end of file


Mime
View raw message