ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 'Single' operations optimizations for tx cache.
Date Fri, 13 Nov 2015 11:27:52 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-single-op-tx [created] f09d09fe5


'Single' operations optimizations for tx cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f09d09fe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f09d09fe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f09d09fe

Branch: refs/heads/ignite-single-op-tx
Commit: f09d09fe5e6f29d4bd10090db6247f219f780954
Parents: 5887ae4
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Nov 13 14:10:44 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Nov 13 14:10:44 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |    2 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |    7 +-
 .../transactions/IgniteTxLocalAdapter.java      | 1225 +++++++++++-------
 .../cache/transactions/IgniteTxLocalEx.java     |   15 +
 .../cache/transactions/IgniteTxMap.java         |    3 +-
 5 files changed, 765 insertions(+), 487 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 419ccec..49ca1dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1948,7 +1948,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         Boolean stored = syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                return tx.putAllAsync(ctx, F.t(key, val), false, filter).get().success();
+                return tx.putAsync(ctx, key, val, false, filter).get().success();
             }
 
             @Override public String toString() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 6de8795..0869b90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -705,7 +705,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                 passedKeys,
                 read,
                 needRetVal,
-                skipped,
                 accessTtl,
                 null,
                 skipStore);
@@ -723,7 +722,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param passedKeys Passed keys.
      * @param read {@code True} if read.
      * @param needRetVal Return value flag.
-     * @param skipped Skipped keys.
      * @param accessTtl TTL for read operation.
      * @param filter Entry write filter.
      * @param skipStore Skip store flag.
@@ -735,13 +733,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         final Collection<KeyCacheObject> passedKeys,
         final boolean read,
         final boolean needRetVal,
-        final Set<KeyCacheObject> skipped,
         final long accessTtl,
         @Nullable final CacheEntryPredicate[] filter,
         boolean skipStore) {
         if (log.isDebugEnabled())
-            log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" +
-                skipped + ']');
+            log.debug("Before acquiring transaction lock on keys [keys=" + passedKeys + ']');
 
         if (passedKeys.isEmpty())
             return new GridFinishedFuture<>(ret);
@@ -768,7 +764,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
                     postLockWrite(cacheCtx,
                         passedKeys,
-                        skipped,
                         ret,
                         /*remove*/false,
                         /*retval*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2c7bf8a..ada2538 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1582,25 +1582,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
-     * Adds skipped key.
-     *
-     * @param skipped Skipped set (possibly {@code null}).
-     * @param key Key to add.
-     * @return Skipped set.
-     */
-    private Set<KeyCacheObject> skip(Set<KeyCacheObject> skipped, KeyCacheObject key) {
-        if (skipped == null)
-            skipped = new GridLeanSet<>();
-
-        skipped.add(key);
-
-        if (log.isDebugEnabled())
-            log.debug("Added key to skipped set: " + key);
-
-        return skipped;
-    }
-
-    /**
      * Loads all missed keys for
      * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
      *
@@ -1954,6 +1935,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+        GridCacheContext cacheCtx,
+        K key,
+        V val,
+        boolean retval,
+        CacheEntryPredicate[] filter) {
+        return putAsync0(cacheCtx, key, val, retval, filter);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> putAllDrAsync(
         GridCacheContext cacheCtx,
         Map<KeyCacheObject, GridCacheDrInfo> drMap
@@ -2009,12 +2000,85 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cacheCtx Cache context.
+     * @param cacheKey Key to enlist.
+     * @param val Value.
+     * @param expiryPlc Explicitly specified expiry policy for entry.
+     * @param entryProcessor Entry processor (for invoke operation).
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param retval Flag indicating whether a value should be returned.
+     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+     * @param filter User filters.
+     * @param ret Return value.
+     * @param skipStore Skip store flag.
+     * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+     * @return Future for entry values loading.
+     */
+    private <K, V> IgniteInternalFuture<Void> enlistWrite(
+        final GridCacheContext cacheCtx,
+        KeyCacheObject cacheKey,
+        Object val,
+        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable EntryProcessor<K, V, Object> entryProcessor,
+        @Nullable Object[] invokeArgs,
+        final boolean retval,
+        boolean lockOnly,
+        final CacheEntryPredicate[] filter,
+        final GridCacheReturn ret,
+        boolean skipStore,
+        final boolean singleRmv) {
+        try {
+            addActiveCache(cacheCtx);
+
+            final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+            final boolean needVal = singleRmv || retval || hasFilters;
+            final boolean needReadVer = needVal && (serializable() && optimistic());
+
+            boolean loadMissed = enlistWriteEntry(cacheCtx,
+                cacheKey,
+                val,
+                entryProcessor,
+                invokeArgs,
+                expiryPlc,
+                retval,
+                lockOnly,
+                filter,
+                /*drVer*/null,
+                /*drTtl*/-1L,
+                /*drExpireTime*/-1L,
+                ret,
+                /*enlisted*/null,
+                skipStore,
+                singleRmv,
+                hasFilters,
+                needVal,
+                needReadVer);
+
+            if (loadMissed) {
+                return loadMissing(cacheCtx,
+                    Collections.singleton(cacheKey),
+                    filter,
+                    ret,
+                    needReadVer,
+                    singleRmv,
+                    hasFilters,
+                    skipStore,
+                    retval);
+            }
+
+            return new GridFinishedFuture<>();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
      * Internal routine for <tt>putAll(..)</tt>
      *
      * @param cacheCtx Cache context.
      * @param keys Keys to enlist.
      * @param expiryPlc Explicitly specified expiry policy for entry.
-     * @param implicit Implicit flag.
      * @param lookup Value lookup map ({@code null} for remove).
      * @param invokeMap Map with entry processors for invoke operation.
      * @param invokeArgs Optional arguments for EntryProcessor.
@@ -2027,13 +2091,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param drRmvMap DR remove map (optional).
      * @param skipStore Skip store flag.
      * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
-     * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
+     * @return Future for missing values loading.
      */
-    private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
+    private <K, V> IgniteInternalFuture<Void> enlistWrite(
         final GridCacheContext cacheCtx,
         Collection<?> keys,
         @Nullable ExpiryPolicy expiryPlc,
-        boolean implicit,
         @Nullable Map<?, ?> lookup,
         @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
         @Nullable Object[] invokeArgs,
@@ -2056,8 +2119,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             return new GridFinishedFuture<>(e);
         }
 
-        Set<KeyCacheObject> skipped = null;
-
         boolean rmv = lookup == null && invokeMap == null;
 
         Set<KeyCacheObject> missedForLoad = null;
@@ -2115,345 +2176,441 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
                 KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
 
-                IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
-                IgniteTxEntry txEntry = entry(txKey);
-
-                // First time access.
-                if (txEntry == null) {
-                    while (true) {
-                        GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
-
-                        try {
-                            entry.unswap(false);
-
-                            // Check if lock is being explicitly acquired by the same thread.
-                            if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
-                                entry.lockedByThread(threadId, xidVer))
-                                throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
-                                    "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer +
-                                    ", threadId=" + threadId +
-                                    ", locNodeId=" + cctx.localNodeId() + ']');
-
-                            CacheObject old = null;
-                            GridCacheVersion readVer = null;
+                boolean loadMissed = enlistWriteEntry(cacheCtx,
+                    cacheKey,
+                    val,
+                    entryProcessor,
+                    invokeArgs,
+                    expiryPlc,
+                    retval,
+                    lockOnly,
+                    filter,
+                    drVer,
+                    drTtl,
+                    drExpireTime,
+                    ret,
+                    enlisted,
+                    skipStore,
+                    singleRmv,
+                    hasFilters,
+                    needVal,
+                    needReadVer);
+
+                if (loadMissed) {
+                    if (missedForLoad == null)
+                        missedForLoad = new HashSet<>();
+
+                    missedForLoad.add(cacheKey);
+                }
+            }
 
-                            if (optimistic() && !implicit()) {
-                                try {
-                                    if (needReadVer) {
-                                        T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
-                                            entry.innerGetVersioned(this,
-                                                /*swap*/false,
-                                                /*unmarshal*/retval,
-                                                /*metrics*/retval,
-                                                /*events*/retval,
-                                                CU.subjectId(this, cctx),
-                                                entryProcessor,
-                                                resolveTaskName(),
-                                                null) : null;
+            if (missedForLoad != null) {
+                return loadMissing(cacheCtx,
+                    missedForLoad,
+                    filter,
+                    ret,
+                    needReadVer,
+                    singleRmv,
+                    hasFilters,
+                    skipStore,
+                    retval);
+            }
 
-                                        if (res != null) {
-                                            old = res.get1();
-                                            readVer = res.get2();
-                                        }
-                                    }
-                                    else {
-                                        old = entry.innerGet(this,
-                                            /*swap*/false,
-                                            /*read-through*/false,
-                                            /*fail-fast*/false,
-                                            /*unmarshal*/retval,
-                                            /*metrics*/retval,
-                                            /*events*/retval,
-                                            /*temporary*/false,
-                                            CU.subjectId(this, cctx),
-                                            entryProcessor,
-                                            resolveTaskName(),
-                                            null);
-                                    }
-                                }
-                                catch (ClusterTopologyCheckedException e) {
-                                    entry.context().evicts().touch(entry, topologyVersion());
+            return new GridFinishedFuture<>();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+    }
 
-                                    throw e;
-                                }
-                            }
-                            else
-                                old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+    /**
+     * @param cacheCtx Cache context.
+     * @param keys Keys to load.
+     * @param ret Return value.
+     * @param needReadVer Read version flag.
+     * @param singleRmv {@code True} for single remove operation.
+     * @param hasFilters {@code True} if filters not empty.
+     * @param skipStore Skip store flag.
+     * @param retval Return value flag.
+     * @return Load future.
+     */
+    private IgniteInternalFuture<Void> loadMissing(
+        final GridCacheContext cacheCtx,
+        final Set<KeyCacheObject> keys,
+        final CacheEntryPredicate[] filter,
+        final GridCacheReturn ret,
+        final boolean needReadVer,
+        final boolean singleRmv,
+        final boolean hasFilters,
+        final boolean skipStore,
+        final boolean retval) {
+        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+            new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+                @Override public void apply(KeyCacheObject key,
+                    @Nullable Object val,
+                    @Nullable GridCacheVersion loadVer) {
+                    if (log.isDebugEnabled())
+                        log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
 
-                            if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
-                                skipped = skip(skipped, cacheKey);
+                    IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
 
-                                ret.set(cacheCtx, old, false);
+                    assert e != null;
 
-                                if (!readCommitted()) {
-                                    // Enlist failed filters as reads for non-read-committed mode,
-                                    // so future ops will get the same values.
-                                    txEntry = addEntry(READ,
-                                        old,
-                                        null,
-                                        null,
-                                        entry,
-                                        null,
-                                        CU.empty0(),
-                                        false,
-                                        -1L,
-                                        -1L,
-                                        null,
-                                        skipStore);
+                    if (needReadVer) {
+                        assert loadVer != null;
 
-                                    txEntry.markValid();
+                        e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+                    }
 
-                                    if (needReadVer) {
-                                        assert readVer != null;
+                    if (singleRmv) {
+                        assert !hasFilters && !retval;
+                        assert val == null || Boolean.TRUE.equals(val) : val;
 
-                                        txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
-                                    }
-                                }
+                        ret.set(cacheCtx, null, val != null);
+                    }
+                    else {
+                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
 
-                                if (readCommitted())
-                                    cacheCtx.evicts().touch(entry, topologyVersion());
+                        if (e.op() == TRANSFORM) {
+                            GridCacheVersion ver;
 
-                                break; // While.
+                            try {
+                                ver = e.cached().version();
                             }
+                            catch (GridCacheEntryRemovedException ex) {
+                                assert optimistic() : e;
 
-                            final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
-                                entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
 
-                            txEntry = addEntry(op,
-                                cacheCtx.toCacheObject(val),
-                                entryProcessor,
-                                invokeArgs,
-                                entry,
-                                expiryPlc,
-                                filter,
-                                true,
-                                drTtl,
-                                drExpireTime,
-                                drVer,
-                                skipStore);
+                                ver = null;
+                            }
 
-                            if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
-                                cacheCtx.evicts().touch(entry, topologyVersion());
+                            addInvokeResult(e, cacheVal, ret, ver);
+                        }
+                        else {
+                            boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter);
 
-                            enlisted.add(cacheKey);
+                            ret.set(cacheCtx, cacheVal, success);
+                        }
+                    }
+                }
+            };
 
-                            if (!pessimistic() && !implicit()) {
-                                txEntry.markValid();
+        return loadMissing(
+            cacheCtx,
+            /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
+            /*async*/true,
+            keys,
+            /*skipVals*/singleRmv,
+            needReadVer,
+            c);
+    }
 
-                                if (old == null) {
-                                    if (needVal) {
-                                        if (missedForLoad == null)
-                                            missedForLoad = new HashSet<>();
+    /**
+     * @param cacheCtx Cache context.
+     * @param cacheKey Key.
+     * @param val Value.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for EntryProcessor.
+     * @param expiryPlc Explicitly specified expiry policy for entry.
+     * @param retval Return value flag.
+     * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+     * @param filter Filter.
+     * @param drVer DR version.
+     * @param drTtl DR ttl.
+     * @param drExpireTime DR expire time.
+     * @param ret Return value.
+     * @param enlisted Enlisted keys collection.
+     * @param skipStore Skip store flag.
+     * @param singleRmv {@code True} for single remove operation.
+     * @param hasFilters {@code True} if filters not empty.
+     * @param needVal {@code True} if value is needed.
+     * @param needReadVer {@code True} if the entry version needs to be read.
+     * @return {@code True} if entry value should be loaded.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+        final KeyCacheObject cacheKey,
+        final @Nullable Object val,
+        final @Nullable EntryProcessor<?, ?, ?> entryProcessor,
+        final @Nullable Object[] invokeArgs,
+        final @Nullable ExpiryPolicy expiryPlc,
+        final boolean retval,
+        final boolean lockOnly,
+        final CacheEntryPredicate[] filter,
+        final GridCacheVersion drVer,
+        final long drTtl,
+        long drExpireTime,
+        final GridCacheReturn ret,
+        @Nullable final Collection<KeyCacheObject> enlisted,
+        boolean skipStore,
+        boolean singleRmv,
+        boolean hasFilters,
+        final boolean needVal,
+        boolean needReadVer
+    ) throws IgniteCheckedException {
+        boolean loadMissed = false;
 
-                                        missedForLoad.add(cacheKey);
-                                    }
-                                    else {
-                                        assert !implicit() || !transform : this;
-                                        assert txEntry.op() != TRANSFORM : txEntry;
+        final boolean rmv = val == null && entryProcessor == null;
 
-                                        if (retval)
-                                            ret.set(cacheCtx, null, true);
-                                        else
-                                            ret.success(true);
-                                    }
-                                }
-                                else {
-                                    if (needReadVer) {
-                                        assert readVer != null;
+        IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
 
-                                        txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
-                                    }
+        IgniteTxEntry txEntry = entry(txKey);
 
-                                    if (retval && !transform)
-                                        ret.set(cacheCtx, old, true);
-                                    else {
-                                        if (txEntry.op() == TRANSFORM) {
-                                            GridCacheVersion ver;
+        // First time access.
+        if (txEntry == null) {
+            while (true) {
+                GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
 
-                                            try {
-                                                ver = entry.version();
-                                            }
-                                            catch (GridCacheEntryRemovedException ex) {
-                                                assert optimistic() : txEntry;
+                try {
+                    entry.unswap(false);
+
+                    // Check if lock is being explicitly acquired by the same thread.
+                    if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+                        entry.lockedByThread(threadId, xidVer)) {
+                        throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+                            "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+                            ", entry=" + entry +
+                            ", xidVer=" + xidVer +
+                            ", threadId=" + threadId +
+                            ", locNodeId=" + cctx.localNodeId() + ']');
+                    }
 
-                                                if (log.isDebugEnabled())
-                                                    log.debug("Failed to get entry version " +
-                                                        "[err=" + ex.getMessage() + ']');
+                    CacheObject old = null;
+                    GridCacheVersion readVer = null;
 
-                                                ver = null;
-                                            }
+                    if (optimistic() && !implicit()) {
+                        try {
+                            if (needReadVer) {
+                                T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+                                    entry.innerGetVersioned(this,
+                                        /*swap*/false,
+                                        /*unmarshal*/retval,
+                                        /*metrics*/retval,
+                                        /*events*/retval,
+                                        CU.subjectId(this, cctx),
+                                        entryProcessor,
+                                        resolveTaskName(),
+                                        null) : null;
 
-                                            addInvokeResult(txEntry, old, ret, ver);
-                                        }
-                                        else
-                                            ret.success(true);
-                                    }
+                                if (res != null) {
+                                    old = res.get1();
+                                    readVer = res.get2();
                                 }
                             }
-                            // Pessimistic.
                             else {
-                                if (retval && !transform)
-                                    ret.set(cacheCtx, old, true);
-                                else
-                                    ret.success(true);
+                                old = entry.innerGet(this,
+                                    /*swap*/false,
+                                    /*read-through*/false,
+                                    /*fail-fast*/false,
+                                    /*unmarshal*/retval,
+                                    /*metrics*/retval,
+                                    /*events*/retval,
+                                    /*temporary*/false,
+                                    CU.subjectId(this, cctx),
+                                    entryProcessor,
+                                    resolveTaskName(),
+                                    null);
                             }
-
-                            break; // While.
                         }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            if (log.isDebugEnabled())
-                                log.debug("Got removed entry in transaction putAll0 method: " + entry);
+                        catch (ClusterTopologyCheckedException e) {
+                            entry.context().evicts().touch(entry, topologyVersion());
+
+                            throw e;
                         }
                     }
-                }
-                else {
-                    if (entryProcessor == null && txEntry.op() == TRANSFORM)
-                        throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
-                            "transaction after EntryProcessor is applied): " + key);
+                    else
+                        old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
 
-                    GridCacheEntryEx entry = txEntry.cached();
+                    if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+                        ret.set(cacheCtx, old, false);
 
-                    CacheObject v = txEntry.value();
-
-                    boolean del = txEntry.op() == DELETE && rmv;
+                        if (!readCommitted()) {
+                            // Enlist failed filters as reads for non-read-committed mode,
+                            // so future ops will get the same values.
+                            txEntry = addEntry(READ,
+                                old,
+                                null,
+                                null,
+                                entry,
+                                null,
+                                CU.empty0(),
+                                false,
+                                -1L,
+                                -1L,
+                                null,
+                                skipStore);
 
-                    if (!del) {
-                        if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
-                            skipped = skip(skipped, cacheKey);
+                            txEntry.markValid();
 
-                            ret.set(cacheCtx, v, false);
+                            if (needReadVer) {
+                                assert readVer != null;
 
-                            continue;
+                                txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                            }
                         }
 
-                        GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
-                            v != null ? UPDATE : CREATE;
+                        if (readCommitted())
+                            cacheCtx.evicts().touch(entry, topologyVersion());
 
-                        txEntry = addEntry(op,
-                            cacheCtx.toCacheObject(val),
-                            entryProcessor,
-                            invokeArgs,
-                            entry,
-                            expiryPlc,
-                            filter,
-                            true,
-                            drTtl,
-                            drExpireTime,
-                            drVer,
-                            skipStore);
+                        break; // While.
+                    }
 
+                    final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+                        entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+                    txEntry = addEntry(op,
+                        cacheCtx.toCacheObject(val),
+                        entryProcessor,
+                        invokeArgs,
+                        entry,
+                        expiryPlc,
+                        filter,
+                        true,
+                        drTtl,
+                        drExpireTime,
+                        drVer,
+                        skipStore);
+
+                    if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
+                        cacheCtx.evicts().touch(entry, topologyVersion());
+
+                    if (enlisted != null)
                         enlisted.add(cacheKey);
 
-                        if (txEntry.op() == TRANSFORM) {
-                            GridCacheVersion ver;
+                    if (!pessimistic() && !implicit()) {
+                        txEntry.markValid();
 
-                            try {
-                                ver = entry.version();
-                            }
-                            catch (GridCacheEntryRemovedException e) {
-                                assert optimistic() : txEntry;
+                        if (old == null) {
+                            if (needVal)
+                                loadMissed = true;
+                            else {
+                                assert !implicit() || !transform : this;
+                                assert txEntry.op() != TRANSFORM : txEntry;
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+                                if (retval)
+                                    ret.set(cacheCtx, null, true);
+                                else
+                                    ret.success(true);
+                            }
+                        }
+                        else {
+                            if (needReadVer) {
+                                assert readVer != null;
 
-                                ver = null;
+                                txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
                             }
 
-                            addInvokeResult(txEntry, txEntry.value(), ret, ver);
-                        }
-                    }
+                            if (retval && !transform)
+                                ret.set(cacheCtx, old, true);
+                            else {
+                                if (txEntry.op() == TRANSFORM) {
+                                    GridCacheVersion ver;
 
-                    if (!pessimistic()) {
-                        txEntry.markValid();
+                                    try {
+                                        ver = entry.version();
+                                    }
+                                    catch (GridCacheEntryRemovedException ex) {
+                                        assert optimistic() : txEntry;
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to get entry version " +
+                                                "[err=" + ex.getMessage() + ']');
 
+                                        ver = null;
+                                    }
+
+                                    addInvokeResult(txEntry, old, ret, ver);
+                                }
+                                else
+                                    ret.success(true);
+                            }
+                        }
+                    }
+                    // Pessimistic.
+                    else {
                         if (retval && !transform)
-                            ret.set(cacheCtx, v, true);
+                            ret.set(cacheCtx, old, true);
                         else
                             ret.success(true);
                     }
+
+                    break; // While.
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry in transaction putAll0 method: " + entry);
                 }
             }
         }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
+        else {
+            if (entryProcessor == null && txEntry.op() == TRANSFORM)
+                throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+                    "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
 
-        if (missedForLoad != null) {
-            final boolean skipVals = singleRmv;
+            GridCacheEntryEx entry = txEntry.cached();
 
-            IgniteInternalFuture<Void> fut = loadMissing(
-                cacheCtx,
-                /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
-                /*async*/true,
-                missedForLoad,
-                skipVals,
-                needReadVer,
-                new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
-                    @Override public void apply(KeyCacheObject key,
-                        @Nullable Object val,
-                        @Nullable GridCacheVersion loadVer) {
-                        if (log.isDebugEnabled())
-                            log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+            CacheObject v = txEntry.value();
 
-                        IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
+            boolean del = txEntry.op() == DELETE && rmv;
 
-                        assert e != null;
+            if (!del) {
+                if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
+                    ret.set(cacheCtx, v, false);
 
-                        if (needReadVer) {
-                            assert loadVer != null;
+                    return loadMissed;
+                }
 
-                            e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
-                        }
+                GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+                    v != null ? UPDATE : CREATE;
 
-                        if (singleRmv) {
-                            assert !hasFilters && !retval;
-                            assert val == null || Boolean.TRUE.equals(val) : val;
+                txEntry = addEntry(op,
+                    cacheCtx.toCacheObject(val),
+                    entryProcessor,
+                    invokeArgs,
+                    entry,
+                    expiryPlc,
+                    filter,
+                    true,
+                    drTtl,
+                    drExpireTime,
+                    drVer,
+                    skipStore);
 
-                            ret.set(cacheCtx, null, val != null);
-                        }
-                        else {
-                            CacheObject cacheVal = cacheCtx.toCacheObject(val);
+                if (enlisted != null)
+                    enlisted.add(cacheKey);
 
-                            if (e.op() == TRANSFORM) {
-                                GridCacheVersion ver;
+                if (txEntry.op() == TRANSFORM) {
+                    GridCacheVersion ver;
 
-                                try {
-                                    ver = e.cached().version();
-                                }
-                                catch (GridCacheEntryRemovedException ex) {
-                                    assert optimistic() : e;
+                    try {
+                        ver = entry.version();
+                    }
+                    catch (GridCacheEntryRemovedException e) {
+                        assert optimistic() : txEntry;
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
 
-                                    ver = null;
-                                }
+                        ver = null;
+                    }
 
-                                addInvokeResult(e, cacheVal, ret, ver);
-                            }
-                            else {
-                                boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter);
+                    addInvokeResult(txEntry, txEntry.value(), ret, ver);
+                }
+            }
 
-                                ret.set(cacheCtx, cacheVal, success);
-                            }
-                        }
-                    }
-                });
-
-            return new GridEmbeddedFuture<>(
-                new C2<Void, Exception, Set<KeyCacheObject>>() {
-                    @Override public Set<KeyCacheObject> apply(Void b, Exception e) {
-                        if (e != null)
-                            throw new GridClosureException(e);
+            if (!pessimistic()) {
+                txEntry.markValid();
 
-                        return Collections.emptySet();
-                    }
-                }, fut
-            );
+                if (retval && !transform)
+                    ret.set(cacheCtx, v, true);
+                else
+                    ret.success(true);
+            }
         }
 
-        return new GridFinishedFuture<>(skipped);
+        return loadMissed;
     }
 
     /**
@@ -2486,22 +2643,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      *
      * @param cacheCtx Context.
      * @param keys Keys.
-     * @param failed Collection of potentially failed keys (need to populate in this method).
      * @param ret Return value.
      * @param rmv {@code True} if remove.
      * @param retval Flag to return value or not.
      * @param read {@code True} if read.
      * @param accessTtl TTL for read operation.
      * @param filter Filter to check entries.
-     * @return Failed keys.
      * @throws IgniteCheckedException If error.
      * @param computeInvoke If {@code true} computes return value for invoke operation.
      */
     @SuppressWarnings("unchecked")
-    protected Set<KeyCacheObject> postLockWrite(
+    protected final void postLockWrite(
         GridCacheContext cacheCtx,
         Iterable<KeyCacheObject> keys,
-        Set<KeyCacheObject> failed,
         GridCacheReturn ret,
         boolean rmv,
         boolean retval,
@@ -2606,8 +2760,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                             log.debug("Filter passed in post lock for key: " + k);
                     }
                     else {
-                        failed = skip(failed, k);
-
                         // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
                         txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
                         txEntry.filters(CU.empty0());
@@ -2638,11 +2790,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 }
             }
         }
-
-        if (log.isDebugEnabled())
-            log.debug("Entries that failed after lock filter check: " + failed);
-
-        return failed;
     }
 
     /**
@@ -2696,6 +2843,140 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
+     * @param cacheCtx Cache context.
+     * @param retval Return value flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void beforePut(GridCacheContext cacheCtx, boolean retval) throws IgniteCheckedException {
+        checkUpdatesAllowed(cacheCtx);
+
+        cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT);
+
+        if (retval)
+            needReturnValue(true);
+
+        checkValid();
+
+        init();
+    }
+
+    /**
+     * Internal method for single update operation.
+     *
+     * @param cacheCtx Cache context.
+     * @param key Key.
+     * @param val Value.
+     * @param retval Return value flag.
+     * @param filter Filter.
+     * @return Operation future.
+     */
+    private <K, V> IgniteInternalFuture putAsync0(
+        final GridCacheContext cacheCtx,
+        K key,
+        V val,
+        final boolean retval,
+        @Nullable final CacheEntryPredicate[] filter
+    ) {
+        assert key != null;
+
+        try {
+            beforePut(cacheCtx, retval);
+
+            final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+            CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+            KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+            final IgniteInternalFuture<Void> loadFut = enlistWrite(
+                cacheCtx,
+                cacheKey,
+                val,
+                opCtx != null ? opCtx.expiry() : null,
+                null,
+                null,
+                retval,
+                /*lockOnly*/false,
+                filter,
+                ret,
+                opCtx != null && opCtx.skipStore(),
+                /*singleRmv*/false);
+
+            if (pessimistic()) {
+                assert loadFut == null || loadFut.isDone() : loadFut;
+
+                final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey);
+
+                if (log.isDebugEnabled())
+                    log.debug("Before acquiring transaction lock for put on key: " + enlisted);
+
+                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                    lockTimeout(),
+                    this,
+                    false,
+                    retval,
+                    isolation,
+                    isInvalidate(),
+                    -1L);
+
+                PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                    @Override public GridCacheReturn postLock(GridCacheReturn ret)
+                        throws IgniteCheckedException
+                    {
+                        if (log.isDebugEnabled())
+                            log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+                        postLockWrite(cacheCtx,
+                            enlisted,
+                            ret,
+                            /*remove*/false,
+                            retval,
+                            /*read*/false,
+                            -1L,
+                            filter,
+                            /*computeInvoke*/true);
+
+                        return ret;
+                    }
+                };
+
+                if (fut.isDone()) {
+                    try {
+                        return nonInterruptable(plc1.apply(fut.get(), null));
+                    }
+                    catch (GridClosureException e) {
+                        return new GridFinishedFuture<>(e.unwrap());
+                    }
+                    catch (IgniteCheckedException e) {
+                        try {
+                            return nonInterruptable(plc1.apply(false, e));
+                        }
+                        catch (Exception e1) {
+                            return new GridFinishedFuture<>(e1);
+                        }
+                    }
+                }
+                else {
+                    return nonInterruptable(new GridEmbeddedFuture<>(
+                        fut,
+                        plc1
+                    ));
+                }
+            }
+            else
+                return optimisticPutFuture(loadFut, ret);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture(e);
+        }
+        catch (RuntimeException e) {
+            onException();
+
+            throw e;
+        }
+    }
+
+    /**
      * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
      * maps must be non-null.
      *
@@ -2721,17 +3002,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         assert filter == null || invokeMap == null;
 
         try {
-            checkUpdatesAllowed(cacheCtx);
+            beforePut(cacheCtx, retval);
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture(e);
         }
 
-        cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT);
-
-        if (retval)
-            needReturnValue(true);
-
         // Cached entry may be passed only from entry wrapper.
         final Map<?, ?> map0;
         final Map<?, EntryProcessor<K, V, Object>> invokeMap0;
@@ -2757,15 +3033,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         assert map0 != null || invokeMap0 != null;
 
-        try {
-            checkValid();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(e);
-        }
-
-        init();
-
         final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
 
         if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
@@ -2783,15 +3050,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         try {
             Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
 
-            Collection<KeyCacheObject> enlisted = new ArrayList<>();
+            final Collection<KeyCacheObject> enlisted = new ArrayList<>();
 
             CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
 
-            final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
+            final IgniteInternalFuture<Void> loadFut = enlistWrite(
                 cacheCtx,
                 keySet,
                 opCtx != null ? opCtx.expiry() : null,
-                implicit,
                 map0,
                 invokeMap0,
                 invokeArgs,
@@ -2806,15 +3072,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 false);
 
             if (pessimistic()) {
-                // Loose all skipped.
-                final Set<KeyCacheObject> loaded = loadFut.get();
-
-                final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded));
+                assert loadFut == null || loadFut.isDone() : loadFut;
 
                 if (log.isDebugEnabled())
-                    log.debug("Before acquiring transaction lock for put on keys: " + keys);
+                    log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
 
-                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys,
+                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
                     lockTimeout(),
                     this,
                     false,
@@ -2828,11 +3091,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         throws IgniteCheckedException
                     {
                         if (log.isDebugEnabled())
-                            log.debug("Acquired transaction lock for put on keys: " + keys);
+                            log.debug("Acquired transaction lock for put on keys: " + enlisted);
 
                         postLockWrite(cacheCtx,
-                            keys,
-                            loaded,
+                            enlisted,
                             ret,
                             /*remove*/false,
                             retval,
@@ -2861,64 +3123,79 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         }
                     }
                 }
-                else
+                else {
                     return nonInterruptable(new GridEmbeddedFuture<>(
                         fut,
                         plc1
                     ));
+                }
             }
-            else {
-                if (implicit()) {
-                    // Should never load missing values for implicit transaction as values will be returned
-                    // with prepare response, if required.
-                    assert loadFut.isDone();
+            else
+                return optimisticPutFuture(loadFut, ret);
+        }
+        catch (RuntimeException e) {
+            onException();
 
-                    try {
-                        loadFut.get();
-                    }
-                    catch (IgniteCheckedException e) {
-                        return new GridFinishedFuture<>(e);
-                    }
+            throw e;
+        }
+    }
 
-                    return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException {
-                            try {
-                                txFut.get();
+    /**
+     * @param loadFut Missing keys load future.
+     * @param ret Future result.
+     * @return Future.
+     */
+    private IgniteInternalFuture optimisticPutFuture(IgniteInternalFuture<Void> loadFut, final GridCacheReturn ret) {
+        if (implicit()) {
+            // Should never load missing values for implicit transaction as values will be returned
+            // with prepare response, if required.
+            assert loadFut.isDone();
 
-                                return implicitRes;
-                            }
-                            catch (IgniteCheckedException | RuntimeException e) {
-                                rollbackAsync();
+            try {
+                loadFut.get();
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
+            }
 
-                                throw e;
-                            }
+            return nonInterruptable(commitAsync().chain(
+                new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                    @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                        throws IgniteCheckedException {
+                        try {
+                            txFut.get();
+
+                            return implicitRes;
                         }
-                    }));
-                }
-                else
-                    return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
-                            f.get();
+                        catch (IgniteCheckedException | RuntimeException e) {
+                            rollbackAsync();
 
-                            return ret;
+                            throw e;
                         }
-                    }));
-            }
+                    }
+                }
+            ));
         }
-        catch (RuntimeException e) {
-            for (IgniteTxEntry txEntry : txMap.values()) {
-                GridCacheEntryEx cached0 = txEntry.cached();
-
-                if (cached0 != null)
-                    txEntry.context().evicts().touch(cached0, topologyVersion());
-            }
+        else {
+            return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+                @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) throws IgniteCheckedException {
+                    f.get();
 
-            throw e;
+                    return ret;
+                }
+            }));
         }
-        catch (IgniteCheckedException e) {
-            setRollbackOnly();
+    }
 
-            return new GridFinishedFuture<>(e);
+    /**
+     *
+     */
+    private void onException() {
+        for (IgniteTxEntry txEntry : txMap.values()) {
+            GridCacheEntryEx cached0 = txEntry.cached();
+
+            if (cached0 != null)
+                txEntry.context().evicts().touch(cached0, topologyVersion());
         }
     }
 
@@ -2974,9 +3251,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         assert keys0 != null;
 
-        if (log.isDebugEnabled())
+        if (log.isDebugEnabled()) {
             log.debug("Called removeAllAsync(...) [tx=" + this + ", keys=" + keys0 + ", implicit=" + implicit +
                 ", retval=" + retval + "]");
+        }
 
         try {
             checkValid();
@@ -3002,140 +3280,131 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         init();
 
-        try {
-            Collection<KeyCacheObject> enlisted = new ArrayList<>();
+        final Collection<KeyCacheObject> enlisted = new ArrayList<>();
 
-            CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+        CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
 
-            ExpiryPolicy plc;
+        ExpiryPolicy plc;
 
-            if (!F.isEmpty(filter))
-                plc = opCtx != null ? opCtx.expiry() : null;
-            else
-                plc = null;
+        if (!F.isEmpty(filter))
+            plc = opCtx != null ? opCtx.expiry() : null;
+        else
+            plc = null;
 
-            final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
-                cacheCtx,
-                keys0,
-                plc,
-                implicit,
-                /** lookup map */null,
-                /** invoke map */null,
-                /** invoke arguments */null,
-                retval,
-                /** lock only */false,
-                filter,
-                ret,
-                enlisted,
-                null,
-                drMap,
-                opCtx != null && opCtx.skipStore(),
-                singleRmv
-            );
+        final IgniteInternalFuture<Void> loadFut = enlistWrite(
+            cacheCtx,
+            keys0,
+            plc,
+            /** lookup map */null,
+            /** invoke map */null,
+            /** invoke arguments */null,
+            retval,
+            /** lock only */false,
+            filter,
+            ret,
+            enlisted,
+            null,
+            drMap,
+            opCtx != null && opCtx.skipStore(),
+            singleRmv
+        );
 
-            if (log.isDebugEnabled())
-                log.debug("Remove keys: " + enlisted);
+        if (log.isDebugEnabled())
+            log.debug("Remove keys: " + enlisted);
 
-            // Acquire locks only after having added operation to the write set.
-            // Otherwise, during rollback we will not know whether locks need
-            // to be rolled back.
-            if (pessimistic()) {
-                // Loose all skipped.
-                final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
+        // Acquire locks only after having added operation to the write set.
+        // Otherwise, during rollback we will not know whether locks need
+        // to be rolled back.
+        if (pessimistic()) {
+            assert loadFut.isDone() : loadFut;
 
-                if (log.isDebugEnabled())
-                    log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);
-
-                IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys,
-                    lockTimeout(),
-                    this,
-                    false,
-                    retval,
-                    isolation,
-                    isInvalidate(),
-                    -1L);
+            if (log.isDebugEnabled())
+                log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
 
-                PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
-                    @Override protected GridCacheReturn postLock(GridCacheReturn ret)
-                        throws IgniteCheckedException
-                    {
-                        if (log.isDebugEnabled())
-                            log.debug("Acquired transaction lock for remove on keys: " + passedKeys);
+            IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+                lockTimeout(),
+                this,
+                false,
+                retval,
+                isolation,
+                isInvalidate(),
+                -1L);
+
+            PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+                @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+                    throws IgniteCheckedException
+                {
+                    if (log.isDebugEnabled())
+                        log.debug("Acquired transaction lock for remove on keys: " + enlisted);
 
-                        postLockWrite(cacheCtx,
-                            passedKeys,
-                            loadFut.get(),
-                            ret,
+                    postLockWrite(cacheCtx,
+                        enlisted,
+                        ret,
                             /*remove*/true,
-                            retval,
+                        retval,
                             /*read*/false,
-                            -1L,
-                            filter,
+                        -1L,
+                        filter,
                             /*computeInvoke*/false);
 
-                        return ret;
-                    }
-                };
+                    return ret;
+                }
+            };
 
-                if (fut.isDone()) {
+            if (fut.isDone()) {
+                try {
+                    return nonInterruptable(plc1.apply(fut.get(), null));
+                }
+                catch (GridClosureException e) {
+                    return new GridFinishedFuture<>(e.unwrap());
+                }
+                catch (IgniteCheckedException e) {
                     try {
-                        return nonInterruptable(plc1.apply(fut.get(), null));
-                    }
-                    catch (GridClosureException e) {
-                        return new GridFinishedFuture<>(e.unwrap());
+                        return nonInterruptable(plc1.apply(false, e));
                     }
-                    catch (IgniteCheckedException e) {
-                        try {
-                            return nonInterruptable(plc1.apply(false, e));
-                        }
-                        catch (Exception e1) {
-                            return new GridFinishedFuture<>(e1);
-                        }
+                    catch (Exception e1) {
+                        return new GridFinishedFuture<>(e1);
                     }
                 }
-                else
-                    return nonInterruptable(new GridEmbeddedFuture<>(
-                        fut,
-                        plc1
-                    ));
             }
-            else {
-                if (implicit()) {
-                    // Should never load missing values for implicit transaction as values will be returned
-                    // with prepare response, if required.
-                    assert loadFut.isDone();
-
-                    return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
-                            throws IgniteCheckedException {
-                            try {
-                                txFut.get();
+            else
+                return nonInterruptable(new GridEmbeddedFuture<>(
+                    fut,
+                    plc1
+                ));
+        }
+        else {
+            if (implicit()) {
+                // Should never load missing values for implicit transaction as values will be returned
+                // with prepare response, if required.
+                assert loadFut.isDone();
 
-                                return implicitRes;
-                            }
-                            catch (IgniteCheckedException | RuntimeException e) {
-                                rollbackAsync();
+                return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+                    @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                        throws IgniteCheckedException {
+                        try {
+                            txFut.get();
 
-                                throw e;
-                            }
+                            return implicitRes;
                         }
-                    }));
-                }
-                else
-                    return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
-                        @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
-                            throws IgniteCheckedException {
-                            f.get();
+                        catch (IgniteCheckedException | RuntimeException e) {
+                            rollbackAsync();
 
-                            return ret;
+                            throw e;
                         }
-                    }));
+                    }
+                }));
             }
-        }
-        catch (IgniteCheckedException e) {
-            setRollbackOnly();
+            else {
+                return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+                    @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+                        throws IgniteCheckedException {
+                        f.get();
 
-            return new GridFinishedFuture<>(e);
+                        return ret;
+                    }
+                }));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 0d83338..f9555cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -93,6 +93,21 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
 
     /**
      * @param cacheCtx Cache context.
+     * @param key Key.
+     * @param val Value.
+     * @param retval Return value flag.
+     * @param filter Filter.
+     * @return Future for put operation.
+     */
+    public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+        GridCacheContext cacheCtx,
+        K key,
+        V val,
+        boolean retval,
+        CacheEntryPredicate[] filter);
+
+    /**
+     * @param cacheCtx Cache context.
      * @param map Entry processors map.
      * @param invokeArgs Optional arguments for entry processor.
      * @return Transform operation future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f09d09fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
index 6408573..429c995 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
@@ -170,8 +170,7 @@ public class IgniteTxMap extends AbstractMap<IgniteTxKey, IgniteTxEntry> impleme
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public IgniteTxEntry get(Object key) {
+    @Nullable @Override public IgniteTxEntry get(Object key) {
         IgniteTxEntry e = txMap.get(key);
 
         return e == null ? null : filter.apply(e) ? e : null;


Mime
View raw message