ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [04/50] ignite git commit: ignite-114 Load value from store for cache 'invoke'
Date Mon, 20 Jun 2016 12:52:44 GMT
ignite-114 Load value from store for cache 'invoke'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e10ffef0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e10ffef0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e10ffef0

Branch: refs/heads/ignite-3341
Commit: e10ffef00e91f88932c6478741eb00abe8992de4
Parents: 2b64e7c
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jun 14 10:05:20 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jun 14 10:05:20 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  34 +-
 .../processors/cache/GridCacheEntryEx.java      |   9 +-
 .../processors/cache/GridCacheMapEntry.java     | 100 +++--
 .../processors/cache/GridCacheUtils.java        |   3 +
 .../distributed/dht/GridDhtCacheAdapter.java    |  10 +-
 .../distributed/dht/GridDhtLockFuture.java      |  18 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |  19 +-
 .../dht/GridPartitionedGetFuture.java           |   2 -
 .../dht/GridPartitionedSingleGetFuture.java     |   2 -
 .../dht/atomic/GridDhtAtomicCache.java          |   8 -
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   3 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   3 +-
 .../distributed/near/GridNearGetFuture.java     |   4 -
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../local/atomic/GridLocalAtomicCache.java      |   8 -
 .../cache/transactions/IgniteTxAdapter.java     |   2 -
 .../cache/transactions/IgniteTxEntry.java       |  24 +-
 .../cache/transactions/IgniteTxHandler.java     |  46 +++
 .../transactions/IgniteTxLocalAdapter.java      |  34 +-
 .../datastreamer/DataStreamerImpl.java          |   3 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +-
 .../cache/IgniteCacheAbstractTest.java          |   2 +-
 ...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
 ...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
 .../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
 .../IgniteCacheReadThroughStoreCallTest.java    | 288 ++++++++++++++
 .../IgniteCacheLoaderWriterAbstractTest.java    |  10 +
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 30 files changed, 1110 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 6ccb506..6e647c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3655,8 +3655,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         GridCacheEntryEx entry = entryEx(key, false);
 
         try {
-            entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
-                replicate ? DR_LOAD : DR_NONE);
+            entry.initialValue(cacheVal,
+                ver,
+                ttl,
+                CU.EXPIRE_TIME_CALCULATE,
+                false,
+                topVer,
+                replicate ? DR_LOAD : DR_NONE,
+                true);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to put cache value: " + entry, e);
@@ -5102,19 +5108,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         throws IgniteCheckedException, GridCacheEntryRemovedException
     {
         CacheObject val = entry.innerGet(
-            null,
-            null,
-            false,
-            false,
-            false,
-            true,
-            false,
-            false,
-            false,
-            null,
-            null,
-            null,
-            null,
+            /*ver*/null,
+            /*tx*/null,
+            /*swap*/false,
+            /*readThrough*/false,
+            /*metrics*/false,
+            /*evt*/false,
+            /*tmp*/false,
+            /*subjId*/null,
+            /*transformClo*/null,
+            /*taskName*/null,
+            /*expiryPlc*/null,
             !deserializeBinary);
 
         if (val == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 31bd887..c5fdde6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -278,9 +278,6 @@ public interface GridCacheEntryEx {
      * @param tx Ongoing transaction (possibly null).
      * @param readSwap Flag indicating whether to check swap memory.
      * @param readThrough Flag indicating whether to read through.
-     * @param failFast If {@code true}, then throw {@link GridCacheFilterFailedException} if
-     *      filter didn't pass.
-     * @param unmarshal Unmarshal flag.
      * @param updateMetrics If {@code true} then metrics should be updated.
      * @param evt Flag to signal event notification.
      * @param tmp If {@code true} can return temporary instance which is valid while entry lock is held,
@@ -299,8 +296,6 @@ public interface GridCacheEntryEx {
         @Nullable IgniteInternalTx tx,
         boolean readSwap,
         boolean readThrough,
-        boolean failFast,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean evt,
         boolean tmp,
@@ -677,6 +672,7 @@ public interface GridCacheEntryEx {
      * @param preload Flag indicating whether entry is being preloaded.
      * @param topVer Topology version.
      * @param drType DR type.
+     * @param fromStore {@code True} if value was loaded from store.
      * @return {@code True} if initial value was set.
      * @throws IgniteCheckedException In case of error.
      * @throws GridCacheEntryRemovedException If entry was removed.
@@ -687,7 +683,8 @@ public interface GridCacheEntryEx {
         long expireTime,
         boolean preload,
         AffinityTopologyVersion topVer,
-        GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException;
+        GridDrType drType,
+        boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
      * Sets new value if current version is <tt>0</tt> using swap entry data.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f442202..b5359f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -750,8 +750,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable IgniteInternalTx tx,
         boolean readSwap,
         boolean readThrough,
-        boolean failFast,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean evt,
         boolean tmp,
@@ -766,7 +764,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             readSwap,
             readThrough,
             evt,
-            unmarshal,
             updateMetrics,
             tmp,
             subjId,
@@ -796,7 +793,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             readSwap,
             false,
             evt,
-            unmarshal,
             updateMetrics,
             false,
             subjId,
@@ -815,7 +811,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean readSwap,
         boolean readThrough,
         boolean evt,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean tmp,
         UUID subjId,
@@ -1985,6 +1980,51 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (isStartVersion())
                 unswap(retval, false);
 
+            // Prepare old value.
+            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
+
+            // Possibly read value from store.
+            boolean readFromStore = false;
+
+            Object old0 = null;
+
+            if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
+                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
+                old0 = readThrough(null, key, false, subjId, taskName);
+
+                oldVal = cctx.toCacheObject(old0);
+
+                readFromStore = true;
+
+                // Detach value before index update.
+                oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
+
+                // Calculate initial TTL and expire time.
+                long initTtl;
+                long initExpireTime;
+
+                if (expiryPlc != null && oldVal != null) {
+                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+                    initTtl = initTtlAndExpireTime.get1();
+                    initExpireTime = initTtlAndExpireTime.get2();
+                }
+                else {
+                    initTtl = CU.TTL_ETERNAL;
+                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+                }
+
+                if (oldVal != null)
+                    updateIndex(oldVal, initExpireTime, ver, null);
+                else
+                    clearIndex(null);
+
+                update(oldVal, initExpireTime, initTtl, ver, true);
+
+                if (deletedUnlocked() && oldVal != null && !isInternal())
+                    deletedUnlocked(false);
+            }
+
             Object transformClo = null;
 
             // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
@@ -2192,51 +2232,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
             }
 
-            // Prepare old value and value bytes.
-            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
-
-            // Possibly read value from store.
-            boolean readFromStore = false;
-
-            Object old0 = null;
-
-            if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
-                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
-                old0 = readThrough(null, key, false, subjId, taskName);
-
-                oldVal = cctx.toCacheObject(old0);
-
-                readFromStore = true;
-
-                // Detach value before index update.
-                oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
-
-                // Calculate initial TTL and expire time.
-                long initTtl;
-                long initExpireTime;
-
-                if (expiryPlc != null && oldVal != null) {
-                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
-
-                    initTtl = initTtlAndExpireTime.get1();
-                    initExpireTime = initTtlAndExpireTime.get2();
-                }
-                else {
-                    initTtl = CU.TTL_ETERNAL;
-                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
-                }
-
-                if (oldVal != null)
-                    updateIndex(oldVal, initExpireTime, ver, null);
-                else
-                    clearIndex(null);
-
-                update(oldVal, initExpireTime, initTtl, ver, true);
-
-                if (deletedUnlocked() && oldVal != null && !isInternal())
-                    deletedUnlocked(false);
-            }
-
             // Apply metrics.
             if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
                 // PutIfAbsent methods mustn't update hit/miss statistics
@@ -3435,7 +3430,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         long expireTime,
         boolean preload,
         AffinityTopologyVersion topVer,
-        GridDrType drType)
+        GridDrType drType,
+        boolean fromStore)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         synchronized (this) {
             checkObsolete();
@@ -3488,7 +3484,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     cctx.dataStructures().onEntryUpdated(key, false, true);
                 }
 
-                if (cctx.store().isLocal()) {
+                if (!fromStore && cctx.store().isLocal()) {
                     if (val != null)
                         cctx.store().put(null, key, val, ver);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 6ce5735..87c4a3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -155,6 +155,9 @@ public class GridCacheUtils {
     /** Keep serialized flag. */
     public static final int KEEP_BINARY_FLAG_MASK = 0x2;
 
+    /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
+    public static final int OLD_VAL_ON_PRIMARY = 0x4;
+
     /** Empty predicate array. */
     private static final IgnitePredicate[] EMPTY = new IgnitePredicate[0];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5cc079d..ed1996a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -564,8 +564,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                     entry = entryEx(key, false);
 
-                    entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer,
-                        replicate ? DR_LOAD : DR_NONE);
+                    entry.initialValue(cacheVal,
+                        ver,
+                        ttl,
+                        CU.EXPIRE_TIME_CALCULATE,
+                        false,
+                        topVer,
+                        replicate ? DR_LOAD : DR_NONE,
+                        false);
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException("Failed to put cache value: " + entry, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 5659436..0a3513a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1053,7 +1053,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                             try {
                                 CacheObject val0 = cctx.toCacheObject(val);
 
-                                entry0.initialValue(val0, ver, 0, 0, false, topVer, GridDrType.DR_LOAD);
+                                entry0.initialValue(val0,
+                                    ver,
+                                    0,
+                                    0,
+                                    false,
+                                    topVer,
+                                    GridDrType.DR_LOAD,
+                                    true);
                             }
                             catch (GridCacheEntryRemovedException e) {
                                 assert false : "Should not get removed exception while holding lock on entry " +
@@ -1210,8 +1217,13 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                     try {
                         GridCacheEntryEx entry = cctx.cache().entryEx(info.key(), topVer);
 
-                        if (entry.initialValue(info.value(), info.version(), info.ttl(),
-                            info.expireTime(), true, topVer, replicate ? DR_PRELOAD : DR_NONE)) {
+                        if (entry.initialValue(info.value(),
+                            info.version(),
+                            info.ttl(),
+                            info.expireTime(),
+                            true, topVer,
+                            replicate ? DR_PRELOAD : DR_NONE,
+                            false)) {
                             if (rec && !entry.isInternal())
                                 cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
                                     (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b2da39c..0ca02c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1181,8 +1181,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                         tx,
                                         /*swap*/true,
                                         /*read-through*/false,
-                                        /*fail-fast.*/false,
-                                        /*unmarshal*/false,
                                         /*update-metrics*/true,
                                         /*event notification*/req.returnValue(i),
                                         /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e17cbbc..cedfee5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -333,9 +333,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
                     cached.unswap(retVal);
 
-                    boolean readThrough = (retVal || hasFilters) &&
-                        cacheCtx.config().isLoadPreviousValue() &&
-                        !txEntry.skipStore();
+                    boolean readThrough = !txEntry.skipStore() &&
+                        (txEntry.op() == TRANSFORM || ((retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue()));
 
                     boolean evt = retVal || txEntry.op() == TRANSFORM;
 
@@ -351,8 +350,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         tx,
                         /*swap*/true,
                         readThrough,
-                        /*fail fast*/false,
-                        /*unmarshal*/true,
                         /*metrics*/retVal,
                         /*event*/evt,
                         /*tmp*/false,
@@ -376,6 +373,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                             boolean modified = false;
 
+                            txEntry.oldValueOnPrimary(val != null);
+
                             for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
                                  CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val,
                                      txEntry.cached().version(), keepBinary, txEntry.cached());
@@ -1559,8 +1558,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         GridDrType drType = cacheCtx.isDrEnabled() ? GridDrType.DR_PRELOAD : GridDrType.DR_NONE;
 
                         try {
-                            if (entry.initialValue(info.value(), info.version(),
-                                info.ttl(), info.expireTime(), true, topVer, drType)) {
+                            if (entry.initialValue(info.value(),
+                                info.version(),
+                                info.ttl(),
+                                info.expireTime(),
+                                true,
+                                topVer,
+                                drType,
+                                false)) {
                                 if (rec && !entry.isInternal())
                                     cacheCtx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),
                                         (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7d2c5db..b3a485c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -473,8 +473,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                             null,
                             /*swap*/true,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /**update-metrics*/false,
                             /*event*/!skipVals,
                             /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index b0b9d7c..7788b63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -397,8 +397,6 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
                             null,
                             /*swap*/true,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /**update-metrics*/false,
                             /*event*/!skipVals,
                             /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 08f1f30..ad87add 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1219,8 +1219,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                     null,
                                     /*swap*/true,
                                     /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
                                     /**update-metrics*/false,
                                     /*event*/!skipVals,
                                     /*temporary*/false,
@@ -1695,8 +1693,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*read swap*/true,
                         /*read through*/true,
-                        /*fail fast*/false,
-                        /*unmarshal*/true,
                         /*metrics*/true,
                         /*event*/true,
                         /*temporary*/true,
@@ -1855,8 +1851,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                              null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
-                            /*fail fast*/false,
-                            /*unmarshal*/true,
                             /*metrics*/true,
                             /*event*/true,
                             /*temporary*/true,
@@ -1902,8 +1896,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
-                            /*fail fast*/false,
-                            /*unmarshal*/true,
                             /*metrics*/true,
                             /*event*/true,
                             /*temporary*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 35831a3..e8e8298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -507,8 +507,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                                     null,
                                     /*swap*/true,
                                     /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
                                     /**update-metrics*/false,
                                     /*event*/!skipVals,
                                     /*temporary*/false,
@@ -1057,7 +1055,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
             IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(cacheCtx,
                 keys,
-                tx.implicit(),
+                retval,
                 txRead,
                 accessTtl,
                 skipStore,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 4da1f38..7cbd77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -552,7 +552,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                             info.expireTime(),
                             true,
                             topVer,
-                            replicate ? DR_PRELOAD : DR_NONE
+                            replicate ? DR_PRELOAD : DR_NONE,
+                            false
                         )) {
                             if (rec && !entry.isInternal())
                                 cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6181d9e..6fde827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -689,7 +689,8 @@ public class GridDhtPartitionDemander {
                         entry.expireTime(),
                         true,
                         topVer,
-                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE
+                        cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
+                        false
                     )) {
                         cctx.evicts().touch(cached, topVer); // Start tracking.
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index edac92c..73b9d38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -457,8 +457,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             tx,
                             /*swap*/false,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /*metrics*/true,
                             /*events*/!skipVals,
                             /*temporary*/false,
@@ -599,8 +597,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                             tx,
                             /*swap*/true,
                             /*read-through*/false,
-                            /*fail-fast*/true,
-                            /*unmarshal*/true,
                             /*update-metrics*/false,
                             /*events*/!nearRead && !skipVals,
                             /*temporary*/false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 3ff165b..bea2ce5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1138,16 +1138,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /**
      * @param cacheCtx Cache context.
      * @param keys Keys.
-     * @param implicit Implicit flag.
+     * @param retval Return value flag.
      * @param read Read flag.
      * @param accessTtl Access ttl.
      * @param <K> Key type.
      * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
      * @return Future with respond.
      */
     public <K> IgniteInternalFuture<GridCacheReturn> lockAllAsync(GridCacheContext cacheCtx,
         final Collection<? extends K> keys,
-        boolean implicit,
+        boolean retval,
         boolean read,
         long accessTtl,
         boolean skipStore,
@@ -1181,7 +1182,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             this,
             isInvalidate(),
             read,
-            /*retval*/false,
+            retval,
             isolation,
             accessTtl,
             CU.empty0(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index d2d8d62..7caea23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -628,8 +628,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                                 null,
                                 /*swap*/swapOrOffheap,
                                 /*read-through*/false,
-                                /*fail-fast*/false,
-                                /*unmarshal*/true,
                                 /**update-metrics*/true,
                                 /**event*/!skipVals,
                                 /**temporary*/false,
@@ -1187,8 +1185,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                             null,
                             /*swap*/true,
                             /*read-through*/true,
-                            /*fail-fast*/false,
-                            /*unmarshal*/true,
                             /**update-metrics*/true,
                             /**event*/true,
                             /**temporary*/true,
@@ -1311,8 +1307,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                                 null,
                                 /*swap*/true,
                                 /*read-through*/ctx.loadPreviousValue(),
-                                /*fail-fast*/false,
-                                /*unmarshal*/true,
                                 /**update-metrics*/true,
                                 /**event*/true,
                                 /**temporary*/true,
@@ -1348,8 +1342,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
                                 null,
                                 /*swap*/true,
                                 /*read-through*/ctx.loadPreviousValue(),
-                                /*fail-fast*/false,
-                                /*unmarshal*/true,
                                 /**update-metrics*/true,
                                 /**event*/true,
                                 /**temporary*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 66d4df0..3bb3a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1492,8 +1492,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
                     this,
                     /*swap*/false,
                     /*read through*/false,
-                    /*fail fast*/true,
-                    /*unmarshal*/true,
                     /*metrics*/metrics,
                     /*event*/recordEvt,
                     /*temporary*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 90aa8d2..3258ce9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -57,6 +58,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.OLD_VAL_ON_PRIMARY;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
 
 /**
@@ -185,9 +187,11 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     private byte[] expiryPlcBytes;
 
     /**
-     * Additional flags.
-     * GridCacheUtils.SKIP_STORE_FLAG_MASK - for skipStore flag value.
-     * GridCacheUtils.KEEP_BINARY_FLAG_MASK - for withKeepBinary flag.
+     * Additional flags:
+     * <ul>
+     * <li>{@link GridCacheUtils#SKIP_STORE_FLAG_MASK} - for skipStore flag value.</li>
+     * <li>{@link GridCacheUtils#KEEP_BINARY_FLAG_MASK} - for withKeepBinary flag.</li>
+     * </ul>
      */
     private byte flags;
 
@@ -483,6 +487,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
+     * @param oldValOnPrimary {@code True} If old value for 'invoke' operation was non null on primary node.
+     */
+    public void oldValueOnPrimary(boolean oldValOnPrimary) {
+        setFlag(oldValOnPrimary, OLD_VAL_ON_PRIMARY);
+    }
+
+    /**
+     * @return {@code True} If old value for 'invoke' operation was non null on primary node.
+     */
+    public boolean oldValueOnPrimary() {
+        return isFlag(OLD_VAL_ON_PRIMARY);
+    }
+
+    /**
      * Sets keep binary flag value.
      *
      * @param keepBinary Keep binary flag value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b25baf8..5b09760 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -25,9 +25,11 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -73,6 +75,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.FINISH_NEAR_ONE_PHASE_SINCE;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
@@ -1165,6 +1168,49 @@ public class IgniteTxHandler {
                             if (info != null && !info.isNew() && !info.isDeleted())
                                 res.addPreloadEntry(info);
                         }
+
+                        if (cacheCtx.readThroughConfigured() &&
+                            !entry.skipStore() &&
+                            entry.op() == TRANSFORM &&
+                            entry.oldValueOnPrimary() &&
+                            !entry.hasValue()) {
+                            while (true) {
+                                try {
+                                    GridCacheEntryEx cached = entry.cached();
+
+                                    if (cached == null)
+                                        cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+
+                                    CacheObject val = cached.innerGet(
+                                        /*ver*/null,
+                                        tx,
+                                        /*readSwap*/true,
+                                        /*readThrough*/false,
+                                        /*updateMetrics*/false,
+                                        /*evt*/false,
+                                        /*tmp*/false,
+                                        tx.subjectId(),
+                                        /*transformClo*/null,
+                                        tx.resolveTaskName(),
+                                        /*expiryPlc*/null,
+                                        /*keepBinary*/true);
+
+                                    if (val == null)
+                                        val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
+
+                                    if (val != null)
+                                        entry.readValue(val);
+
+                                    break;
+                                }
+                                catch (GridCacheEntryRemovedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Got entry removed exception, will retry: " + entry.txKey());
+
+                                    entry.cached(null);
+                                }
+                            }
+                        }
                     }
                     catch (GridDhtInvalidPartitionException e) {
                         tx.addInvalidPartition(cacheCtx, e.partition());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8eea8dd..28ecda5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1255,8 +1255,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     this,
                                     /*swap*/true,
                                     /*read-through*/false,
-                                    /*fail fast*/true,
-                                    /*unmarshal*/true,
                                     /*metrics*/true,
                                     /*event*/!skipVals,
                                     /*temporary*/false,
@@ -1340,9 +1338,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     null,
                                     this,
                                     /*swap*/true,
-                                    /*no read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
+                                    /*read-through*/false,
                                     /*metrics*/true,
                                     /*event*/true,
                                     /*temporary*/false,
@@ -1687,8 +1683,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                             IgniteTxLocalAdapter.this,
                                             cacheCtx.isSwapOrOffheapEnabled(),
                                             /*read-through*/false,
-                                            /*fail-fast*/true,
-                                            /*unmarshal*/true,
                                             /*metrics*/true,
                                             /*events*/!skipVals,
                                             /*temporary*/false,
@@ -1992,7 +1986,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     needReadVer,
                     singleRmv,
                     hasFilters,
-                    skipStore,
+                    /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                     retval,
                     keepBinary);
             }
@@ -2153,7 +2147,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     needReadVer,
                     singleRmv,
                     hasFilters,
-                    skipStore,
+                    /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
                     retval,
                     keepBinary);
             }
@@ -2173,7 +2167,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param needReadVer Read version flag.
      * @param singleRmv {@code True} for single remove operation.
      * @param hasFilters {@code True} if filters not empty.
-     * @param skipStore Skip store flag.
+     * @param readThrough Read through flag.
      * @param retval Return value flag.
      * @return Load future.
      */
@@ -2185,7 +2179,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         final boolean needReadVer,
         final boolean singleRmv,
         final boolean hasFilters,
-        final boolean skipStore,
+        final boolean readThrough,
         final boolean retval,
         final boolean keepBinary) {
         GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
@@ -2218,6 +2212,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                         if (e.op() == TRANSFORM) {
                             GridCacheVersion ver;
 
+                            e.readValue(cacheVal);
+
                             try {
                                 ver = e.cached().version();
                             }
@@ -2243,7 +2239,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
         return loadMissing(
             cacheCtx,
-            /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
+            readThrough,
             /*async*/true,
             keys,
             /*skipVals*/singleRmv,
@@ -2354,8 +2350,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                                     this,
                                     /*swap*/false,
                                     /*read-through*/false,
-                                    /*fail-fast*/false,
-                                    /*unmarshal*/retval,
                                     /*metrics*/retval,
                                     /*events*/retval,
                                     /*temporary*/false,
@@ -2652,16 +2646,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                     if (retval || invoke) {
                         if (!cacheCtx.isNear()) {
                             if (!hasPrevVal) {
-                                boolean readThrough =
-                                    (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
+                                // For non-local cache should read from store after lock on primary.
+                                boolean readThrough = cacheCtx.isLocal() &&
+                                    (invoke || cacheCtx.loadPreviousValue()) &&
+                                    !txEntry.skipStore();
 
                                 v = cached.innerGet(
                                     null,
                                     this,
                                     /*swap*/true,
                                     readThrough,
-                                    /*failFast*/false,
-                                    /*unmarshal*/true,
                                     /*metrics*/!invoke,
                                     /*event*/!invoke && !dht(),
                                     /*temporary*/false,
@@ -2889,7 +2883,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
                     timeout,
                     this,
-                    false,
+                    /*read*/entryProcessor != null, // Needed to force load from store.
                     retval,
                     isolation,
                     isInvalidate(),
@@ -3069,7 +3063,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
                     timeout,
                     this,
-                    false,
+                    /*read*/invokeMap != null, // Needed to force load from store.
                     retval,
                     isolation,
                     isInvalidate(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 4599060..9dc6a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1621,7 +1621,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                         expiryTime,
                         false,
                         topVer,
-                        GridDrType.DR_LOAD);
+                        GridDrType.DR_LOAD,
+                        false);
 
                     cctx.evicts().touch(entry, topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 09cf7c8..ebca870 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -432,8 +432,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         @Nullable IgniteInternalTx tx,
         boolean readSwap,
         boolean readThrough,
-        boolean failFast,
-        boolean unmarshal,
         boolean updateMetrics,
         boolean evt,
         boolean tmp,
@@ -668,7 +666,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         long expireTime,
         boolean preload,
         AffinityTopologyVersion topVer,
-        GridDrType drType
+        GridDrType drType,
+        boolean fromStore
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
index ce60232..45b6e9d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java
@@ -54,7 +54,7 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest {
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    protected static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
+    public static final Map<Object, Object> storeMap = new ConcurrentHashMap8<>();
 
     /**
      * @return Grids count to start.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
new file mode 100644
index 0000000..294ebea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughAbstractTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.configuration.Factory;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public abstract class IgniteCacheInvokeReadThroughAbstractTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static volatile boolean failed;
+
+    /** */
+    protected boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        failed = false;
+
+        startNodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        IgniteCacheAbstractTest.storeMap.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @return Store factory.
+     */
+    protected Factory<CacheStore> cacheStoreFactory() {
+        return new IgniteCacheAbstractTest.TestStoreFactory();
+    }
+
+    /**
+     * @param data Data.
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    protected void putDataInStore(Map<Object, Object> data, String cacheName) throws Exception {
+        IgniteCacheAbstractTest.storeMap.putAll(data);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    protected abstract void startNodes() throws Exception;
+
+    /**
+     * @param ccfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    protected void invokeReadThrough(CacheConfiguration ccfg) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        ignite0.createCache(ccfg);
+
+        try {
+            int key = 0;
+
+            for (Ignite node : G.allGrids()) {
+                if (node.configuration().isClientMode() && ccfg.getNearConfiguration() != null)
+                    node.createNearCache(ccfg.getName(), ccfg.getNearConfiguration());
+            }
+
+            for (Ignite node : G.allGrids()) {
+                log.info("Test for node: " + node.name());
+
+                IgniteCache<Object, Object> cache = node.cache(ccfg.getName());
+
+                for (int i = 0; i < 50; i++)
+                    checkReadThrough(cache, key++, null, null);
+
+                Set<Object> keys = new HashSet<>();
+
+                for (int i = 0; i < 5; i++)
+                    keys.add(key++);
+
+                checkReadThroughInvokeAll(cache, keys, null, null);
+
+                keys = new HashSet<>();
+
+                for (int i = 0; i < 100; i++)
+                    keys.add(key++);
+
+                checkReadThroughInvokeAll(cache, keys, null, null);
+
+                if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+                    for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                        for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                            log.info("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+                            for (int i = 0; i < 50; i++)
+                                checkReadThrough(cache, key++, concurrency, isolation);
+
+                            keys = new HashSet<>();
+
+                            for (int i = 0; i < 5; i++)
+                                keys.add(key++);
+
+                            checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+
+                            keys = new HashSet<>();
+
+                            for (int i = 0; i < 100; i++)
+                                keys.add(key++);
+
+                            checkReadThroughInvokeAll(cache, keys, concurrency, isolation);
+                        }
+                    }
+
+                    for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+                        for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                            log.info("Test tx2 [concurrency=" + concurrency + ", isolation=" + isolation + ']');
+
+                            for (int i = 0; i < 50; i++)
+                                checkReadThroughGetAndInvoke(cache, key++, concurrency, isolation);
+                        }
+                    }
+                }
+            }
+
+            ignite0.cache(ccfg.getName()).removeAll();
+        }
+        finally {
+            ignite0.destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkReadThrough(IgniteCache<Object, Object> cache,
+        Object key,
+        @Nullable TransactionConcurrency concurrency,
+        @Nullable TransactionIsolation isolation) throws Exception {
+        putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+        Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+            : null;
+
+        try {
+            Object ret = cache.invoke(key, new TestEntryProcessor());
+
+            assertEquals(key, ret);
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        checkValue(cache.getName(), key, (Integer)key + 1);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkReadThroughGetAndInvoke(IgniteCache<Object, Object> cache,
+        Object key,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation) throws Exception {
+        putDataInStore(Collections.singletonMap(key, key), cache.getName());
+
+        try (Transaction tx = cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)) {
+            cache.get(key);
+
+            Object ret = cache.invoke(key, new TestEntryProcessor());
+
+            assertEquals(key, ret);
+
+            tx.commit();
+        }
+
+        checkValue(cache.getName(), key, (Integer)key + 1);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param keys Key.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void checkReadThroughInvokeAll(IgniteCache<Object, Object> cache,
+        Set<Object> keys,
+        @Nullable TransactionConcurrency concurrency,
+        @Nullable TransactionIsolation isolation) throws Exception {
+        Map<Object, Object> data = U.newHashMap(keys.size());
+
+        for (Object key : keys)
+            data.put(key, key);
+
+        putDataInStore(data, cache.getName());
+
+        Transaction tx = isolation != null ? cache.unwrap(Ignite.class).transactions().txStart(concurrency, isolation)
+            : null;
+
+        try {
+            Map<Object, EntryProcessorResult<Object>> ret = cache.invokeAll(keys, new TestEntryProcessor());
+
+            assertEquals(ret.size(), keys.size());
+
+            for (Object key : keys) {
+                EntryProcessorResult<Object> res = ret.get(key);
+
+                assertNotNull(res);
+                assertEquals(key, res.get());
+            }
+
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
+
+        for (Object key : keys)
+            checkValue(cache.getName(), key, (Integer)key + 1);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param val Expected value.
+     */
+    private void checkValue(String cacheName, Object key, Object val) {
+        for (Ignite ignite : G.allGrids()) {
+            assertEquals("Unexpected value for node: " + ignite.name(),
+                val,
+                ignite.cache(cacheName).get(key));
+        }
+
+        assertFalse(failed);
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param memoryMode Memory mode.
+     * @param backups Number of backups.
+     * @param nearCache Near cache flag.
+     * @return Cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    protected CacheConfiguration cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        int backups,
+        boolean nearCache) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setReadThrough(true);
+        ccfg.setWriteThrough(true);
+        ccfg.setCacheStoreFactory(cacheStoreFactory());
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setMemoryMode(memoryMode);
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration());
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements EntryProcessor<Object, Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> entry, Object... args) {
+            if (!entry.exists()) {
+                failed = true;
+
+                fail();
+            }
+
+            Integer val = (Integer)entry.getValue();
+
+            if (!val.equals(entry.getKey())) {
+                failed = true;
+
+                assertEquals(val, entry.getKey());
+            }
+
+            entry.setValue(val + 1);
+
+            return val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
new file mode 100644
index 0000000..b451abf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughSingleNodeTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class IgniteCacheInvokeReadThroughSingleNodeTest extends IgniteCacheInvokeReadThroughAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void startNodes() throws Exception {
+        startGrid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicLocal() throws Exception {
+        invokeReadThrough(cacheConfiguration(LOCAL, ATOMIC, ONHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxLocal() throws Exception {
+        invokeReadThrough(cacheConfiguration(LOCAL, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e10ffef0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 2464e81..9578227 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -17,111 +17,163 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import javax.cache.configuration.Factory;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.store.CacheStore;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
 
 /**
  *
  */
-public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
+public class IgniteCacheInvokeReadThroughTest extends IgniteCacheInvokeReadThroughAbstractTest {
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-114");
-    }
+    @Override protected void startNodes() throws Exception {
+        startGridsMultiThreaded(4);
 
-    /** */
-    private static volatile boolean failed;
+        client = true;
 
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return 3;
+        startGrid(4);
     }
 
-    /** {@inheritDoc} */
-    @Override protected CacheMode cacheMode() {
-        return PARTITIONED;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic0() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 0, false));
     }
 
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return TRANSACTIONAL;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic1() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, false));
     }
 
-    /** {@inheritDoc} */
-    @Override protected NearCacheConfiguration nearConfiguration() {
-        return null;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic2() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 2, false));
     }
 
-    /** {@inheritDoc} */
-    @Override protected Factory<CacheStore> cacheStoreFactory() {
-        return new TestStoreFactory();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, ONHEAP_TIERED, 1, true));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        failed = false;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, ONHEAP_TIERED, 0, false));
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testInvokeReadThrough() throws Exception {
-        IgniteCache<Integer, Integer> cache = jcache(0);
-
-        checkReadThrough(cache, primaryKey(cache));
+    public void testInvokeReadThroughAtomic0_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 0, false));
+    }
 
-        checkReadThrough(cache, backupKey(cache));
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic1_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, false));
+    }
 
-        checkReadThrough(cache, nearKey(cache));
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomic2_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 2, false));
     }
 
     /**
-     * @param cache Cache.
-     * @param key Key.
+     * @throws Exception If failed.
      */
-    private void checkReadThrough(IgniteCache<Integer, Integer> cache, Integer key) {
-        log.info("Test [key=" + key + ']');
+    public void testInvokeReadThroughAtomicNearCache_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, ATOMIC, OFFHEAP_TIERED, 1, true));
+    }
 
-        storeMap.put(key, key);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughAtomicReplicated_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, ATOMIC, OFFHEAP_TIERED, 0, false));
+    }
 
-        Object ret = cache.invoke(key, new EntryProcessor<Integer, Integer, Object>() {
-            @Override public Object process(MutableEntry<Integer, Integer> entry, Object... args) {
-                if (!entry.exists()) {
-                    failed = true;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx0() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
 
-                    fail();
-                }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx1() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, false));
+    }
 
-                Integer val = entry.getValue();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx2() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 2, false));
+    }
 
-                if (!val.equals(entry.getKey())) {
-                    failed = true;
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxNearCache() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, ONHEAP_TIERED, 1, true));
+    }
 
-                    assertEquals(val, entry.getKey());
-                }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxReplicated() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, ONHEAP_TIERED, 0, false));
+    }
 
-                entry.setValue(val + 1);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx0_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
+    }
 
-                return val;
-            }
-        });
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx1_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, false));
+    }
 
-        assertEquals(key, ret);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTx2_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 2, false));
+    }
 
-        for (int i = 0; i < gridCount(); i++)
-            assertEquals("Unexpected value for node: " + i, key + 1, jcache(i).get(key));
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxNearCache_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(PARTITIONED, TRANSACTIONAL, OFFHEAP_TIERED, 1, true));
+    }
 
-        assertFalse(failed);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvokeReadThroughTxReplicated_Offheap() throws Exception {
+        invokeReadThrough(cacheConfiguration(REPLICATED, TRANSACTIONAL, OFFHEAP_TIERED, 0, false));
     }
 }


Mime
View raw message