ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/3] ignite git commit: ignite-4652 Implemented BPlusTree.invoke
Date Wed, 22 Feb 2017 06:56:02 GMT
ignite-4652 Implemented BPlusTree.invoke


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee28b9cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee28b9cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee28b9cb

Branch: refs/heads/ignite-3477
Commit: ee28b9cb89400af6fcddd89a52fcd1adbcd5d4ff
Parents: 40f015d
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Feb 22 09:55:50 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Feb 22 09:55:50 2017 +0300

----------------------------------------------------------------------
 .../benchmarks/jmh/tree/BPlusTreeBenchmark.java |    3 +-
 .../internal/pagemem/wal/record/DataRecord.java |   10 +-
 .../processors/cache/GridCacheMapEntry.java     | 1729 +++++++++++-------
 .../cache/GridCacheUpdateAtomicResult.java      |   96 +-
 .../cache/IgniteCacheOffheapManager.java        |   48 +
 .../cache/IgniteCacheOffheapManagerImpl.java    |  236 ++-
 .../processors/cache/database/CacheDataRow.java |    6 +
 .../cache/database/CacheDataRowAdapter.java     |   13 +
 .../cache/database/MetadataStorage.java         |    2 +-
 .../cache/database/tree/BPlusTree.java          |  936 ++++++++--
 .../distributed/dht/GridDhtCacheEntry.java      |    5 +
 .../apache/ignite/internal/util/IgniteTree.java |   47 +
 .../processors/database/BPlusTreeSelfTest.java  |  272 ++-
 .../database/FreeListImplSelfTest.java          |    5 +
 .../processors/query/h2/database/H2Tree.java    |    2 +-
 .../processors/query/h2/opt/GridH2Row.java      |    5 +
 .../query/h2/opt/GridH2TreeIndex.java           |    5 +
 17 files changed, 2462 insertions(+), 958 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
index 7355850..dc74363 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
@@ -190,7 +190,8 @@ public class BPlusTreeBenchmark extends JmhAbstractBenchmark {
         }
 
         /** {@inheritDoc} */
-        @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException {
+        @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx, Object ignore)
+            throws IgniteCheckedException {
             assert io.canGetRow() : io;
 
             return io.getLookupRow(this, pageAddr, idx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index 6592852..d2747f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -17,15 +17,10 @@
 
 package org.apache.ignite.internal.pagemem.wal.record;
 
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  *
@@ -68,6 +63,7 @@ public class DataRecord extends WALRecord {
         return writeEntries == null ? Collections.<DataEntry>emptyList() : writeEntries;
     }
 
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataRecord.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6dc1d04..d28ea25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -40,7 +40,9 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -83,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 
 /**
@@ -1535,11 +1540,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public GridCacheUpdateAtomicResult innerUpdate(
-        GridCacheVersion newVer,
+        final GridCacheVersion newVer,
         final UUID evtNodeId,
         final UUID affNodeId,
-        GridCacheOperation op,
-        @Nullable Object writeObj,
+        final GridCacheOperation op,
+        @Nullable final Object writeObj,
         @Nullable final Object[] invokeArgs,
         final boolean writeThrough,
         final boolean readThrough,
@@ -1555,42 +1560,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         final GridDrType drType,
         final long explicitTtl,
         final long explicitExpireTime,
-        @Nullable GridCacheVersion conflictVer,
+        @Nullable final GridCacheVersion conflictVer,
         final boolean conflictResolve,
         final boolean intercept,
         @Nullable final UUID subjId,
         final String taskName,
         @Nullable final CacheObject prevVal,
         @Nullable final Long updateCntr,
-        @Nullable GridDhtAtomicAbstractUpdateFuture fut
+        @Nullable final GridDhtAtomicAbstractUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
-        assert cctx.atomic();
-
-        boolean res = true;
-
-        CacheObject oldVal;
-        CacheObject updated;
-
-        GridCacheVersion enqueueVer = null;
+        assert cctx.atomic() && !detached();
 
-        GridCacheVersionConflictContext<?, ?> conflictCtx = null;
-
-        IgniteBiTuple<Object, Exception> invokeRes = null;
-
-        // System TTL/ET which may have special values.
-        long newSysTtl;
-        long newSysExpireTime;
-
-        // TTL/ET which will be passed to entry on update.
-        long newTtl;
-        long newExpireTime;
-
-        Object key0 = null;
-        Object updated0 = null;
-
-        Long updateCntr0 = null;
+        AtomicCacheUpdateClosure c;
 
         synchronized (this) {
+            checkObsolete();
+
             boolean internal = isInternal() || !context().userCache();
 
             Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
@@ -1598,679 +1583,270 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
                 || !F.isEmptyOrNulls(filter);
 
-            checkObsolete();
-
-            CacheDataRow oldRow = null;
-
-            // Load and remove from swap if it is new.
-            if (isStartVersion())
-                oldRow = unswap(retval, false);
-
-            // Prepare old value.
-            oldVal = val;
-
             // Possibly read value from store.
-            boolean readFromStore = false;
-
-            Object old0 = null;
-
-            if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
-                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
-                old0 = readThrough(null, key, false, subjId, taskName);
-
-                oldVal = cctx.toCacheObject(old0);
-
-                readFromStore = true;
-
-                // Detach value before index update.
-                oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
-
-                // Calculate initial TTL and expire time.
-                long initTtl;
-                long initExpireTime;
-
-                if (expiryPlc != null && oldVal != null) {
-                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
-
-                    initTtl = initTtlAndExpireTime.get1();
-                    initExpireTime = initTtlAndExpireTime.get2();
-                }
-                else {
-                    initTtl = CU.TTL_ETERNAL;
-                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
-                }
+            boolean readFromStore = readThrough && needVal && (cctx.readThrough() &&
+                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()));
+
+            c = new AtomicCacheUpdateClosure(this,
+                newVer,
+                op,
+                writeObj,
+                invokeArgs,
+                readFromStore,
+                writeThrough,
+                keepBinary,
+                expiryPlc,
+                primary,
+                verCheck,
+                filter,
+                explicitTtl,
+                explicitExpireTime,
+                conflictVer,
+                conflictResolve,
+                intercept,
+                updateCntr);
+
+            key.valueBytes(cctx.cacheObjectContext());
 
-                if (oldVal != null)
-                    storeValue(oldVal, initExpireTime, ver, oldRow);
-                // else nothing to do, real old value was null.
-
-                update(oldVal, initExpireTime, initTtl, ver, true);
+            if (isNear()) {
+                CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null;
 
-                if (deletedUnlocked() && oldVal != null && !isInternal())
-                    deletedUnlocked(false);
+                c.call(dataRow);
             }
+            else
+                cctx.offheap().invoke(key, localPartition(), c);
 
-            Object transformClo = null;
-
-            // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
-            if (conflictResolve) {
-                GridCacheVersion oldConflictVer = version().conflictVersion();
-
-                // Cache is conflict-enabled.
-                if (cctx.conflictNeedResolve()) {
-                    GridCacheVersionedEntryEx newEntry;
-
-                    GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
-                        explicitTtl,
-                        explicitExpireTime);
-
-                    // Prepare old and new entries for conflict resolution.
-                    GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
-
-                    if (op == GridCacheOperation.TRANSFORM) {
-                        transformClo = writeObj;
+            GridCacheUpdateAtomicResult updateRes = c.updateRes;
 
-                        EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+            assert updateRes != null : c;
 
-                        oldVal = this.val;
+            CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null;
+            CacheObject updateVal = null;
+            GridCacheVersion updateVer = c.newVer;
 
-                        CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(),
-                            keepBinary, this);
+            // Apply metrics.
+            if (metrics &&
+                updateRes.outcome().updateReadMetrics() &&
+                cctx.cache().configuration().isStatisticsEnabled() &&
+                needVal) {
+                // PutIfAbsent methods must not update hit/miss statistics.
+                if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
+                    cctx.cache().metrics0().onRead(oldVal != null);
+            }
 
-                        try {
-                            Object computed = entryProcessor.process(entry, invokeArgs);
+            switch (updateRes.outcome()) {
+                case VERSION_CHECK_FAILED: {
+                    if (!cctx.isNear()) {
+                        CacheObject evtVal;
 
-                            if (entry.modified())
-                                writeObj = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
-                            else
-                                writeObj = oldVal;
+                        if (op == GridCacheOperation.TRANSFORM) {
+                            EntryProcessor<Object, Object, ?> entryProcessor =
+                                (EntryProcessor<Object, Object, ?>)writeObj;
 
-                            key0 = entry.key();
+                            CacheInvokeEntry<Object, Object> entry =
+                                new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
 
-                            if (computed != null)
-                                invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
-                        }
-                        catch (Exception e) {
-                            invokeRes = new IgniteBiTuple(null, e);
+                            try {
+                                entryProcessor.process(entry, invokeArgs);
 
-                            writeObj = oldVal;
+                                evtVal = entry.modified() ?
+                                    cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+                            }
+                            catch (Exception ignore) {
+                                evtVal = prevVal;
+                            }
                         }
-                    }
-
-                    newEntry = new GridCacheLazyPlainVersionedEntry<>(
-                        cctx,
-                        key,
-                        (CacheObject)writeObj,
-                        expiration.get1(),
-                        expiration.get2(),
-                        conflictVer != null ? conflictVer : newVer,
-                        keepBinary);
-
-                    // Resolve conflict.
-                    conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
-
-                    assert conflictCtx != null;
-
-                    boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
-                    // Use old value?
-                    if (conflictCtx.isUseOld()) {
-                        GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+                        else
+                            evtVal = (CacheObject)writeObj;
 
-                        // Handle special case with atomic comparator.
-                        if (!isNew() &&                                                                       // Not initial value,
-                            verCheck &&                                                                       // and atomic version check,
-                            oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&                 // and data centers are equal,
-                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
-                            cctx.writeThrough() &&                                                            // and store is enabled,
-                            primary)                                                                          // and we are primary.
-                        {
-                            CacheObject val = this.val;
+                        long updateCntr0 = nextPartCounter();
 
-                            if (val == null) {
-                                assert deletedUnlocked();
+                        if (updateCntr != null)
+                            updateCntr0 = updateCntr;
 
-                                cctx.store().remove(null, key);
-                            }
-                            else
-                                cctx.store().put(null, key, val, ver);
-                        }
+                        onUpdateFinished(updateCntr0);
 
-                        return new GridCacheUpdateAtomicResult(false,
-                            retval ? this.val : null,
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
+                        cctx.continuousQueries().onEntryUpdated(
+                            key,
+                            evtVal,
+                            prevVal,
+                            isInternal() || !context().userCache(),
+                            partition(),
+                            primary,
                             false,
-                            updateCntr0 == null ? 0 : updateCntr0);
+                            updateCntr0,
+                            null,
+                            topVer);
                     }
-                    // Will update something.
-                    else {
-                        // Merge is a local update which override passed value bytes.
-                        if (conflictCtx.isMerge()) {
-                            writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
-
-                            conflictVer = null;
-                        }
-                        else
-                            assert conflictCtx.isUseNew();
 
-                        // Update value is known at this point, so update operation type.
-                        op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
-                    }
+                    return updateRes;
                 }
-                else
-                    // Nullify conflict version on this update, so that we will use regular version during next updates.
-                    conflictVer = null;
-            }
-
-            boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
-            // Perform version check only in case there was no explicit conflict resolution.
-            if (conflictCtx == null) {
-                if (verCheck) {
-                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
-                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
-                            if (log.isDebugEnabled())
-                                log.debug("Received entry update with same version as current (will update store) " +
-                                    "[entry=" + this + ", newVer=" + newVer + ']');
 
-                            CacheObject val = this.val;
+                case CONFLICT_USE_OLD:
+                case FILTER_FAILED:
+                case INVOKE_NO_OP:
+                case INTERCEPTOR_CANCEL:
+                    return updateRes;
+            }
 
-                            if (val == null) {
-                                assert deletedUnlocked();
+            assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL;
 
-                                cctx.store().remove(null, key);
-                            }
-                            else
-                                cctx.store().put(null, key, val, ver);
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Received entry update with smaller version than current (will ignore) " +
-                                    "[entry=" + this + ", newVer=" + newVer + ']');
-                        }
+            CacheObject evtOld = null;
 
-                        if (!cctx.isNear()) {
-                            CacheObject evtVal;
+            if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                assert writeObj instanceof EntryProcessor : writeObj;
 
-                            if (op == GridCacheOperation.TRANSFORM) {
-                                EntryProcessor<Object, Object, ?> entryProcessor =
-                                    (EntryProcessor<Object, Object, ?>)writeObj;
+                evtOld = cctx.unwrapTemporary(oldVal);
 
-                                CacheInvokeEntry<Object, Object> entry =
-                                    new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
+                Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj);
 
-                                try {
-                                    entryProcessor.process(entry, invokeArgs);
+                cctx.events().addEvent(partition(),
+                    key,
+                    evtNodeId,
+                    null,
+                    newVer,
+                    EVT_CACHE_OBJECT_READ,
+                    evtOld, evtOld != null,
+                    evtOld, evtOld != null,
+                    subjId,
+                    transformClo.getClass().getName(),
+                    taskName,
+                    keepBinary);
+            }
 
-                                    evtVal = entry.modified() ?
-                                        cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
-                                }
-                                catch (Exception ignore) {
-                                    evtVal = prevVal;
-                                }
-                            }
-                            else
-                                evtVal = (CacheObject)writeObj;
-
-                            updateCntr0 = nextPartCounter(topVer);
-
-                            if (updateCntr != null)
-                                updateCntr0 = updateCntr;
-
-                            onUpdateFinished(updateCntr0);
-
-                            cctx.continuousQueries().onEntryUpdated(
-                                key,
-                                evtVal,
-                                prevVal,
-                                isInternal() || !context().userCache(),
-                                partition(),
-                                primary,
-                                false,
-                                updateCntr0,
-                                null,
-                                topVer);
-                        }
+            if (c.op == GridCacheOperation.UPDATE) {
+                updateVal = val;
 
-                        return new GridCacheUpdateAtomicResult(false,
-                            retval ? this.val : null,
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
-                            false,
-                            0);
-                    }
-                }
-                else
-                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
-                        "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
-            }
+                assert updateVal != null : c;
 
-            // Apply metrics.
-            if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
-                // PutIfAbsent methods mustn't update hit/miss statistics
-                if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
-                    cctx.cache().metrics0().onRead(oldVal != null);
-            }
+                drReplicate(drType, updateVal, updateVer, topVer);
 
-            // Check filter inside of synchronization.
-            if (!F.isEmptyOrNulls(filter)) {
-                boolean pass = cctx.isAllLocked(this, filter);
+                recordNodeId(affNodeId, topVer);
 
-                if (!pass) {
-                    if (expiryPlc != null && !readFromStore && hasValueUnlocked() && !cctx.putIfAbsentFilter(filter))
-                        updateTtl(expiryPlc);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+                    if (evtOld == null)
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
-                    return new GridCacheUpdateAtomicResult(false,
-                        retval ? oldVal : null,
-                        null,
-                        invokeRes,
-                        CU.TTL_ETERNAL,
-                        CU.EXPIRE_TIME_ETERNAL,
+                    cctx.events().addEvent(partition(),
+                        key,
+                        evtNodeId,
                         null,
+                        newVer,
+                        EVT_CACHE_OBJECT_PUT,
+                        updateVal,
+                        true,
+                        evtOld,
+                        evtOld != null,
+                        subjId,
                         null,
-                        false,
-                        updateCntr0 == null ? 0 : updateCntr0);
+                        taskName,
+                        keepBinary);
                 }
             }
+            else {
+                assert c.op == GridCacheOperation.DELETE : c.op;
 
-            // Calculate new value in case we met transform.
-            if (op == GridCacheOperation.TRANSFORM) {
-                assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier.";
-
-                transformClo = writeObj;
-
-                EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
-
-                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), keepBinary, this);
-
-                try {
-                    Object computed = entryProcessor.process(entry, invokeArgs);
-
-                    if (entry.modified()) {
-                        updated0 = cctx.unwrapTemporary(entry.getValue());
-                        updated = cctx.toCacheObject(updated0);
-                    }
-                    else
-                        updated = oldVal;
-
-                    key0 = entry.key();
+                clearReaders();
 
-                    if (computed != null)
-                        invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
-                }
-                catch (Exception e) {
-                    invokeRes = new IgniteBiTuple(null, e);
+                drReplicate(drType, null, newVer, topVer);
 
-                    updated = oldVal;
-                }
+                recordNodeId(affNodeId, topVer);
 
-                if (!entry.modified()) {
-                    if (expiryPlc != null && !readFromStore && hasValueUnlocked())
-                        updateTtl(expiryPlc);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+                    if (evtOld == null)
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
-                    return new GridCacheUpdateAtomicResult(false,
-                        retval ? oldVal : null,
-                        null,
-                        invokeRes,
-                        CU.TTL_ETERNAL,
-                        CU.EXPIRE_TIME_ETERNAL,
-                        null,
+                    cctx.events().addEvent(partition(),
+                        key,
+                        evtNodeId,
+                        null, newVer,
+                        EVT_CACHE_OBJECT_REMOVED,
+                        null, false,
+                        evtOld, evtOld != null,
+                        subjId,
                         null,
-                        false,
-                        updateCntr0 == null ? 0 : updateCntr0);
+                        taskName,
+                        keepBinary);
                 }
             }
-            else
-                updated = (CacheObject)writeObj;
-
-            op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
-
-            assert op == GridCacheOperation.UPDATE || (op == GridCacheOperation.DELETE && updated == null);
-
-            boolean hadVal = hasValueUnlocked();
-
-            // Incorporate conflict version into new version if needed.
-            if (conflictVer != null && conflictVer != newVer)
-                newVer = new GridCacheVersionEx(newVer.topologyVersion(),
-                    newVer.globalTime(),
-                    newVer.order(),
-                    newVer.nodeOrder(),
-                    newVer.dataCenterId(),
-                    conflictVer);
 
-            if (op == GridCacheOperation.UPDATE) {
-                // Conflict context is null if there were no explicit conflict resolution.
-                if (conflictCtx == null) {
-                    // Calculate TTL and expire time for local update.
-                    if (explicitTtl != CU.TTL_NOT_CHANGED) {
-                        // If conflict existed, expire time must be explicit.
-                        assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
-
-                        newSysTtl = newTtl = explicitTtl;
-                        newSysExpireTime = explicitExpireTime;
-
-                        newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
-                            explicitExpireTime : CU.toExpireTime(explicitTtl);
-                    }
-                    else {
-                        newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
-                            hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
+            if (updateRes.success())
+                updateMetrics(c.op, metrics);
 
-                        if (newSysTtl == CU.TTL_NOT_CHANGED) {
-                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                            newTtl = ttlExtras();
-                            newExpireTime = expireTimeExtras();
-                        }
-                        else if (newSysTtl == CU.TTL_ZERO) {
-                            op = GridCacheOperation.DELETE;
+            // Continuous query filter should be performed under lock.
+            if (lsnrs != null) {
+                CacheObject evtVal = cctx.unwrapTemporary(updateVal);
+                CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
 
-                            newSysTtl = CU.TTL_NOT_CHANGED;
-                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                cctx.continuousQueries().onEntryUpdated(lsnrs,
+                    key,
+                    evtVal,
+                    evtOldVal,
+                    internal,
+                    partition(),
+                    primary,
+                    false,
+                    c.updateRes.updateCounter(),
+                    fut,
+                    topVer);
+            }
 
-                            newTtl = CU.TTL_ETERNAL;
-                            newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+            cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
 
-                            updated = null;
-                        }
-                        else {
-                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                            newTtl = newSysTtl;
-                            newExpireTime = CU.toExpireTime(newTtl);
-                        }
-                    }
+            if (intercept) {
+                if (c.op == GridCacheOperation.UPDATE) {
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+                        cctx,
+                        key,
+                        null,
+                        updateVal,
+                        null,
+                        keepBinary,
+                        c.updateRes.updateCounter()));
                 }
                 else {
-                    newSysTtl = newTtl = conflictCtx.ttl();
-                    newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+                    assert c.op == GridCacheOperation.DELETE : c.op;
+
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+                        cctx,
+                        key,
+                        null,
+                        oldVal,
+                        null,
+                        keepBinary,
+                        c.updateRes.updateCounter()));
                 }
             }
-            else {
-                assert op == GridCacheOperation.DELETE;
-
-                newSysTtl = CU.TTL_NOT_CHANGED;
-                newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
 
-                newTtl = CU.TTL_ETERNAL;
-                newExpireTime = CU.EXPIRE_TIME_ETERNAL;
-            }
+        onUpdateFinished(c.updateRes.updateCounter());
 
-            // TTL and expire time must be resolved at this point.
-            assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0;
-            assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0;
+        return c.updateRes;
+    }
 
-            IgniteBiTuple<Boolean, Object> interceptRes = null;
+    /**
+     * @param val Value.
+     * @param cacheObj Cache object.
+     * @param keepBinary Keep binary flag.
+     * @param cpy Copy flag.
+     * @return Cache object value.
+     */
+    @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
+        if (val != null)
+            return val;
 
-            // Actual update.
-            if (op == GridCacheOperation.UPDATE) {
-                if (log.isTraceEnabled()) {
-                    log.trace("innerUpdate [key=" + key +
-                        ", entry=" + System.identityHashCode(this) + ']');
-                }
+        return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
+    }
 
-                if (intercept) {
-                    updated0 = value(updated0, updated, keepBinary, false);
+    /**
+     * @param expiry Expiration policy.
+     * @return Tuple holding initial TTL and expire time with the given expiry.
+     */
+    private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
+        assert expiry != null;
 
-                    Object interceptorVal = cctx.config().getInterceptor()
-                        .onBeforePut(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary), updated0);
-
-                    if (interceptorVal == null)
-                        return new GridCacheUpdateAtomicResult(false,
-                            retval ? oldVal : null,
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
-                            false,
-                            updateCntr0 == null ? 0 : updateCntr0);
-                    else if (interceptorVal != updated0) {
-                        updated0 = cctx.unwrapTemporary(interceptorVal);
-
-                        updated = cctx.toCacheObject(updated0);
-                    }
-                }
-
-                // Try write-through.
-                if (writeThrough)
-                    // Must persist inside synchronization in non-tx mode.
-                    cctx.store().put(null, key, updated, newVer);
-
-                if (!hadVal) {
-                    boolean new0 = isNew();
-
-                    assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this + ", locNodeId=" +
-                        cctx.localNodeId() + ']';
-
-                    if (!new0 && !isInternal())
-                        deletedUnlocked(false);
-                }
-                else {
-                    assert !deletedUnlocked() : "Invalid entry [entry=" + this +
-                        ", locNodeId=" + cctx.localNodeId() + ']';
-
-                    // Do not change size.
-                }
-
-                updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
-
-                updateCntr0 = nextPartCounter(topVer);
-
-                if (updateCntr != null)
-                    updateCntr0 = updateCntr;
-
-                logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
-
-                storeValue(updated, newExpireTime, newVer, oldRow);
-
-                update(updated, newExpireTime, newTtl, newVer, true);
-
-                drReplicate(drType, updated, newVer, topVer);
-
-                recordNodeId(affNodeId, topVer);
-
-                if (evt) {
-                    CacheObject evtOld = null;
-
-                    if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(oldVal);
-
-                        transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null,
-                            newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
-                            evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
-                            keepBinary);
-                    }
-
-                    if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
-                        if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(oldVal);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null,
-                            newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
-                            evtOld != null || hadVal, subjId, null, taskName, keepBinary);
-                    }
-                }
-            }
-            else {
-                if (intercept) {
-                    interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0,
-                        oldVal, old0, keepBinary, updateCntr0));
-
-                    if (cctx.cancelRemove(interceptRes))
-                        return new GridCacheUpdateAtomicResult(false,
-                            cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
-                            false,
-                            updateCntr0 == null ? 0 : updateCntr0);
-                }
-
-                if (writeThrough)
-                    // Must persist inside synchronization in non-tx mode.
-                    cctx.store().remove(null, key);
-
-                updateCntr0 = nextPartCounter(topVer);
-
-                if (updateCntr != null)
-                    updateCntr0 = updateCntr;
-
-                logUpdate(op, null, newVer, 0, updateCntr0);
-
-                removeValue();
-
-                if (hadVal) {
-                    assert !deletedUnlocked();
-
-                    if (!isInternal())
-                        deletedUnlocked(true);
-                }
-                else {
-                    boolean new0 = isNew();
-
-                    assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + this + ", locNodeId=" +
-                        cctx.localNodeId() + ']';
-
-                    if (new0) {
-                        if (!isInternal())
-                            deletedUnlocked(true);
-                    }
-                }
-
-                enqueueVer = newVer;
-
-                // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
-                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
-
-                assert newSysTtl == CU.TTL_NOT_CHANGED;
-                assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
-
-                clearReaders();
-
-                recordNodeId(affNodeId, topVer);
-
-                drReplicate(drType, null, newVer, topVer);
-
-                if (evt) {
-                    CacheObject evtOld = null;
-
-                    if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(oldVal);
-
-                        transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null,
-                            newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
-                            evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
-                            keepBinary);
-                    }
-
-                    if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
-                        if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(oldVal);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null, newVer,
-                            EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal,
-                            subjId, null, taskName, keepBinary);
-                    }
-                }
-
-                res = hadVal;
-            }
-
-            if (res)
-                updateMetrics(op, metrics);
-
-            // Continuous query filter should be perform under lock.
-            if (lsnrs != null) {
-                CacheObject evtVal = cctx.unwrapTemporary(updated);
-                CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
-
-                cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
-                    partition(), primary, false, updateCntr0, fut, topVer);
-            }
-
-            cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
-
-            if (intercept) {
-                if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
-                        cctx,
-                        key,
-                        key0,
-                        updated,
-                        updated0,
-                        keepBinary,
-                        updateCntr0));
-                else
-                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
-                        cctx,
-                        key,
-                        key0,
-                        oldVal,
-                        old0,
-                        keepBinary,
-                        updateCntr0));
-
-                if (interceptRes != null)
-                    oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
-            }
-        }
-
-        onUpdateFinished(updateCntr0);
-
-        if (log.isDebugEnabled())
-            log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']');
-
-        return new GridCacheUpdateAtomicResult(res,
-            oldVal,
-            updated,
-            invokeRes,
-            newSysTtl,
-            newSysExpireTime,
-            enqueueVer,
-            conflictCtx,
-            true,
-            updateCntr0 == null ? 0 : updateCntr0);
-    }
-
-    /**
-     * @param val Value.
-     * @param cacheObj Cache object.
-     * @param keepBinary Keep binary flag.
-     * @param cpy Copy flag.
-     * @return Cache object value.
-     */
-    @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
-        if (val != null)
-            return val;
-
-        return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
-    }
-
-    /**
-     * @param expiry Expiration policy.
-     * @return Tuple holding initial TTL and expire time with the given expiry.
-     */
-    private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
-        assert expiry != null;
-
-        long initTtl = expiry.forCreate();
-        long initExpireTime;
+        long initTtl = expiry.forCreate();
+        long initExpireTime;
 
         if (initTtl == CU.TTL_ZERO) {
             initTtl = CU.TTL_MINIMUM;
@@ -2294,8 +1870,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param expireTime Explicit expire time.
      * @return Result.
      */
-    private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime)
-        throws GridCacheEntryRemovedException {
+    private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
+        assert !obsolete();
+
         boolean rmv = false;
 
         // 1. If TTL is not changed, then calculate it based on expiry.
@@ -2313,7 +1890,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         // 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
         if (ttl == CU.TTL_NOT_CHANGED) {
-            if (isNew())
+            if (isStartVersion())
                 ttl = CU.TTL_ETERNAL;
             else {
                 ttl = ttlExtras();
@@ -3027,6 +2604,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /**
+     * @return Update counter; this implementation always returns {@code 0}.
+     */
+    protected long nextPartCounter() {
+        return 0;
+    }
+
+    /**
      * @param topVer Topology version.
      * @return Update counter.
      */
@@ -3566,13 +3150,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         assert Thread.holdsLock(this);
         assert val != null : "null values in update for key: " + key;
 
-        cctx.offheap().update(key,
-            val,
-            ver,
-            expireTime,
-            partition(),
-            localPartition(),
-            oldRow);
+        cctx.offheap().invoke(key,  localPartition(), new UpdateClosure(this, val, ver, expireTime));
     }
 
     /**
@@ -4177,6 +3755,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         /**
          * @param key Key.
+         * @param keepBinary Keep binary flag.
          */
         private LazyValueEntry(KeyCacheObject key, boolean keepBinary) {
             this.key = key;
@@ -4223,4 +3802,854 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             return "IteratorEntry [key=" + key + ']';
         }
     }
+
+    /**
+     * Offheap invoke closure that creates the new data row for an entry from a given value, version and expire time.
+     */
+    private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
+        /** Entry whose key and local partition are used to create the row. */
+        private final GridCacheMapEntry entry;
+
+        /** New value. */
+        private final CacheObject val;
+
+        /** New version. */
+        private final GridCacheVersion ver;
+
+        /** New expire time. */
+        private final long expireTime;
+
+        /** Row created in {@link #call(CacheDataRow)}. */
+        private CacheDataRow newRow;
+
+        /** Row passed to {@link #call(CacheDataRow)}, may be {@code null}. */
+        private CacheDataRow oldRow;
+
+        /** Tree operation: {@code PUT}, or {@code NOOP} when the created row has the same link as the old row. */
+        private IgniteTree.OperationType treeOp = IgniteTree.OperationType.PUT;
+
+        /**
+         * @param entry Entry.
+         * @param val New value.
+         * @param ver New version.
+         * @param expireTime New expire time.
+         */
+        UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) {
+            this.entry = entry;
+            this.val = val;
+            this.ver = ver;
+            this.expireTime = expireTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            this.oldRow = oldRow;
+
+            if (oldRow != null)
+                oldRow.key(entry.key); // Hand the entry's key object to the row found in the tree.
+
+            newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(entry.key,
+                val,
+                ver,
+                expireTime,
+                oldRow);
+
+            treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+                IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow newRow() {
+            return newRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return treeOp;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public CacheDataRow oldRow() {
+            return oldRow;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
+        /** */
+        private final GridCacheMapEntry entry;
+
+        /** */
+        private GridCacheVersion newVer;
+
+        /** */
+        private GridCacheOperation op;
+
+        /** */
+        private Object writeObj;
+
+        /** */
+        private Object[] invokeArgs;
+
+        /** */
+        private final boolean readThrough;
+
+        /** */
+        private final boolean writeThrough;
+
+        /** */
+        private final boolean keepBinary;
+
+        /** */
+        private final IgniteCacheExpiryPolicy expiryPlc;
+
+        /** */
+        private final boolean primary;
+
+        /** */
+        private final boolean verCheck;
+
+        /** */
+        private final CacheEntryPredicate[] filter;
+
+        /** */
+        private final long explicitTtl;
+
+        /** */
+        private final long explicitExpireTime;
+
+        /** */
+        private GridCacheVersion conflictVer;
+
+        /** */
+        private final boolean conflictResolve;
+
+        /** */
+        private final boolean intercept;
+
+        /** */
+        private final Long updateCntr;
+
+        /** */
+        private GridCacheUpdateAtomicResult updateRes;
+
+        /** */
+        private IgniteTree.OperationType treeOp;
+
+        /** */
+        private CacheDataRow newRow;
+
+        /** */
+        private CacheDataRow oldRow;
+
+        AtomicCacheUpdateClosure(GridCacheMapEntry entry,
+            GridCacheVersion newVer,
+            GridCacheOperation op,
+            Object writeObj,
+            Object[] invokeArgs,
+            boolean readThrough,
+            boolean writeThrough,
+            boolean keepBinary,
+            @Nullable IgniteCacheExpiryPolicy expiryPlc,
+            boolean primary,
+            boolean verCheck,
+            @Nullable CacheEntryPredicate[] filter,
+            long explicitTtl,
+            long explicitExpireTime,
+            @Nullable GridCacheVersion conflictVer,
+            boolean conflictResolve,
+            boolean intercept,
+            @Nullable Long updateCntr) {
+            assert op == UPDATE || op == DELETE || op == TRANSFORM : op;
+
+            this.entry = entry;
+            this.newVer = newVer;
+            this.op = op;
+            this.writeObj = writeObj;
+            this.invokeArgs = invokeArgs;
+            this.readThrough = readThrough;
+            this.writeThrough = writeThrough;
+            this.keepBinary = keepBinary;
+            this.expiryPlc = expiryPlc;
+            this.primary = primary;
+            this.verCheck = verCheck;
+            this.filter = filter;
+            this.explicitTtl = explicitTtl;
+            this.explicitExpireTime = explicitExpireTime;
+            this.conflictVer = conflictVer;
+            this.conflictResolve = conflictResolve;
+            this.intercept = intercept;
+            this.updateCntr = updateCntr;
+
+            switch (op) { // TRANSFORM has no case here, so treeOp stays null after construction.
+                case UPDATE:
+                    treeOp = IgniteTree.OperationType.PUT;
+
+                    break;
+
+                case DELETE:
+                    treeOp = IgniteTree.OperationType.REMOVE;
+
+                    break;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public CacheDataRow oldRow() {
+            return oldRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow newRow() {
+            return newRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return treeOp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            assert entry.isNear() || oldRow == null || oldRow.link() != 0 : oldRow;
+
+            if (oldRow != null)
+                oldRow.key(entry.key());
+
+            this.oldRow = oldRow;
+
+            GridCacheContext cctx = entry.context();
+
+            CacheObject oldVal;
+            CacheObject storeLoadedVal = null;
+
+            if (oldRow != null) {
+                oldVal = oldRow.value();
+
+                entry.update(oldVal, oldRow.expireTime(), 0, oldRow.version(), false); // Refresh the entry from the row found in the tree.
+            }
+            else
+                oldVal = null;
+
+            if (oldVal == null && readThrough) { // No stored value: fall back to the cache store.
+                storeLoadedVal = cctx.toCacheObject(cctx.store().load(null, entry.key));
+
+                if (storeLoadedVal != null) {
+                    oldVal = cctx.kernalContext().cacheObjects().prepareForCache(storeLoadedVal, cctx);
+
+                    entry.val = oldVal;
+
+                    if (entry.deletedUnlocked())
+                        entry.deletedUnlocked(false);
+                }
+            }
+
+            CacheInvokeEntry<Object, Object> invokeEntry = null;
+            IgniteBiTuple<Object, Exception> invokeRes = null;
+
+            boolean invoke = op == TRANSFORM;
+
+            if (invoke) {
+                invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry);
+
+                invokeRes = runEntryProcessor(invokeEntry);
+
+                op = writeObj == null ? DELETE : UPDATE; // NOTE(review): presumably runEntryProcessor() updates writeObj - confirm.
+            }
+
+            CacheObject newVal = (CacheObject)writeObj;
+
+            GridCacheVersionConflictContext<?, ?> conflictCtx = null;
+
+            if (conflictResolve) {
+                conflictCtx = resolveConflict(newVal, invokeRes);
+
+                if (updateRes != null) { // Result was already set during conflict resolution: nothing more to do.
+                    assert conflictCtx != null && conflictCtx.isUseOld() : conflictCtx;
+                    assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+                    return;
+                }
+            }
+
+            if (conflictCtx == null) {
+                // Perform version check only in case there was no explicit conflict resolution.
+                versionCheck(invokeRes);
+
+                if (updateRes != null) { // Result was already set by the version check: nothing more to do.
+                    assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+                    return;
+                }
+            }
+
+            if (!F.isEmptyOrNulls(filter)) {
+                boolean pass = cctx.isAllLocked(entry, filter);
+
+                if (!pass) {
+                    initResultOnCancelUpdate(storeLoadedVal, !cctx.putIfAbsentFilter(filter));
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED,
+                        oldVal,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+            }
+
+            if (invoke) {
+                if (!invokeEntry.modified()) {
+                    initResultOnCancelUpdate(storeLoadedVal, true);
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP,
+                        oldVal,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+
+                op = writeObj == null ? DELETE : UPDATE; // Same rule as right after runEntryProcessor(), re-applied here.
+            }
+
+            // Incorporate conflict version into new version if needed.
+            if (conflictVer != null && conflictVer != newVer) {
+                newVer = new GridCacheVersionEx(newVer.topologyVersion(),
+                    newVer.globalTime(),
+                    newVer.order(),
+                    newVer.nodeOrder(),
+                    newVer.dataCenterId(),
+                    conflictVer);
+            }
+
+            if (op == UPDATE) {
+                assert writeObj != null;
+
+                update(conflictCtx, invokeRes, storeLoadedVal != null);
+            }
+            else {
+                assert op == DELETE && writeObj == null : op;
+
+                remove(conflictCtx, invokeRes, storeLoadedVal != null);
+            }
+
+            assert updateRes != null && treeOp != null;
+        }
+
+        /**
+         * @param storeLoadedVal Value loaded from store ({@code null} if nothing was loaded); if set, it is stored as a new row.
+         * @param updateExpireTime {@code True} if the expire time should be refreshed from the expiry policy's access TTL.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void initResultOnCancelUpdate(@Nullable CacheObject storeLoadedVal, boolean updateExpireTime)
+            throws IgniteCheckedException {
+            boolean needUpdate = false;
+
+            if (storeLoadedVal != null) {
+                long initTtl;
+                long initExpireTime;
+
+                if (expiryPlc != null) {
+                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+                    initTtl = initTtlAndExpireTime.get1();
+                    initExpireTime = initTtlAndExpireTime.get2();
+                }
+                else {
+                    initTtl = CU.TTL_ETERNAL;
+                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+                }
+
+                entry.update(storeLoadedVal, initExpireTime, initTtl, entry.ver, true);
+
+                needUpdate = true;
+            }
+            else if (updateExpireTime && expiryPlc != null && entry.val != null) {
+                long ttl = expiryPlc.forAccess();
+
+                if (ttl != CU.TTL_NOT_CHANGED) {
+                    long expireTime;
+
+                    if (ttl == CU.TTL_ZERO) {
+                        ttl = CU.TTL_MINIMUM;
+                        expireTime = CU.expireTimeInPast();
+                    }
+                    else
+                        expireTime = CU.toExpireTime(ttl);
+
+                    if (entry.expireTimeExtras() != expireTime) {
+                        entry.update(entry.val, expireTime, ttl, entry.ver, true);
+
+                        expiryPlc.ttlUpdated(entry.key, entry.ver, null);
+
+                        needUpdate = true;
+                    }
+                }
+            }
+
+            if (needUpdate) {
+                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                    storeLoadedVal != null ? storeLoadedVal : entry.val, // TTL-only refresh: keep the current value, not null.
+                    newVer,
+                    entry.expireTimeExtras(),
+                    oldRow);
+
+                treeOp = IgniteTree.OperationType.PUT; // The cancelled update still has to persist the new row.
+            }
+            else
+                treeOp = IgniteTree.OperationType.NOOP;
+        }
+
+        /**
+         * Applies a put-style update to the entry: computes the new TTL and expire time, lets the cache
+         * interceptor (if enabled) replace or veto the value, optionally writes through to the store,
+         * creates the new data row for non-near entries and publishes the outcome through
+         * {@code treeOp} and {@code updateRes}.
+         * <p>
+         * When the expiry policy returns {@code CU.TTL_ZERO} the operation is switched to
+         * {@code GridCacheOperation.DELETE} and delegated to
+         * {@link #remove(GridCacheVersionConflictContext, IgniteBiTuple, boolean)}.
+         *
+         * @param conflictCtx Conflict context ({@code null} if there was no explicit conflict resolution).
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+            boolean readFromStore)
+            throws IgniteCheckedException
+        {
+            GridCacheContext cctx = entry.context();
+
+            final CacheObject oldVal = entry.val;
+            CacheObject updated = (CacheObject)writeObj;
+
+            // "Sys" values are reported in the update result, the plain ones are applied to the entry itself.
+            long newSysTtl;
+            long newSysExpireTime;
+
+            long newTtl;
+            long newExpireTime;
+
+            // Conflict context is null if there were no explicit conflict resolution.
+            if (conflictCtx == null) {
+                // Calculate TTL and expire time for local update.
+                if (explicitTtl != CU.TTL_NOT_CHANGED) {
+                    // If conflict existed, expire time must be explicit.
+                    assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
+
+                    newSysTtl = newTtl = explicitTtl;
+                    newSysExpireTime = explicitExpireTime;
+
+                    newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
+                        explicitExpireTime : CU.toExpireTime(explicitTtl);
+                }
+                else {
+                    // No explicit TTL: ask the expiry policy (update vs. create depends on the current value).
+                    newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
+                        entry.val != null ? expiryPlc.forUpdate() : expiryPlc.forCreate();
+
+                    if (newSysTtl == CU.TTL_NOT_CHANGED) {
+                        // Keep whatever TTL and expire time the entry already has.
+                        newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                        newTtl = entry.ttlExtras();
+                        newExpireTime = entry.expireTimeExtras();
+                    }
+                    else if (newSysTtl == CU.TTL_ZERO) {
+                        // Zero TTL: handle the operation as a delete instead of storing a value.
+                        op = GridCacheOperation.DELETE;
+
+                        writeObj = null;
+
+                        remove(conflictCtx, invokeRes, readFromStore);
+
+                        return;
+                    }
+                    else {
+                        newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                        newTtl = newSysTtl;
+                        newExpireTime = CU.toExpireTime(newTtl);
+                    }
+                }
+            }
+            else {
+                // Conflict resolution dictates both TTL and expire time.
+                newSysTtl = newTtl = conflictCtx.ttl();
+                newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+            }
+
+            if (intercept) {
+                Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false);
+
+                CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
+                    entry.key,
+                    null,
+                    oldVal,
+                    null,
+                    keepBinary);
+
+                Object interceptorVal = cctx.config().getInterceptor().onBeforePut(interceptEntry, updated0);
+
+                if (interceptorVal == null) {
+                    // Interceptor returned null: report cancellation and leave the tree untouched.
+                    treeOp = IgniteTree.OperationType.NOOP;
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+                        oldVal,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+                else if (interceptorVal != updated0) {
+                    // Interceptor supplied a different object: use it as the value to store.
+                    updated0 = cctx.unwrapTemporary(interceptorVal);
+
+                    updated = cctx.toCacheObject(updated0);
+                }
+            }
+
+            updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
+
+            if (writeThrough)
+                // Must persist inside synchronization in non-tx mode.
+                cctx.store().put(null, entry.key, updated, newVer);
+
+            if (entry.val == null) {
+                boolean new0 = entry.isStartVersion();
+
+                assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry +
+                    ", locNodeId=" + cctx.localNodeId() + ']';
+
+                // An existing, non-internal entry without a value was marked deleted: clear that mark.
+                if (!new0 && !entry.isInternal())
+                    entry.deletedUnlocked(false);
+            }
+            else {
+                // Print the entry (not the closure) to match the sibling assertion above.
+                assert !entry.deletedUnlocked() : "Invalid entry [entry=" + entry +
+                    ", locNodeId=" + cctx.localNodeId() + ']';
+            }
+
+            long updateCntr0 = entry.nextPartCounter();
+
+            // An explicitly supplied update counter overrides the one just obtained from the entry.
+            if (updateCntr != null)
+                updateCntr0 = updateCntr;
+
+            entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
+
+            if (!entry.isNear()) {
+                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                    updated,
+                    newVer,
+                    newExpireTime,
+                    oldRow);
+
+                // Same link as the old row presumably means the row was updated in place,
+                // in which case no tree operation is requested.
+                treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+                    IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+            }
+            else
+                treeOp = IgniteTree.OperationType.PUT;
+
+            entry.update(updated, newExpireTime, newTtl, newVer, true);
+
+            updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.SUCCESS,
+                oldVal,
+                updated,
+                invokeRes,
+                newSysTtl,
+                newSysExpireTime,
+                null,
+                conflictCtx,
+                updateCntr0);
+        }
+
+        /**
+         * Applies a remove to the entry: lets the cache interceptor (if enabled) veto the removal,
+         * optionally removes the key from the store, maintains the entry's deleted flag, clears the
+         * entry value and publishes the outcome through {@code treeOp} and {@code updateRes}.
+         * <p>
+         * The outcome is {@code UpdateOutcome.SUCCESS} when the entry had a value and
+         * {@code UpdateOutcome.REMOVE_NO_VAL} otherwise. The tree operation is {@code REMOVE} only when
+         * the entry had a value that was not just read from the store.
+         *
+         * @param conflictCtx Conflict context.
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
+         * @throws IgniteCheckedException If failed.
+         */
+        @SuppressWarnings("unchecked")
+        private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+            boolean readFromStore)
+            throws IgniteCheckedException
+        {
+            GridCacheContext cctx = entry.context();
+
+            CacheObject oldVal = entry.val;
+
+            IgniteBiTuple<Boolean, Object> interceptRes = null;
+
+            if (intercept) {
+                CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
+                    entry.key,
+                    null,
+                    oldVal,
+                    null,
+                    keepBinary);
+
+                interceptRes = cctx.config().getInterceptor().onBeforeRemove(interceptEntry);
+
+                if (cctx.cancelRemove(interceptRes)) {
+                    // Interceptor vetoed the removal: report cancellation and leave the tree untouched.
+                    treeOp = IgniteTree.OperationType.NOOP;
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+                        cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+            }
+
+            if (writeThrough)
+                // Must persist inside synchronization in non-tx mode.
+                cctx.store().remove(null, entry.key);
+
+            long updateCntr0 = entry.nextPartCounter();
+
+            // An explicitly supplied update counter overrides the one just obtained from the entry.
+            if (updateCntr != null)
+                updateCntr0 = updateCntr;
+
+            if (oldVal != null) {
+                assert !entry.deletedUnlocked();
+
+                if (!entry.isInternal())
+                    entry.deletedUnlocked(true);
+            }
+            else {
+                boolean new0 = entry.isStartVersion();
+
+                // Print the entry itself (not the closure) in the failure message.
+                assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + entry +
+                    ", locNodeId=" + cctx.localNodeId() + ']';
+
+                // A brand new, non-internal entry is marked deleted as well.
+                if (new0 && !entry.isInternal())
+                    entry.deletedUnlocked(true);
+            }
+
+            // Argument order is (value, expire time, TTL, version, ...), as in the other entry.update(...) calls.
+            entry.update(null, CU.EXPIRE_TIME_ETERNAL, CU.TTL_ETERNAL, newVer, true);
+
+            // Nothing to remove from the tree if there was no value or it only came from the store.
+            treeOp = (oldVal == null || readFromStore) ? IgniteTree.OperationType.NOOP :
+                IgniteTree.OperationType.REMOVE;
+
+            // Outcome is decided by the entry's own old value, before the interceptor's value replaces it below.
+            UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL;
+
+            if (interceptRes != null)
+                oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
+
+            // The new version is passed as the result's remove version (used for deferred delete).
+            updateRes = new GridCacheUpdateAtomicResult(outcome,
+                oldVal,
+                null,
+                invokeRes,
+                CU.TTL_NOT_CHANGED,
+                CU.EXPIRE_TIME_CALCULATE,
+                newVer,
+                conflictCtx,
+                updateCntr0);
+        }
+
+        /**
+         * Runs conflict resolution for the update if the cache requires it.
+         * <p>
+         * If the resolver decides to keep the old value, {@code treeOp} is set to {@code NOOP} and
+         * {@code updateRes} is set to a {@code CONFLICT_USE_OLD} result. Otherwise {@code op} is set to
+         * {@code UPDATE} or {@code DELETE} depending on whether there is a value to write, and on merge
+         * {@code writeObj} is replaced with the merged value.
+         * If the cache does not need conflict resolution, {@code conflictVer} is cleared.
+         *
+         * @param newVal New entry value.
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @return Conflict context, or {@code null} if the cache does not need conflict resolution.
+         * @throws IgniteCheckedException If failed.
+         */
+        private GridCacheVersionConflictContext<?, ?> resolveConflict(
+            CacheObject newVal,
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+            throws IgniteCheckedException
+        {
+            GridCacheContext cctx = entry.context();
+
+            if (!cctx.conflictNeedResolve()) {
+                // Nullify conflict version on this update, so that we will use regular version during next updates.
+                conflictVer = null;
+
+                return null;
+            }
+
+            // Cache is conflict-enabled: both conflict versions are computed once and reused below.
+            GridCacheVersion oldConflictVer = entry.ver.conflictVersion();
+            GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+
+            // Prepare old and new entries for conflict resolution.
+            GridCacheVersionedEntryEx oldEntry = new GridCacheLazyPlainVersionedEntry<>(cctx,
+                entry.key,
+                entry.val,
+                entry.ttlExtras(),
+                entry.expireTimeExtras(),
+                oldConflictVer,
+                entry.isStartVersion(),
+                keepBinary);
+
+            GridTuple3<Long, Long, Boolean> expiration = entry.ttlAndExpireTime(expiryPlc,
+                explicitTtl,
+                explicitExpireTime);
+
+            GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry<>(
+                cctx,
+                entry.key,
+                newVal,
+                expiration.get1(),
+                expiration.get2(),
+                newConflictVer,
+                keepBinary);
+
+            // Resolve conflict.
+            GridCacheVersionConflictContext<?, ?> conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+
+            assert conflictCtx != null;
+
+            // Use old value?
+            if (conflictCtx.isUseOld()) {
+                boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+                // Handle special case with atomic comparator: re-apply the current state to the store when
+                // the value is not initial, atomic version check is on, data centers are equal,
+                // both versions are equal, store is enabled and we are primary.
+                if (!entry.isStartVersion() &&
+                    verCheck &&
+                    oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&
+                    ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 &&
+                    cctx.writeThrough() &&
+                    primary)
+                {
+                    CacheObject val = entry.val;
+
+                    if (val == null) {
+                        assert entry.deletedUnlocked();
+
+                        cctx.store().remove(null, entry.key);
+                    }
+                    else
+                        cctx.store().put(null, entry.key, val, entry.ver);
+                }
+
+                treeOp = IgniteTree.OperationType.NOOP;
+
+                updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.CONFLICT_USE_OLD,
+                    entry.val,
+                    null,
+                    invokeRes,
+                    CU.TTL_ETERNAL,
+                    CU.EXPIRE_TIME_ETERNAL,
+                    null,
+                    null,
+                    0);
+            }
+            // Will update something.
+            else {
+                // Merge is a local update which override passed value bytes.
+                if (conflictCtx.isMerge()) {
+                    writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
+
+                    conflictVer = null;
+                }
+                else
+                    assert conflictCtx.isUseNew();
+
+                // Update value is known at this point, so update operation type.
+                op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+            }
+
+            return conflictCtx;
+        }
+
+        /**
+         * Compares the entry's current version with the new one.
+         * <p>
+         * With version check enabled, an update whose version is not greater than the current one is
+         * rejected: {@code treeOp} is set to {@code NOOP} and {@code updateRes} to a
+         * {@code VERSION_CHECK_FAILED} result. If the versions are equal, write-through is enabled and
+         * this node is primary, the current entry state is first re-applied to the store.
+         * With version check disabled, only an assertion on the version order is made.
+         *
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @throws IgniteCheckedException If failed.
+         */
+        private void versionCheck(@Nullable IgniteBiTuple<Object, Exception> invokeRes) throws IgniteCheckedException {
+            GridCacheContext cctx = entry.context();
+
+            boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+            if (!verCheck) {
+                assert entry.isStartVersion() || ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) <= 0 :
+                    "Invalid version for inner update [isNew=" + entry.isStartVersion() + ", entry=" + this + ", newVer=" + newVer + ']';
+
+                return;
+            }
+
+            // A brand new entry has nothing to compare against.
+            if (entry.isStartVersion())
+                return;
+
+            // Compare once and reuse the result for both the "not newer" and the "equal" checks.
+            int cmp = ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime);
+
+            // New version is greater than the current one: the update may proceed.
+            if (cmp < 0)
+                return;
+
+            if (cmp == 0 && cctx.writeThrough() && primary) {
+                if (log.isDebugEnabled())
+                    log.debug("Received entry update with same version as current (will update store) " +
+                        "[entry=" + this + ", newVer=" + newVer + ']');
+
+                CacheObject val = entry.val;
+
+                if (val == null) {
+                    assert entry.deletedUnlocked();
+
+                    cctx.store().remove(null, entry.key);
+                }
+                else
+                    cctx.store().put(null, entry.key, val, entry.ver);
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Received entry update with smaller version than current (will ignore) " +
+                        "[entry=" + this + ", newVer=" + newVer + ']');
+            }
+
+            treeOp = IgniteTree.OperationType.NOOP;
+
+            updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.VERSION_CHECK_FAILED,
+                entry.val,
+                null,
+                invokeRes,
+                CU.TTL_ETERNAL,
+                CU.EXPIRE_TIME_ETERNAL,
+                null,
+                null,
+                0);
+        }
+
+        /**
+         * Runs the {@link EntryProcessor} held in {@code writeObj} and replaces {@code writeObj} with the
+         * value to write: the processor's new value if it modified the entry, otherwise the entry's
+         * current value object. If the processor throws, {@code writeObj} is reset to the current value
+         * object and the exception is returned as the second element of the tuple.
+         *
+         * @param invokeEntry Entry for {@link EntryProcessor}.
+         * @return Entry processor return value, {@code null} if the processor returned {@code null}.
+         */
+        @SuppressWarnings("unchecked")
+        private IgniteBiTuple<Object, Exception> runEntryProcessor(CacheInvokeEntry<Object, Object> invokeEntry) {
+            EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+
+            try {
+                Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+
+                // Single context reference for both the new value and the computed result.
+                GridCacheContext cctx = entry.context();
+
+                if (invokeEntry.modified())
+                    writeObj = cctx.toCacheObject(cctx.unwrapTemporary(invokeEntry.getValue()));
+                else
+                    writeObj = invokeEntry.valObj;
+
+                if (computed != null)
+                    return new IgniteBiTuple<>(cctx.unwrapTemporary(computed), null);
+
+                return null;
+            }
+            catch (Exception e) {
+                // Processor failed: keep the current value and hand the exception back to the caller.
+                writeObj = invokeEntry.valObj;
+
+                return new IgniteBiTuple<>(null, e);
+            }
+        }
+
+        /** {@inheritDoc} Built by {@code S.toString} from this closure instance. */
+        @Override public String toString() {
+            return S.toString(AtomicCacheUpdateClosure.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 2355b7c..97cb534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -29,8 +29,8 @@ import org.jetbrains.annotations.Nullable;
  * Cache entry atomic update result.
  */
 public class GridCacheUpdateAtomicResult {
-    /** Success flag.*/
-    private final boolean success;
+    /** Update operation outcome. */
+    private final UpdateOutcome outcome;
 
     /** Old value. */
     @GridToStringInclude
@@ -54,9 +54,6 @@ public class GridCacheUpdateAtomicResult {
     @GridToStringInclude
     private final GridCacheVersionConflictContext<?, ?> conflictRes;
 
-    /** Whether update should be propagated to DHT node. */
-    private final boolean sndToDht;
-
     /** */
     private final long updateCntr;
 
@@ -66,7 +63,7 @@ public class GridCacheUpdateAtomicResult {
     /**
      * Constructor.
      *
-     * @param success Success flag.
+     * @param outcome Update outcome.
      * @param oldVal Old value.
      * @param newVal New value.
      * @param res Value computed by the {@link EntryProcessor}.
@@ -74,10 +71,9 @@ public class GridCacheUpdateAtomicResult {
      * @param conflictExpireTime Explicit DR expire time (if any).
      * @param rmvVer Version for deferred delete.
      * @param conflictRes DR resolution result.
-     * @param sndToDht Whether update should be propagated to DHT node.
      * @param updateCntr Partition update counter.
      */
-    public GridCacheUpdateAtomicResult(boolean success,
+    GridCacheUpdateAtomicResult(UpdateOutcome outcome,
         @Nullable CacheObject oldVal,
         @Nullable CacheObject newVal,
         @Nullable IgniteBiTuple<Object, Exception> res,
@@ -85,9 +81,10 @@ public class GridCacheUpdateAtomicResult {
         long conflictExpireTime,
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
-        boolean sndToDht,
         long updateCntr) {
-        this.success = success;
+        assert outcome != null;
+
+        this.outcome = outcome;
         this.oldVal = oldVal;
         this.newVal = newVal;
         this.res = res;
@@ -95,11 +92,17 @@ public class GridCacheUpdateAtomicResult {
         this.conflictExpireTime = conflictExpireTime;
         this.rmvVer = rmvVer;
         this.conflictRes = conflictRes;
-        this.sndToDht = sndToDht;
         this.updateCntr = updateCntr;
     }
 
     /**
+     * @return Update operation outcome.
+     */
+    UpdateOutcome outcome() {
+        return outcome;
+    }
+
+    /**
      * @return Value computed by the {@link EntryProcessor}.
      */
     @Nullable public IgniteBiTuple<Object, Exception> computedResult() {
@@ -110,7 +113,7 @@ public class GridCacheUpdateAtomicResult {
      * @return Success flag.
      */
     public boolean success() {
-        return success;
+        return outcome.success();
     }
 
     /**
@@ -167,7 +170,74 @@ public class GridCacheUpdateAtomicResult {
      * @return Whether update should be propagated to DHT node.
      */
     public boolean sendToDht() {
-        return sndToDht;
+        return outcome.sendToDht();
+    }
+
+    /**
+     * Outcome of an atomic update attempt; each constant carries the success, send-to-DHT and read-metrics flags.
+     */
+    public enum UpdateOutcome {
+        /** Conflict resolution chose to keep the old value. */
+        CONFLICT_USE_OLD(false, false, false),
+
+        /** Update was rejected by the entry version check. */
+        VERSION_CHECK_FAILED(false, false, false),
+
+        /** Presumably: the update filter did not pass (usage is not visible here). */
+        FILTER_FAILED(false, false, true),
+
+        /** Presumably: an invoke operation that changed nothing (usage is not visible here). */
+        INVOKE_NO_OP(false, false, true),
+
+        /** Cache interceptor cancelled the put or remove. */
+        INTERCEPTOR_CANCEL(false, false, true),
+
+        /** Remove was applied to an entry that had no value; not successful, but still sent to DHT. */
+        REMOVE_NO_VAL(false, true, true),
+
+        /** Update or remove was applied. */
+        SUCCESS(true, true, true);
+
+        /** Success flag. */
+        private final boolean success;
+
+        /** Whether update should be propagated to DHT node. */
+        private final boolean sndToDht;
+
+        /** Metrics update flag. */
+        private final boolean updateReadMetrics;
+
+        /**
+         * @param success Success flag.
+         * @param sndToDht Whether update should be propagated to DHT node.
+         * @param updateReadMetrics Metrics update flag.
+         */
+        UpdateOutcome(boolean success, boolean sndToDht, boolean updateReadMetrics) {
+            this.success = success;
+            this.sndToDht = sndToDht;
+            this.updateReadMetrics = updateReadMetrics;
+        }
+
+        /**
+         * @return Success flag.
+         */
+        public boolean success() {
+            return success;
+        }
+
+        /**
+         * @return Whether update should be propagated to DHT node.
+         */
+        public boolean sendToDht() {
+            return sndToDht;
+        }
+
+        /**
+         * @return Metrics update flag.
+         */
+        public boolean updateReadMetrics() {
+            return updateReadMetrics;
+        }
     }
 
     /** {@inheritDoc} */


Mime
View raw message