ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [37/46] incubator-ignite git commit: # IGNITE-283: Fixed conflict resolution.
Date Wed, 25 Feb 2015 00:37:00 GMT
# IGNITE-283: Fixed conflict resolution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/45ce8147
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/45ce8147
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/45ce8147

Branch: refs/heads/ignite-sql-tests
Commit: 45ce81472b47a4c2fe15489ef203e2c7502ae996
Parents: fdd150b
Author: vozerov <vozerov@gridgain.com>
Authored: Tue Feb 24 16:00:46 2015 +0300
Committer: vozerov <vozerov@gridgain.com>
Committed: Tue Feb 24 16:02:27 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 562 +++++++++++--------
 .../GridDistributedTxRemoteAdapter.java         |  66 +--
 .../dht/atomic/GridDhtAtomicCache.java          |  44 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  84 +--
 .../distributed/near/GridNearAtomicCache.java   |   3 -
 .../cache/transactions/IgniteTxAdapter.java     |  58 +-
 .../cache/transactions/IgniteTxEntry.java       |  12 +-
 .../transactions/IgniteTxLocalAdapter.java      |  40 +-
 .../cache/version/GridCacheVersion.java         |   3 +-
 .../GridCacheVersionConflictContext.java        |  35 +-
 .../cache/version/GridCacheVersionEx.java       |   2 +-
 .../dr/GridDrDataLoadCacheUpdater.java          |   6 +-
 12 files changed, 500 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 18fce53..db7272b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -802,18 +802,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 evt = false;
             }
 
-            if (ret != null && expiryPlc != null) {
-                long ttl = expiryPlc.forAccess();
-
-                if (ttl != CU.TTL_NOT_CHANGED) {
-                    updateTtl(ttl);
-
-                    expiryPlc.ttlUpdated(key(),
-                        getOrMarshalKeyBytes(),
-                        version(),
-                        hasReaders() ? ((GridDhtCacheEntry)this).readers() : null);
-                }
-            }
+            if (ret != null && expiryPlc != null)
+                updateTtl(expiryPlc);
         }
 
         if (ret != null)
@@ -1367,12 +1357,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 boolean pass = cctx.isAll(wrapFilterLocked(), filter);
 
                 if (!pass) {
-                    if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked()) {
-                        long ttl = CU.toTtl(expiryPlc.getExpiryForAccess());
-
-                        if (ttl != CU.TTL_NOT_CHANGED)
-                            updateTtl(ttl);
-                    }
+                    if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked())
+                        updateTtl(expiryPlc);
 
                     return new T3<>(false, retval ? old : null, null);
                 }
@@ -1409,12 +1395,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 }
 
                 if (!entry.modified()) {
-                    if (expiryPlc != null && !readThrough && hasValueUnlocked()) {
-                        long newTtl = CU.toTtl(expiryPlc.getExpiryForAccess());
-
-                        if (newTtl != CU.TTL_NOT_CHANGED)
-                            updateTtl(newTtl);
-                    }
+                    if (expiryPlc != null && !readThrough && hasValueUnlocked())
+                        updateTtl(expiryPlc);
 
                     return new GridTuple3<>(false, null, invokeRes);
                 }
@@ -1512,7 +1494,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 // in load methods without actually holding entry lock.
                 clearIndex(old);
 
-                update(null, null, 0, 0, ver);
+                update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);
 
                 if (evt) {
                     V evtOld = null;
@@ -1555,6 +1537,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public GridCacheUpdateAtomicResult<K, V> innerUpdate(
         GridCacheVersion newVer,
         UUID evtNodeId,
@@ -1572,8 +1555,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         boolean verCheck,
         @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter,
         GridDrType drType,
-        long conflictTtl,
-        long conflictExpireTime,
+        long explicitTtl,
+        long explicitExpireTime,
         @Nullable GridCacheVersion conflictVer,
         boolean conflictResolve,
         boolean intercept,
@@ -1582,21 +1565,24 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
-        V old;
-
         boolean res = true;
 
+        V oldVal;
         V updated;
 
         GridCacheVersion enqueueVer = null;
 
-        GridCacheVersionConflictContext<K, V> drRes = null;
+        GridCacheVersionConflictContext<K, V> conflictCtx = null;
 
         EntryProcessorResult<?> invokeRes = null;
 
-        long newTtl = -1L;
-        long newExpireTime = 0L;
-        long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node.
+        // System TTL/ET which may have special values.
+        long newSysTtl;
+        long newSysExpireTime;
+
+        // TTL/ET which will be passed to entry on update.
+        long newTtl;
+        long newExpireTime;
 
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -1609,56 +1595,47 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
             Object transformClo = null;
 
+            // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
             if (conflictResolve) {
-                GridCacheVersion oldDrVer = version().conflictVersion();
-
-                boolean drNeedResolve = cctx.conflictNeedResolve();
-
-                if (drNeedResolve) {
-                    // Get old value.
-                    V oldVal = rawGetOrUnmarshalUnlocked(true);
+                GridCacheVersion oldConflictVer = version().conflictVersion();
 
+                // Cache is conflict-enabled.
+                if (cctx.conflictNeedResolve()) {
+                    // Get new value, optionally unmarshalling and/or transforming it.
                     if (writeObj == null && valBytes != null)
                         writeObj = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
 
                     if (op == GridCacheOperation.TRANSFORM) {
                         transformClo = writeObj;
 
-                        writeObj = ((IgniteClosure<V, V>)writeObj).apply(oldVal);
-                    }
-
-                    K k = key();
-
-                    if (conflictTtl >= 0L) {
-                        // DR TTL is set explicitly
-                        assert conflictExpireTime >= 0L;
-
-                        newTtl = conflictTtl;
-                        newExpireTime = conflictExpireTime;
+                        writeObj = ((IgniteClosure<V, V>)writeObj).apply(rawGetOrUnmarshalUnlocked(true));
+                        valBytes = null;
                     }
-                    else {
-                        long ttl = expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L;
 
-                        newTtl = ttl < 0 ? ttlExtras() : ttl;
-                        newExpireTime = CU.toExpireTime(newTtl);
-                    }
+                    GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc, explicitTtl,
+                        explicitExpireTime);
 
+                    // Prepare old and new entries for conflict resolution.
                     GridCacheVersionedEntryEx<K, V> oldEntry = versionedEntry();
-                    GridCacheVersionedEntryEx<K, V> newEntry =
-                        new GridCachePlainVersionedEntry<>(k, (V)writeObj, newTtl, newExpireTime, conflictVer);
+                    GridCacheVersionedEntryEx<K, V> newEntry = new GridCachePlainVersionedEntry<>(key, (V)writeObj,
+                        expiration.get1(), expiration.get2(), conflictVer != null ? conflictVer : newVer);
+
+                    // Resolve conflict.
+                    conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
 
-                    drRes = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+                    assert conflictCtx != null;
 
-                    assert drRes != null;
+                    // Use old value?
+                    if (conflictCtx.isUseOld()) {
+                        GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
 
-                    if (drRes.isUseOld()) {
                         // Handle special case with atomic comparator.
-                        if (!isNew() &&                                                  // Not initial value,
-                            verCheck &&                                                  // and atomic version check,
-                            oldDrVer.dataCenterId() == conflictVer.dataCenterId() &&     // and data centers are equal,
-                            ATOMIC_VER_COMPARATOR.compare(oldDrVer, conflictVer) == 0 && // and both versions are equal,
-                            cctx.writeThrough() &&                                       // and store is enabled,
-                            primary)                                                     // and we are primary.
+                        if (!isNew() &&                                                           // Not initial value,
+                            verCheck &&                                                           // and atomic version check,
+                            oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&     // and data centers are equal,
+                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer) == 0 && // and both versions are equal,
+                            cctx.writeThrough() &&                                                // and store is enabled,
+                            primary)                                                              // and we are primary.
                         {
                             V val = rawGetOrUnmarshalUnlocked(false);
 
@@ -1671,47 +1648,39 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                                 cctx.store().putToStore(null, key(), val, ver);
                         }
 
-                        old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
-
                         return new GridCacheUpdateAtomicResult<>(false,
-                            old,
+                            retval ? rawGetOrUnmarshalUnlocked(false) : null,
                             null,
                             invokeRes,
-                            0L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
                     }
-                    else if (drRes.isUseNew())
-                        op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+                    // Will update something.
                     else {
-                        assert drRes.isMerge();
-
-                        writeObj = drRes.mergeValue();
-                        valBytes = null;
+                        // Merge is a local update which override passed value bytes.
+                        if (conflictCtx.isMerge()) {
+                            writeObj = conflictCtx.mergeValue();
+                            valBytes = null;
 
-                        conflictVer = null; // Update will be considered as local.
+                            conflictVer = null;
+                        }
+                        else
+                            assert conflictCtx.isUseNew();
 
+                        // Update value is known at this point, so update operation type.
                         op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
                     }
-
-                    newTtl = drRes.ttl();
-                    newExpireTime = drRes.expireTime();
-
-                    // Explicit DR expire time will be passed to remote node only in that case.
-                    if (!drRes.explicitTtl() && !drRes.isMerge()) {
-                        if (drRes.isUseNew() && newEntry.dataCenterId() != cctx.dataCenterId() ||
-                            drRes.isUseOld() && oldEntry.dataCenterId() != cctx.dataCenterId())
-                            newDrExpireTime = drRes.expireTime();
-                    }
                 }
                 else
-                    // Nullify DR version on this update, so that we will use regular version during next updates.
+                    // Nullify conflict version on this update, so that we will use regular version during next updates.
                     conflictVer = null;
             }
 
-            if (drRes == null) { // Perform version check only in case there will be no explicit conflict resolution.
+            // Perform version check only in case there was no explicit conflict resolution.
+            if (conflictCtx == null) {
                 if (verCheck) {
                     if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer) >= 0) {
                         if (ATOMIC_VER_COMPARATOR.compare(ver, newVer) == 0 && cctx.writeThrough() && primary) {
@@ -1735,14 +1704,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                                     "[entry=" + this + ", newVer=" + newVer + ']');
                         }
 
-                        old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
-
                         return new GridCacheUpdateAtomicResult<>(false,
-                            old,
+                            retval ? rawGetOrUnmarshalUnlocked(false) : null,
                             null,
                             invokeRes,
-                            -1L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
@@ -1753,46 +1720,46 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']';
             }
 
-            // Possibly get old value form store.
-            old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
-
-            GridCacheValueBytes oldBytes = valueBytesUnlocked();
+            // Prepare old value and value bytes.
+            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            GridCacheValueBytes oldValBytes = valueBytesUnlocked();
 
+            // Possibly read value from store.
             boolean readThrough = false;
 
-            if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
-                old = readThrough(null, key, false, subjId, taskName);
+            if (needVal && oldVal == null && (cctx.readThrough() &&
+                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
+                oldVal = readThrough(null, key, false, subjId, taskName);
 
                 readThrough = true;
 
                 // Detach value before index update.
                 if (cctx.portableEnabled())
-                    old = (V)cctx.kernalContext().portable().detachPortable(old);
+                    oldVal = (V)cctx.kernalContext().portable().detachPortable(oldVal);
 
-                long ttl = 0;
-                long expireTime = 0;
+                // Calculate initial TTL and expire time.
+                long initTtl;
+                long initExpireTime;
 
-                if (expiryPlc != null && old != null) {
-                    ttl = expiryPlc.forCreate();
+                if (expiryPlc != null && oldVal != null) {
+                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
 
-                    if (ttl == CU.TTL_ZERO) {
-                        ttl = 1;
-                        expireTime = U.currentTimeMillis() - 1;
-                    }
-                    else if (ttl == CU.TTL_NOT_CHANGED)
-                        ttl = 0;
-                    else
-                        expireTime = CU.toExpireTime(ttl);
+                    initTtl = initTtlAndExpireTime.get1();
+                    initExpireTime = initTtlAndExpireTime.get2();
+                }
+                else {
+                    initTtl = CU.TTL_ETERNAL;
+                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
                 }
 
-                if (old != null)
-                    updateIndex(old, null, expireTime, ver, null);
+                if (oldVal != null)
+                    updateIndex(oldVal, null, initExpireTime, ver, null);
                 else
                     clearIndex(null);
 
-                update(old, null, expireTime, ttl, ver);
+                update(oldVal, null, initExpireTime, initTtl, ver);
 
-                if (deletedUnlocked() && old != null && !isInternal())
+                if (deletedUnlocked() && oldVal != null && !isInternal())
                     deletedUnlocked(false);
             }
 
@@ -1800,7 +1767,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
                 // PutIfAbsent methods mustn't update hit/miss statistics
                 if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray())
-                    cctx.cache().metrics0().onRead(old != null);
+                    cctx.cache().metrics0().onRead(oldVal != null);
             }
 
             // Check filter inside of synchronization.
@@ -1808,38 +1775,30 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 boolean pass = cctx.isAll(wrapFilterLocked(), filter);
 
                 if (!pass) {
-                    if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noPeekArray()) {
-                        newTtl = expiryPlc.forAccess();
-
-                        if (newTtl != CU.TTL_NOT_CHANGED) {
-                            updateTtl(newTtl);
-
-                            expiryPlc.ttlUpdated(key,
-                                getOrMarshalKeyBytes(),
-                                version(),
-                                hasReaders() ? ((GridDhtCacheEntry<K, V>)this).readers() : null);
-                        }
-                    }
+                    if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noPeekArray())
+                        updateTtl(expiryPlc);
 
                     return new GridCacheUpdateAtomicResult<>(false,
-                        retval ? old : null,
+                        retval ? oldVal : null,
                         null,
                         invokeRes,
-                        -1L,
-                        -1L,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
                         false);
                 }
             }
 
-            // Calculate new value.
+            // Calculate new value in case we met transform.
             if (op == GridCacheOperation.TRANSFORM) {
+                assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier.";
+
                 transformClo = writeObj;
 
                 EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj;
 
-                CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(cctx, key, old);
+                CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(cctx, key, oldVal);
 
                 try {
                     Object computed = entryProcessor.process(entry, invokeArgs);
@@ -1857,31 +1816,21 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 catch (Exception e) {
                     invokeRes = new CacheInvokeResult<>(e);
 
-                    updated = old;
+                    updated = oldVal;
 
-                    valBytes = oldBytes.getIfMarshaled();
+                    valBytes = oldValBytes.getIfMarshaled();
                 }
 
                 if (!entry.modified()) {
-                    if (expiryPlc != null && !readThrough && hasValueUnlocked()) {
-                        newTtl = expiryPlc.forAccess();
-
-                        if (newTtl != CU.TTL_NOT_CHANGED) {
-                            updateTtl(newTtl);
-
-                            expiryPlc.ttlUpdated(key,
-                                getOrMarshalKeyBytes(),
-                                version(),
-                                hasReaders() ? ((GridDhtCacheEntry<K, V>)this).readers() : null);
-                        }
-                    }
+                    if (expiryPlc != null && !readThrough && hasValueUnlocked())
+                        updateTtl(expiryPlc);
 
                     return new GridCacheUpdateAtomicResult<>(false,
-                        retval ? old : null,
+                        retval ? oldVal : null,
                         null,
                         invokeRes,
-                        -1L,
-                        -1L,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
                         false);
@@ -1896,7 +1845,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
             boolean hadVal = hasValueUnlocked();
 
-            // Incorporate DR version into new version if needed.
+            // Incorporate conflict version into new version if needed.
             if (conflictVer != null && conflictVer != newVer)
                 newVer = new GridCacheVersionEx(newVer.topologyVersion(),
                     newVer.globalTime(),
@@ -1905,58 +1854,82 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     newVer.dataCenterId(),
                     conflictVer);
 
-            IgniteBiTuple<Boolean, V> interceptRes = null;
-
-            long ttl0 = newTtl;
 
             if (op == GridCacheOperation.UPDATE) {
-                if (drRes == null) {
+                // Conflict context is null if there were no explicit conflict resolution.
+                if (conflictCtx == null) {
                     // Calculate TTL and expire time for local update.
-                    if (conflictTtl >= 0L) {
-                        assert conflictExpireTime >= 0L : conflictExpireTime;
+                    if (explicitTtl != CU.TTL_NOT_CHANGED) {
+                        // If conflict existed, expire time must be explicit.
+                        assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
+
+                        newSysTtl = newTtl = explicitTtl;
+                        newSysExpireTime = explicitExpireTime;
 
-                        ttl0 = conflictTtl;
-                        newExpireTime = conflictExpireTime;
+                        newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
+                            explicitExpireTime : CU.toExpireTime(explicitTtl);
                     }
                     else {
-                        assert conflictExpireTime == CU.TTL_NOT_CHANGED : conflictExpireTime;
+                        newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
+                            hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
 
-                        if (expiryPlc != null)
-                            newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
-                        else
-                            newTtl = CU.TTL_NOT_CHANGED;
-
-                        if (newTtl == CU.TTL_NOT_CHANGED) {
-                            ttl0 = ttlExtras();
+                        if (newSysTtl == CU.TTL_NOT_CHANGED) {
+                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                            newTtl = ttlExtras();
                             newExpireTime = expireTimeExtras();
                         }
+                        else if (newSysTtl == CU.TTL_ZERO) {
+                            op = GridCacheOperation.DELETE;
+
+                            newSysTtl = CU.TTL_NOT_CHANGED;
+                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+                            newTtl = CU.TTL_ETERNAL;
+                            newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+
+                            updated = null;
+                            valBytes = null;
+                        }
                         else {
-                            ttl0 = newTtl;
-                            newExpireTime = CU.toExpireTime(ttl0);
+                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                            newTtl = newSysTtl;
+                            newExpireTime = CU.toExpireTime(newTtl);
                         }
                     }
                 }
-                else if (newTtl == CU.TTL_NOT_CHANGED)
-                    ttl0 = ttlExtras();
+                else {
+                    newSysTtl = newTtl = conflictCtx.ttl();
+                    newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+                }
             }
+            else {
+                assert op == GridCacheOperation.DELETE;
 
-            if (ttl0 == CU.TTL_ZERO) {
-                op = GridCacheOperation.DELETE;
+                newSysTtl = CU.TTL_NOT_CHANGED;
+                newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
 
-                updated = null;
+                newTtl = CU.TTL_ETERNAL;
+                newExpireTime = CU.EXPIRE_TIME_ETERNAL;
             }
 
+            // TTL and expire time must be resolved at this point.
+            assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0;
+            assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0;
+
+            IgniteBiTuple<Boolean, V> interceptRes = null;
+
+            // Actual update.
             if (op == GridCacheOperation.UPDATE) {
                 if (intercept) {
-                    V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated);
+                    V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, oldVal, updated);
 
                     if (interceptorVal == null)
                         return new GridCacheUpdateAtomicResult<>(false,
-                            retval ? old : null,
+                            retval ? oldVal : null,
                             null,
                             invokeRes,
-                            -1L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
@@ -1992,9 +1965,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
-                updateIndex(updated, valBytes, newExpireTime, newVer, old);
+                updateIndex(updated, valBytes, newExpireTime, newVer, oldVal);
 
-                update(updated, valBytes, newExpireTime, ttl0, newVer);
+                update(updated, valBytes, newExpireTime, newTtl, newVer);
 
                 drReplicate(drType, updated, valBytes, newVer);
 
@@ -2004,7 +1977,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     V evtOld = null;
 
                     if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(old);
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null,
                             newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
@@ -2013,7 +1986,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                     if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
                         if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(old);
+                            evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null,
                             newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
@@ -2023,15 +1996,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             }
             else {
                 if (intercept) {
-                    interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, old);
+                    interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, oldVal);
 
                     if (cctx.cancelRemove(interceptRes))
                         return new GridCacheUpdateAtomicResult<>(false,
                             cctx.<V>unwrapTemporary(interceptRes.get2()),
                             null,
                             invokeRes,
-                            -1L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
@@ -2043,7 +2016,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
-                clearIndex(old);
+                clearIndex(oldVal);
 
                 if (hadVal) {
                     assert !deletedUnlocked();
@@ -2068,7 +2041,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 boolean hasValPtr = valPtr != 0;
 
                 // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
-                update(null, null, 0, 0, newVer);
+                update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer);
+
+                assert newSysTtl == CU.TTL_NOT_CHANGED;
+                assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
 
                 if (cctx.offheapTiered() && hasValPtr) {
                     boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes());
@@ -2086,7 +2062,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     V evtOld = null;
 
                     if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(old);
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null,
                             newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
@@ -2095,7 +2071,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                     if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
                         if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(old);
+                            evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null, newVer,
                             EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal,
@@ -2104,17 +2080,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 }
 
                 res = hadVal;
-
-                // Do not propagate zeroed TTL and expire time.
-                newTtl = -1L;
-                newDrExpireTime = -1L;
             }
 
             if (res)
                 updateMetrics(op, metrics);
 
             if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(),
+                    oldVal, oldValBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -2122,28 +2095,94 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 if (op == GridCacheOperation.UPDATE)
                     cctx.config().getInterceptor().onAfterPut(key, val);
                 else
-                    cctx.config().getInterceptor().onAfterRemove(key, old);
+                    cctx.config().getInterceptor().onAfterRemove(key, oldVal);
 
                 if (interceptRes != null)
-                    old = cctx.unwrapTemporary(interceptRes.get2());
+                    oldVal = cctx.unwrapTemporary(interceptRes.get2());
             }
         }
 
         if (log.isDebugEnabled())
-            log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']');
+            log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']');
 
         return new GridCacheUpdateAtomicResult<>(res,
-            old,
+            oldVal,
             updated,
             invokeRes,
-            newTtl,
-            newDrExpireTime,
+            newSysTtl,
+            newSysExpireTime,
             enqueueVer,
-            drRes,
+            conflictCtx,
             true);
     }
 
     /**
+     * @param expiry Expiration policy.
+     * @return Tuple holding initial TTL and expire time with the given expiry.
+     */
+    private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
+        assert expiry != null;
+
+        long initTtl = expiry.forCreate();
+        long initExpireTime;
+
+        if (initTtl == CU.TTL_ZERO) {
+            initTtl = CU.TTL_MINIMUM;
+            initExpireTime = CU.expireTimeInPast();
+        }
+        else if (initTtl == CU.TTL_NOT_CHANGED) {
+            initTtl = CU.TTL_ETERNAL;
+            initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+        }
+        else
+            initExpireTime = CU.toExpireTime(initTtl);
+
+        return F.t(initTtl, initExpireTime);
+    }
+
+    /**
+     * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time.
+     *
+     * @param expiry Expiration policy.
+     * @param ttl Explicit TTL.
+     * @param expireTime Explicit expire time.
+     * @return Result.
+     */
+    private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime)
+        throws GridCacheEntryRemovedException {
+        boolean rmv = false;
+
+        // 1. If TTL is not changed, then calculate it based on expiry.
+        if (ttl == CU.TTL_NOT_CHANGED) {
+            if (expiry != null)
+                ttl = hasValueUnlocked() ? expiry.forUpdate() : expiry.forCreate();
+        }
+
+        // 2. If TTL is zero, then set delete marker.
+        if (ttl == CU.TTL_ZERO) {
+            rmv = true;
+
+            ttl = CU.TTL_ETERNAL;
+        }
+
+        // 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
+        if (ttl == CU.TTL_NOT_CHANGED) {
+            if (isNew())
+                ttl = CU.TTL_ETERNAL;
+            else {
+                ttl = ttlExtras();
+                expireTime = expireTimeExtras();
+            }
+        }
+
+        // 4 If expire time was not set explicitly, then calculate it.
+        if (expireTime == CU.EXPIRE_TIME_CALCULATE)
+            expireTime = CU.toExpireTime(ttl);
+
+        return F.t(ttl, expireTime, rmv);
+    }
+
+    /**
      * Perform DR if needed.
      *
      * @param drType DR type.
@@ -2517,7 +2556,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         GridCacheVersion ver) {
         assert ver != null;
         assert Thread.holdsLock(this);
-        assert ttl >= 0 : ttl;
+        assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
 
         long oldExpireTime = expireTimeExtras();
 
@@ -2535,6 +2574,39 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /**
+     * Update TTL if it is changed.
+     *
+     * @param expiryPlc Expiry policy.
+     */
+    private void updateTtl(ExpiryPolicy expiryPlc) {
+        long ttl = CU.toTtl(expiryPlc.getExpiryForAccess());
+
+        if (ttl != CU.TTL_NOT_CHANGED)
+            updateTtl(ttl);
+    }
+
+    /**
+     * Update TTL is it is changed.
+     *
+     * @param expiryPlc Expiry policy.
+     * @throws IgniteCheckedException If failed.
+     * @throws GridCacheEntryRemovedException If failed.
+     */
+    private void updateTtl(IgniteCacheExpiryPolicy expiryPlc)
+        throws IgniteCheckedException, GridCacheEntryRemovedException {
+        long ttl = expiryPlc.forAccess();
+
+        if (ttl != CU.TTL_NOT_CHANGED) {
+            updateTtl(ttl);
+
+            expiryPlc.ttlUpdated(key(),
+                getOrMarshalKeyBytes(),
+                version(),
+                hasReaders() ? ((GridDhtCacheEntry)this).readers() : null);
+        }
+    }
+
+    /**
      * @param ttl Time to live.
      */
     private void updateTtl(long ttl) {
@@ -2913,18 +2985,8 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     ver = this.ver;
                     val = rawGetOrUnmarshalUnlocked(false);
 
-                    if (val != null && expiryPlc != null) {
-                        long ttl = expiryPlc.forAccess();
-
-                        if (ttl != CU.TTL_NOT_CHANGED) {
-                            updateTtl(ttl);
-
-                            expiryPlc.ttlUpdated(key(),
-                                getOrMarshalKeyBytes(),
-                                version(),
-                                hasReaders() ? ((GridDhtCacheEntry)this).readers() : null);
-                        }
-                    }
+                    if (val != null && expiryPlc != null)
+                        updateTtl(expiryPlc);
                 }
 
                 if (!cctx.isAll(wrap(), filter))
@@ -3393,30 +3455,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"IfMayBeConditional"})
-    @Override public long expireTime() throws GridCacheEntryRemovedException {
-        IgniteTxLocalAdapter<K, V> tx;
-
-        if (cctx.isDht())
-            tx = cctx.dht().near().context().tm().localTx();
-        else
-            tx = cctx.tm().localTx();
-
-        if (tx != null) {
-            long time = tx.entryExpireTime(txKey());
-
-            if (time > 0)
-                return time;
-        }
-
-        synchronized (this) {
-            checkObsolete();
-
-            return expireTimeExtras();
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public long expireTimeUnlocked() {
         assert Thread.holdsLock(this);
 
@@ -3491,13 +3529,27 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
     /** {@inheritDoc} */
     @SuppressWarnings({"IfMayBeConditional"})
-    @Override public long ttl() throws GridCacheEntryRemovedException {
-        IgniteTxLocalAdapter<K, V> tx;
+    @Override public long expireTime() throws GridCacheEntryRemovedException {
+        IgniteTxLocalAdapter<K, V> tx = currentTx();
 
-        if (cctx.isDht())
-            tx = cctx.dht().near().context().tm().localTx();
-        else
-            tx = cctx.tm().localTx();
+        if (tx != null) {
+            long time = tx.entryExpireTime(txKey());
+
+            if (time > 0)
+                return time;
+        }
+
+        synchronized (this) {
+            checkObsolete();
+
+            return expireTimeExtras();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional"})
+    @Override public long ttl() throws GridCacheEntryRemovedException {
+        IgniteTxLocalAdapter<K, V> tx = currentTx();
 
         if (tx != null) {
             long entryTtl = tx.entryTtl(txKey());
@@ -3513,6 +3565,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         }
     }
 
+    /**
+     * @return Current transaction.
+     */
+    private IgniteTxLocalAdapter<K, V> currentTx() {
+        if (cctx.isDht())
+            return cctx.dht().near().context().tm().localTx();
+        else
+            return cctx.tm().localTx();
+    }
+
     /** {@inheritDoc} */
     @Override public void updateTtl(@Nullable GridCacheVersion ver, long ttl) {
         synchronized (this) {
@@ -4224,7 +4286,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
      * @param expireTime Expire time.
      */
     protected void ttlAndExpireTimeExtras(long ttl, long expireTime) {
-        extras = (extras != null) ? extras.ttlAndExpireTime(ttl, expireTime) : ttl != 0 ?
+        assert ttl != CU.TTL_NOT_CHANGED && ttl != CU.TTL_ZERO;
+
+        extras = (extras != null) ? extras.ttlAndExpireTime(ttl, expireTime) : ttl != CU.TTL_ETERNAL ?
             new GridCacheTtlEntryExtras<K>(ttl, expireTime) : null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index e36947e..dbf82dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -506,38 +506,38 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                                     GridCacheVersion explicitVer = txEntry.conflictVersion();
 
+                                    if (explicitVer == null)
+                                        explicitVer = writeVersion();
+
                                     if (txEntry.ttl() == CU.TTL_ZERO)
                                         op = DELETE;
 
-                                    boolean drNeedResolve = cacheCtx.conflictNeedResolve();
+                                    boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
+
+                                    GridCacheVersionConflictContext<K, V> conflictCtx = null;
 
-                                    if (drNeedResolve) {
+                                    if (conflictNeedResolve) {
                                         IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
-                                            drRes = conflictResolve(op, txEntry.key(), val, valBytes,
-                                            txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached);
+                                            drRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, cached);
 
                                         assert drRes != null;
 
-                                        GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();
+                                        conflictCtx = drRes.get2();
 
-                                        if (drCtx.isUseOld())
+                                        if (conflictCtx.isUseOld())
                                             op = NOOP;
-                                        else if (drCtx.isUseNew()) {
-                                            txEntry.ttl(drCtx.ttl());
-
-                                            if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId())
-                                                txEntry.conflictExpireTime(drCtx.expireTime());
-                                            else
-                                                txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+                                        else if (conflictCtx.isUseNew()) {
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
                                         }
-                                        else if (drCtx.isMerge()) {
+                                        else if (conflictCtx.isMerge()) {
                                             op = drRes.get1();
-                                            val = drCtx.mergeValue();
+                                            val = conflictCtx.mergeValue();
                                             valBytes = null;
                                             explicitVer = writeVersion();
 
-                                            txEntry.ttl(drCtx.ttl());
-                                            txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
                                         }
                                     }
                                     else
@@ -610,26 +610,28 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             "Transaction does not own lock for group lock entry during  commit [tx=" +
                                                 this + ", txEntry=" + txEntry + ']';
 
-                                        if (txEntry.ttl() != -1L)
-                                            cached.updateTtl(null, txEntry.ttl());
+                                        if (conflictCtx == null || !conflictCtx.isUseOld()) {
+                                            if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+                                                cached.updateTtl(null, txEntry.ttl());
 
-                                        if (nearCached != null) {
-                                            V val0 = null;
-                                            byte[] valBytes0 = null;
+                                            if (nearCached != null) {
+                                                V val0 = null;
+                                                byte[] valBytes0 = null;
 
-                                            GridCacheValueBytes valBytesTuple = cached.valueBytes();
+                                                GridCacheValueBytes valBytesTuple = cached.valueBytes();
 
-                                            if (!valBytesTuple.isNull()) {
-                                                if (valBytesTuple.isPlain())
-                                                    val0 = (V)valBytesTuple.get();
+                                                if (!valBytesTuple.isNull()) {
+                                                    if (valBytesTuple.isPlain())
+                                                        val0 = (V) valBytesTuple.get();
+                                                    else
+                                                        valBytes0 = valBytesTuple.get();
+                                                }
                                                 else
-                                                    valBytes0 = valBytesTuple.get();
-                                            }
-                                            else
-                                                val0 = cached.rawGet();
+                                                    val0 = cached.rawGet();
 
-                                            nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
-                                                cached.ttl(), nodeId);
+                                                nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
+                                                    cached.ttl(), nodeId);
+                                            }
                                         }
                                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c408394..9f9af31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1118,8 +1118,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         dhtFut = createDhtFuture(ver, req, res, completionCb, false);
 
-                        boolean replicate = ctx.isDrEnabled();
-
                         expiry = expiryPolicy(req.expiry());
 
                         GridCacheReturn<Object> retVal = null;
@@ -1127,7 +1125,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (keys.size() > 1 &&                             // Several keys ...
                             writeThrough() &&                              // and store is enabled ...
                             !ctx.store().isLocalStore() &&                 // and this is not local store ...
-                            !ctx.dr().receiveEnabled()  // and no DR.
+                            !ctx.dr().receiveEnabled()                     // and no DR.
                         ) {
                             // This method can only be used when there are no replicated entries in the batch.
                             UpdateBatchResult<K, V> updRes = updateWithBatch(node,
@@ -1138,7 +1136,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 ver,
                                 dhtFut,
                                 completionCb,
-                                replicate,
+                                ctx.isDrEnabled(),
                                 taskName,
                                 expiry);
 
@@ -1157,7 +1155,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 ver,
                                 dhtFut,
                                 completionCb,
-                                replicate,
+                                ctx.isDrEnabled(),
                                 taskName,
                                 expiry);
 
@@ -1670,10 +1668,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
 
-                assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; // Plain version is expected here.
-
-                if (newConflictVer == null)
-                    newConflictVer = ver;
+                assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
 
                 boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
                     req.topologyVersion());
@@ -1723,15 +1718,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
-                        GridCacheVersionConflictContext<K, V> ctx = updRes.conflictResolveResult();
+                        GridCacheVersionConflictContext<K, V> conflictCtx = updRes.conflictResolveResult();
 
-                        long ttl = updRes.newTtl();
-                        long expireTime = updRes.conflictExpireTime();
-
-                        if (ctx == null)
+                        if (conflictCtx == null)
                             newConflictVer = null;
-                        else if (ctx.isMerge()) {
-                            newConflictVer = null; // DR version is discarded in case of merge.
+                        else if (conflictCtx.isMerge()) {
+                            newConflictVer = null; // Conflict version is discarded in case of merge.
                             newValBytes = null; // Value has been changed.
                         }
 
@@ -1746,7 +1738,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 newValBytes,
                                 entryProcessor,
                                 updRes.newTtl(),
-                                expireTime,
+                                updRes.conflictExpireTime(),
                                 newConflictVer);
                         }
 
@@ -1756,8 +1748,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 updRes.newValue(),
                                 newValBytes,
                                 entryProcessor,
-                                ttl,
-                                expireTime);
+                                updRes.newTtl(),
+                                updRes.conflictExpireTime());
                     }
                     else {
                         if (log.isDebugEnabled())
@@ -1771,9 +1763,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
                             GridCacheVersionConflictContext<K, V> ctx = updRes.conflictResolveResult();
 
-                            long ttl = updRes.newTtl();
-                            long expireTime = updRes.conflictExpireTime();
-
                             if (ctx != null && ctx.isMerge())
                                 newValBytes = null;
 
@@ -1782,11 +1771,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 res.addNearValue(i,
                                     updRes.newValue(),
                                     newValBytes,
-                                    ttl,
-                                    expireTime);
+                                    updRes.newTtl(),
+                                    updRes.conflictExpireTime());
                             }
                             else
-                                res.addNearTtl(i, ttl, expireTime);
+                                res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
 
                             if (updRes.newValue() != null || newValBytes != null) {
                                 IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
@@ -1880,7 +1869,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     ) {
         assert putMap == null ^ rmvKeys == null;
 
-        assert req.conflictVersions() == null : "updatePartialBatch cannot be called when there are DR entries in the batch.";
+        assert req.conflictVersions() == null : "Cannot be called when there are conflict entries in the batch.";
 
         long topVer = req.topologyVersion();
 
@@ -2480,9 +2469,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         long ttl = req.ttl(i);
                         long expireTime = req.conflictExpireTime(i);
 
-                        if (ttl != -1L && expireTime == -1L)
-                            expireTime = CU.toExpireTime(ttl);
-
                         GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
                             ver,
                             nodeId,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index ef3de55..c3cc50a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -520,35 +520,39 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
             K key = F.first(keys);
 
             Object val;
-            long drTtl;
-            long drExpireTime;
-            GridCacheVersion drVer;
+            GridCacheVersion conflictVer;
+            long conflictTtl;
+            long conflictExpireTime;
 
             if (vals != null) {
+                // Regular PUT.
                 val = F.first(vals);
-                drTtl = -1;
-                drExpireTime = -1;
-                drVer = null;
+                conflictVer = null;
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
             }
             else if (conflictPutVals != null) {
-                GridCacheDrInfo<V> drPutVal =  F.first(conflictPutVals);
+                // Conflict PUT.
+                GridCacheDrInfo<V> conflictPutVal =  F.first(conflictPutVals);
 
-                val = drPutVal.value();
-                drTtl = drPutVal.ttl();
-                drExpireTime = drPutVal.expireTime();
-                drVer = drPutVal.version();
+                val = conflictPutVal.value();
+                conflictVer = conflictPutVal.version();
+                conflictTtl = conflictPutVal.ttl();
+                conflictExpireTime = conflictPutVal.expireTime();
             }
             else if (conflictRmvVals != null) {
+                // Conflict REMOVE.
                 val = null;
-                drTtl = -1;
-                drExpireTime = -1;
-                drVer = F.first(conflictRmvVals);
+                conflictVer = F.first(conflictRmvVals);
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
             }
             else {
+                // Regular REMOVE.
                 val = null;
-                drTtl = -1;
-                drExpireTime = -1;
-                drVer = null;
+                conflictVer = null;
+                conflictTtl = CU.TTL_NOT_CHANGED;
+                conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
             }
 
             // We still can get here if user pass map with single element.
@@ -599,7 +603,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 subjId,
                 taskNameHash);
 
-            req.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, true);
+            req.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, true);
 
             single = true;
 
@@ -614,15 +618,15 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         if (vals != null)
             it = vals.iterator();
 
-        Iterator<GridCacheDrInfo<V>> drPutValsIt = null;
+        Iterator<GridCacheDrInfo<V>> conflictPutValsIt = null;
 
         if (conflictPutVals != null)
-            drPutValsIt = conflictPutVals.iterator();
+            conflictPutValsIt = conflictPutVals.iterator();
 
-        Iterator<GridCacheVersion> drRmvValsIt = null;
+        Iterator<GridCacheVersion> conflictRmvValsIt = null;
 
         if (conflictRmvVals != null)
-            drRmvValsIt = conflictRmvVals.iterator();
+            conflictRmvValsIt = conflictRmvVals.iterator();
 
         Map<UUID, GridNearAtomicUpdateRequest<K, V>> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
 
@@ -643,15 +647,15 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 }
 
                 Object val;
-                long drTtl;
-                long drExpireTime;
-                GridCacheVersion drVer;
+                GridCacheVersion conflictVer;
+                long conflictTtl;
+                long conflictExpireTime;
 
                 if (vals != null) {
                     val = it.next();
-                    drTtl = -1;
-                    drExpireTime = -1;
-                    drVer = null;
+                    conflictVer = null;
+                    conflictTtl = CU.TTL_NOT_CHANGED;
+                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 
                     if (val == null) {
                         NullPointerException err = new NullPointerException("Null value.");
@@ -662,24 +666,24 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                     }
                 }
                 else if (conflictPutVals != null) {
-                    GridCacheDrInfo<V> drPutVal =  drPutValsIt.next();
+                    GridCacheDrInfo<V> conflictPutVal =  conflictPutValsIt.next();
 
-                    val = drPutVal.value();
-                    drTtl = drPutVal.ttl();
-                    drExpireTime = drPutVal.expireTime();
-                    drVer = drPutVal.version();
+                    val = conflictPutVal.value();
+                    conflictVer = conflictPutVal.version();
+                    conflictTtl =  conflictPutVal.ttl();
+                    conflictExpireTime = conflictPutVal.expireTime();
                 }
                 else if (conflictRmvVals != null) {
                     val = null;
-                    drTtl = -1;
-                    drExpireTime = -1;
-                    drVer = drRmvValsIt.next();
+                    conflictVer = conflictRmvValsIt.next();
+                    conflictTtl = CU.TTL_NOT_CHANGED;
+                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
                 }
                 else {
                     val = null;
-                    drTtl = -1;
-                    drExpireTime = -1;
-                    drVer = null;
+                    conflictVer = null;
+                    conflictTtl = CU.TTL_NOT_CHANGED;
+                    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
                 }
 
                 if (val == null && op != GridCacheOperation.DELETE)
@@ -727,7 +731,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                             "Invalid mapping state [old=" + old + ", remap=" + remap + ']';
                     }
 
-                    mapped.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, i == 0);
+                    mapped.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
 
                     i++;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 7fb962c..22403ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -317,9 +317,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         long ttl = req.nearTtl(i);
                         long expireTime = req.nearExpireTime(i);
 
-                        if (ttl != -1L && expireTime == -1L)
-                            expireTime = CU.toExpireTime(ttl);
-
                         GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
                             ver,
                             nodeId,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 93f4cf9..abdb99c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1287,21 +1287,65 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
      * Resolve DR conflict.
      *
      * @param op Initially proposed operation.
-     * @param key Key.
+     * @param txEntry TX entry being updated.
      * @param newVal New value.
      * @param newValBytes New value bytes.
-     * @param newTtl New TTL.
-     * @param newDrExpireTime New explicit DR expire time.
      * @param newVer New version.
      * @param old Old entry.
      * @return Tuple with adjusted operation type and conflict context.
      * @throws IgniteCheckedException In case of eny exception.
      * @throws GridCacheEntryRemovedException If entry got removed.
      */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
     protected IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> conflictResolve(
-        GridCacheOperation op, K key, V newVal, byte[] newValBytes, long newTtl, long newDrExpireTime,
-        GridCacheVersion newVer, GridCacheEntryEx<K, V> old)
+        GridCacheOperation op, IgniteTxEntry txEntry, V newVal, byte[] newValBytes, GridCacheVersion newVer,
+        GridCacheEntryEx<K, V> old)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
+        assert newVer != null;
+
+        // 1. Calculate TTL and expire time.
+        long newTtl = txEntry.ttl();
+        long newExpireTime = txEntry.conflictExpireTime();
+
+        // 1.1. If TTL is not changed, then calculate it based on expiry.
+        if (newTtl == CU.TTL_NOT_CHANGED) {
+            ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry);
+
+            if (expiry != null) {
+                if (op == CREATE)
+                    newTtl = CU.toTtl(expiry.getExpiryForCreation());
+                else if (op == UPDATE)
+                    newTtl = CU.toTtl(expiry.getExpiryForUpdate());
+            }
+        }
+
+        // 1.2. If TTL is set to zero, then mark operation as "DELETE".
+        if (newTtl == CU.TTL_ZERO) {
+            op = DELETE;
+
+            newTtl = CU.TTL_ETERNAL;
+        }
+
+        // 1.3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
+        if (newTtl == CU.TTL_NOT_CHANGED) {
+            if (old.isNewLocked())
+                newTtl = CU.TTL_ETERNAL;
+            else {
+                newTtl = old.rawTtl();
+                newExpireTime = old.rawExpireTime();
+            }
+        }
+
+        // TTL must be resolved at this point.
+        assert newTtl != CU.TTL_ZERO && newTtl != CU.TTL_NOT_CHANGED;
+
+        // 1.4 If expire time was not set explicitly, then calculate it.
+        if (newExpireTime == CU.EXPIRE_TIME_CALCULATE)
+            newExpireTime = CU.toExpireTime(newTtl);
+
+        // Expire time must be resolved at this point.
+        assert newExpireTime != CU.EXPIRE_TIME_CALCULATE;
+
         // Construct old entry info.
         GridCacheVersionedEntryEx<K, V> oldEntry = old.versionedEntry();
 
@@ -1309,10 +1353,8 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
         if (newVal == null && newValBytes != null)
             newVal = cctx.marshaller().unmarshal(newValBytes, cctx.deploy().globalLoader());
 
-        long newExpireTime = newDrExpireTime >= 0L ? newDrExpireTime : CU.toExpireTime(newTtl);
-
         GridCacheVersionedEntryEx<K, V> newEntry =
-            new GridCachePlainVersionedEntry<>(key, newVal, newTtl, newExpireTime, newVer);
+            new GridCachePlainVersionedEntry<>((K)txEntry.key(), newVal, newTtl, newExpireTime, newVer);
 
         GridCacheVersionConflictContext<K, V> ctx = old.context().conflictResolve(oldEntry, newEntry, false);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 1416d8c..0f1366b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -852,10 +852,17 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         val.writeTo(out);
 
         out.writeLong(ttl);
-        out.writeLong(conflictExpireTime);
 
         CU.writeVersion(out, explicitVer);
         out.writeBoolean(grpLock);
+
+        if (conflictExpireTime != CU.EXPIRE_TIME_CALCULATE) {
+            out.writeBoolean(true);
+            out.writeLong(conflictExpireTime);
+        }
+        else
+            out.writeBoolean(false);
+
         CU.writeVersion(out, conflictVer);
 
         out.writeObject(transferExpiryPlc ? new IgniteExternalizableExpiryPolicy(expiryPlc) : null);
@@ -882,10 +889,11 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
         val.readFrom(in);
 
         ttl = in.readLong();
-        conflictExpireTime = in.readLong();
 
         explicitVer = CU.readVersion(in);
         grpLock = in.readBoolean();
+
+        conflictExpireTime = in.readBoolean() ? in.readLong() : CU.EXPIRE_TIME_CALCULATE;
         conflictVer = CU.readVersion(in);
 
         expiryPlc = (ExpiryPolicy)in.readObject();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2da1bb7..e3f777d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -741,7 +741,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     V val = res.get2();
                                     byte[] valBytes = res.get3();
 
-                                    // Deal with DR conflicts.
+                                    // Deal with conflicts.
                                     GridCacheVersion explicitVer = txEntry.conflictVersion() != null ?
                                         txEntry.conflictVersion() : writeVersion();
 
@@ -762,45 +762,43 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         }
                                     }
 
-                                    boolean drNeedResolve = cacheCtx.conflictNeedResolve();
+                                    boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
 
-                                    if (drNeedResolve) {
+                                    GridCacheVersionConflictContext<K, V> conflictCtx = null;
+
+                                    if (conflictNeedResolve) {
                                         IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
-                                            drRes = conflictResolve(op, txEntry.key(), val, valBytes, txEntry.ttl(),
-                                                txEntry.conflictExpireTime(), explicitVer, cached);
+                                            conflictRes = conflictResolve(op, txEntry, val, valBytes, explicitVer,
+                                                cached);
 
-                                        assert drRes != null;
+                                        assert conflictRes != null;
 
-                                        GridCacheVersionConflictContext<K, V> conflictCtx = drRes.get2();
+                                        conflictCtx = conflictRes.get2();
 
                                         if (conflictCtx.isUseOld())
                                             op = NOOP;
                                         else if (conflictCtx.isUseNew()) {
                                             txEntry.ttl(conflictCtx.ttl());
-
-                                            if (conflictCtx.newEntry().dataCenterId() != cctx.dataCenterId())
-                                                txEntry.conflictExpireTime(conflictCtx.expireTime());
-                                            else
-                                                txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
                                         }
                                         else {
                                             assert conflictCtx.isMerge();
 
-                                            op = drRes.get1();
+                                            op = conflictRes.get1();
                                             val = conflictCtx.mergeValue();
                                             valBytes = null;
                                             explicitVer = writeVersion();
 
                                             txEntry.ttl(conflictCtx.ttl());
-                                            txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
                                         }
                                     }
                                     else
                                         // Nullify explicit version so that innerSet/innerRemove will work as usual.
                                         explicitVer = null;
 
-                                    if (sndTransformedVals || drNeedResolve) {
-                                        assert sndTransformedVals && cacheCtx.isReplicated() || drNeedResolve;
+                                    if (sndTransformedVals || conflictNeedResolve) {
+                                        assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve;
 
                                         txEntry.value(val, true, false);
                                         txEntry.valueBytes(valBytes);
@@ -905,8 +903,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             "Transaction does not own lock for group lock entry during  commit [tx=" +
                                                 this + ", txEntry=" + txEntry + ']';
 
-                                        if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
-                                            cached.updateTtl(null, txEntry.ttl());
+                                        if (conflictCtx == null || !conflictCtx.isUseOld()) {
+                                            if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+                                                cached.updateTtl(null, txEntry.ttl());
+                                        }
 
                                         if (log.isDebugEnabled())
                                             log.debug("Ignoring NOOP entry when committing: " + txEntry);
@@ -3257,8 +3257,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
         IgniteTxEntry<K, V> e = entry(key);
 
-        if (e != null)
+        if (e != null) {
             e.expiry(expiryPlc);
+            e.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
index 5e20ba7..8c37f5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java
@@ -21,7 +21,6 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
@@ -150,7 +149,7 @@ public class GridCacheVersion implements Message, Comparable<GridCacheVersion>,
     /**
      * @return Conflict version.
      */
-    @Nullable public GridCacheVersion conflictVersion() {
+    public GridCacheVersion conflictVersion() {
         return this; // Use current version.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
index b813803..94a611e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java
@@ -43,9 +43,6 @@ public class GridCacheVersionConflictContext<K, V> {
     /** TTL. */
     private long ttl;
 
-    /** Explicit TTL flag. */
-    private boolean explicitTtl;
-
     /** Manual resolve flag. */
     private boolean manualResolve;
 
@@ -94,24 +91,11 @@ public class GridCacheVersionConflictContext<K, V> {
 
     /**
      * Force cache to apply new entry overwriting old (existing) entry.
-     * <p>
-     * Note that updates from remote data centers always have explicit TTL , while local data center
-     * updates will only have explicit TTL in case {@link CacheEntry#timeToLive(long)} was called
-     * before update. In the latter case new entry will pick TTL of the old (existing) entry, even
-     * if it was set through update from remote data center. it means that depending on concurrent
-     * update timings new update might pick unexpected TTL. For example, consider that three updates
-     * of the same key are performed: local update with explicit TTL (1) followed by another local
-     * update without explicit TTL (2) and one remote update (3). In this case you might expect that
-     * update (2) will pick TTL set during update (1). However, in case update (3) occurrs between (1)
-     * and (2) and it overwrites (1) during conflict resolution, then update (2) will pick TTL of
-     * update (3). To have predictable TTL in such cases you should either always set it explicitly
-     * through {@code GridCacheEntry.timeToLive(long)} or use {@link #merge(Object, long)}.
      */
     public void useNew() {
         state = State.USE_NEW;
 
-        if (!explicitTtl)
-            ttl = newEntry.ttl();
+        ttl = newEntry.ttl();
     }
 
     /**
@@ -121,15 +105,16 @@ public class GridCacheVersionConflictContext<K, V> {
      * Also in case of merge you have to specify new TTL explicitly. For unlimited TTL use {@code 0}.
      *
      * @param mergeVal Merge value or {@code null} to force remove.
-     * @param ttl Time to live in milliseconds.
+     * @param ttl Time to live in milliseconds (must be non-negative).
      */
     public void merge(@Nullable V mergeVal, long ttl) {
+        if (ttl < 0)
+            throw new IllegalArgumentException("TTL must be non-negative: " + ttl);
+
         state = State.MERGE;
 
         this.mergeVal = mergeVal;
         this.ttl = ttl;
-
-        explicitTtl = true;
     }
 
     /**
@@ -185,15 +170,7 @@ public class GridCacheVersionConflictContext<K, V> {
      * @return Expire time.
      */
     public long expireTime() {
-        return explicitTtl ? CU.toExpireTime(ttl) : isUseNew() ? newEntry.expireTime() :
-            isUseOld() ? oldEntry.expireTime() : 0L;
-    }
-
-    /**
-     * @return Explicit TTL flag.
-     */
-    public boolean explicitTtl() {
-        return explicitTtl;
+        return isUseNew() ? newEntry.expireTime() : isUseOld() ? oldEntry.expireTime() : CU.toExpireTime(ttl);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
index 4f135ca..9a6cbd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java
@@ -77,7 +77,7 @@ public class GridCacheVersionEx extends GridCacheVersion {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion conflictVersion() {
+    public GridCacheVersion conflictVersion() {
         return drVer;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45ce8147/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
index ed43149..b8cfe77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
@@ -61,7 +61,11 @@ public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Update
 
                 K key = entry.key();
 
-                GridCacheDrInfo<V> val = entry.value() != null ? entry.expireTime() != 0 ?
+                // Ensure that updater to not receive special-purpose values for TTL and expire time.
+                assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0;
+                assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0;
+
+                GridCacheDrInfo<V> val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ?
                     new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) :
                     new GridCacheDrInfo<>(entry.value(), entry.version()) : null;
 


Mime
View raw message