ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [32/36] incubator-ignite git commit: # ignite-283: Reworked innerUpdate.
Date Wed, 18 Feb 2015 10:07:00 GMT
# ignite-283: Reworked innerUpdate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b4045ce1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b4045ce1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b4045ce1

Branch: refs/heads/ignite-283
Commit: b4045ce14d28b6e72bdda057ac4953cf29280309
Parents: 362bf16
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Feb 18 11:48:28 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Feb 18 11:48:28 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  14 +-
 .../processors/cache/GridCacheContext.java      |   4 +-
 .../processors/cache/GridCacheEntryEx.java      |  12 +-
 .../processors/cache/GridCacheMapEntry.java     | 357 +++++++++----------
 .../processors/cache/GridCacheProjectionEx.java |   8 +-
 .../cache/GridCacheProjectionImpl.java          |  16 +-
 .../processors/cache/GridCacheProxyImpl.java    |  16 +-
 .../cache/GridCacheUpdateAtomicResult.java      |  42 ++-
 .../processors/cache/GridCacheUtils.java        |  14 +-
 .../cache/conflict/GridCacheConflictInfo.java   |  53 +++
 .../conflict/GridCacheConflictInnerUpdate.java  |  88 +++++
 .../conflict/GridCacheNoTtlConflictInfo.java    |  60 ++++
 .../conflict/GridCacheTtlConflictInfo.java      |  78 ++++
 .../GridDistributedTxRemoteAdapter.java         |   3 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 195 +++++-----
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  12 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |   4 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 105 +++---
 .../dht/atomic/GridNearAtomicUpdateRequest.java | 144 ++------
 .../distributed/near/GridNearAtomicCache.java   |  34 +-
 .../processors/cache/dr/GridCacheDrInfo.java    |   5 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 .../cache/version/GridCacheVersion.java         |   4 +-
 .../GridCacheVersionConflictContext.java        |  23 +-
 .../cache/version/GridCacheVersionEx.java       |   2 +-
 .../dr/GridDrDataLoadCacheUpdater.java          |  10 +-
 .../ignite/internal/util/IgniteUtils.java       |   2 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   7 +-
 28 files changed, 739 insertions(+), 575 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fa3cb17..1daec2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2472,7 +2472,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllDr(final Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
+    @Override public void putAllConflict(final Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
         if (F.isEmpty(drMap))
             return;
 
@@ -2486,13 +2486,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             }
 
             @Override public String toString() {
-                return "putAllDr [drMap=" + drMap + ']';
+                return "putAllConflict [drMap=" + drMap + ']';
             }
         });
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllDrAsync(final Map<? extends K, GridCacheDrInfo<V>> drMap)
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(final Map<? extends K, GridCacheDrInfo<V>> drMap)
         throws IgniteCheckedException {
         if (F.isEmpty(drMap))
             return new GridFinishedFuture<Object>(ctx.kernalContext());
@@ -2507,7 +2507,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             }
 
             @Override public String toString() {
-                return "putAllDrAsync [drMap=" + drMap + ']';
+                return "putAllConflictAsync [drMap=" + drMap + ']';
             }
         });
     }
@@ -3379,7 +3379,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllDr(final Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public void removeAllConflict(final Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
         ctx.denyOnLocalRead();
 
         if (F.isEmpty(drMap))
@@ -3393,13 +3393,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
             }
 
             @Override public String toString() {
-                return "removeAllDr [drMap=" + drMap + ']';
+                return "removeAllConflict [drMap=" + drMap + ']';
             }
         });
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllDrAsync(final Map<? extends K, GridCacheVersion> drMap)
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(final Map<? extends K, GridCacheVersion> drMap)
         throws IgniteCheckedException {
         ctx.denyOnLocalRead();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 02624d7..060a825 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1582,11 +1582,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /**
      * Check whether conflict resolution is required.
      *
-     * @param oldVer Old version.
-     * @param newVer New version.
      * @return {@code True} in case DR is required.
      */
-    public boolean conflictNeedResolve(GridCacheVersion oldVer, GridCacheVersion newVer) {
+    public boolean conflictNeedResolve() {
         return conflictRslvr != null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index f044347..0502e2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.eviction.*;
+import org.apache.ignite.internal.processors.cache.conflict.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -423,9 +424,9 @@ public interface GridCacheEntryEx<K, V> {
      *      greater than passed in.
      * @param filter Optional filter to check.
      * @param drType DR type.
-     * @param drTtl DR TTL (if any).
-     * @param drExpireTime DR expire time (if any).
-     * @param drVer DR version (if any).
+     * @param conflictTtl Conflict TTL (if any).
+     * @param conflictExpireTime Conflict expire time (if any).
+     * @param conflictVer DR version (if any).
      * @param drResolve If {@code true} then performs DR conflicts resolution.
      * @param intercept If {@code true} then calls cache interceptor.
      * @param subjId Subject ID initiated this update.
@@ -455,10 +456,7 @@ public interface GridCacheEntryEx<K, V> {
         boolean checkVer,
         @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter,
         GridDrType drType,
-        long drTtl,
-        long drExpireTime,
-        @Nullable GridCacheVersion drVer,
-        boolean drResolve,
+        GridCacheConflictInnerUpdate conflict,
         boolean intercept,
         @Nullable UUID subjId,
         String taskName

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index cc879cb..e1a0189 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.eviction.*;
 import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.conflict.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.extras.*;
 import org.apache.ignite.internal.processors.cache.query.*;
@@ -164,7 +165,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         this.hash = hash;
         this.cctx = cctx;
 
-        ttlAndExpireTimeExtras(ttl, toExpireTime(ttl));
+        ttlAndExpireTimeExtras(ttl, CU.toExpireTime(ttl));
 
         if (cctx.portableEnabled())
             val = (V)cctx.kernalContext().portable().detachPortable(val);
@@ -844,7 +845,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                     V prevVal = rawGetOrUnmarshalUnlocked(false);
 
-                    long expTime = toExpireTime(ttl);
+                    long expTime = CU.toExpireTime(ttl);
 
                     if (loadedFromStore)
                         // Update indexes before actual write to entry.
@@ -918,7 +919,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                     V old = rawGetOrUnmarshalUnlocked(false);
 
-                    long expTime = toExpireTime(ttl);
+                    long expTime = CU.toExpireTime(ttl);
 
                     // Detach value before index update.
                     if (cctx.portableEnabled())
@@ -1048,7 +1049,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     expireTime = expireTimeExtras();
                 }
                 else
-                    expireTime = toExpireTime(ttl);
+                    expireTime = CU.toExpireTime(ttl);
             }
 
             assert ttl >= 0 : ttl;
@@ -1433,7 +1434,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         expireTime = expireTimeExtras();
                     }
                     else
-                        expireTime = toExpireTime(ttl);
+                        expireTime = CU.toExpireTime(ttl);
                 }
                 else {
                     ttl = ttlExtras();
@@ -1534,6 +1535,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public GridCacheUpdateAtomicResult<K, V> innerUpdate(
         GridCacheVersion newVer,
         UUID evtNodeId,
@@ -1551,31 +1553,26 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         boolean verCheck,
         @Nullable IgnitePredicate<Cache.Entry<K, V>>[] filter,
         GridDrType drType,
-        long drTtl,
-        long drExpireTime,
-        @Nullable GridCacheVersion drVer,
-        boolean drResolve,
+        GridCacheConflictInnerUpdate conflict,
         boolean intercept,
         @Nullable UUID subjId,
         String taskName
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
-        V old;
-
         boolean res = true;
 
+        V oldVal = null;
         V updated;
 
         GridCacheVersion enqueueVer = null;
 
-        GridCacheVersionConflictContext<K, V> drRes = null;
+        GridCacheVersionConflictContext<K, V> conflictCtx = null;
 
         EntryProcessorResult<?> invokeRes = null;
 
-        long newTtl = -1L;
-        long newExpireTime = 0L;
-        long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node.
+        long newTtl = CU.TTL_NOT_CHANGED;
+        long newExpireTime = CU.EXPIRE_TIME_CALCULATE;
 
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -1588,56 +1585,46 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
             Object transformClo = null;
 
-            if (drResolve) {
-                GridCacheVersion oldDrVer = version().drVersion();
-
-                boolean drNeedResolve = cctx.conflictNeedResolve(oldDrVer, drVer);
-
-                if (drNeedResolve) {
-                    // Get old value.
-                    V oldVal = rawGetOrUnmarshalUnlocked(true);
+            // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
+            if (conflict.resolve()) {
+                GridCacheVersion oldConflictVer = version().conflictVersion();
 
+                // Cache is conflict-enabled.
+                if (cctx.conflictNeedResolve()) {
+                    // Get new value, optionally unmarshalling and/or transforming it.
                     if (writeObj == null && valBytes != null)
                         writeObj = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
 
                     if (op == GridCacheOperation.TRANSFORM) {
                         transformClo = writeObj;
 
-                        writeObj = ((IgniteClosure<V, V>)writeObj).apply(oldVal);
-                    }
-
-                    K k = key();
-
-                    if (drTtl >= 0L) {
-                        // DR TTL is set explicitly
-                        assert drExpireTime >= 0L;
-
-                        newTtl = drTtl;
-                        newExpireTime = drExpireTime;
+                        writeObj = ((IgniteClosure<V, V>)writeObj).apply(rawGetOrUnmarshalUnlocked(true));
+                        valBytes = null;
                     }
-                    else {
-                        long ttl = expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L;
 
-                        newTtl = ttl < 0 ? ttlExtras() : ttl;
-                        newExpireTime = CU.toExpireTime(newTtl);
-                    }
+                    // Get TTL and expire time (no special-purpose TTL values can be set for conflict).
+                    assert conflict.ttl() != CU.TTL_ZERO && conflict.ttl() != CU.TTL_NOT_CHANGED && conflict.ttl() >= 0;
+                    assert conflict.expireTime() != CU.EXPIRE_TIME_CALCULATE && conflict.expireTime() >= 0;
 
+                    // Prepare old and new entries for conflict resolution.
                     GridCacheVersionedEntryEx<K, V> oldEntry = versionedEntry();
-                    GridCacheVersionedEntryEx<K, V> newEntry =
-                        new GridCachePlainVersionedEntry<>(k, (V)writeObj, newTtl, newExpireTime, drVer);
+                    GridCacheVersionedEntryEx<K, V> newEntry = new GridCachePlainVersionedEntry<>(key, (V)writeObj,
+                        conflict.ttl(), conflict.expireTime(), conflict.version());
 
-                    drRes = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+                    // Resolve conflict.
+                    conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
 
-                    assert drRes != null;
+                    assert conflictCtx != null;
 
-                    if (drRes.isUseOld()) {
+                    // Use old value?
+                    if (conflictCtx.isUseOld()) {
                         // Handle special case with atomic comparator.
-                        if (!isNew() &&                                            // Not initial value,
-                            verCheck &&                                            // and atomic version check,
-                            oldDrVer.dataCenterId() == drVer.dataCenterId() &&     // and data centers are equal,
-                            ATOMIC_VER_COMPARATOR.compare(oldDrVer, drVer) == 0 && // and both versions are equal,
-                            cctx.writeThrough() &&                                 // and store is enabled,
-                            primary)                                               // and we are primary.
+                        if (!isNew() &&                                                               // Not initial value,
+                            verCheck &&                                                               // and atomic version check,
+                            oldConflictVer.dataCenterId() == conflict.version().dataCenterId() &&     // and data centers are equal,
+                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, conflict.version()) == 0 && // and both versions are equal,
+                            cctx.writeThrough() &&                                                    // and store is enabled,
+                            primary)                                                                  // and we are primary.
                         {
                             V val = rawGetOrUnmarshalUnlocked(false);
 
@@ -1650,47 +1637,43 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                                 cctx.store().putToStore(null, key(), val, ver);
                         }
 
-                        old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
-
                         return new GridCacheUpdateAtomicResult<>(false,
-                            old,
+                            retval ? rawGetOrUnmarshalUnlocked(false) : null,
+                            null,
                             null,
                             invokeRes,
-                            0L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
                     }
-                    else if (drRes.isUseNew())
-                        op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+                    // Will update something.
                     else {
-                        assert drRes.isMerge();
-
-                        writeObj = drRes.mergeValue();
-                        valBytes = null;
+                        // Merge is a local update which override passed value bytes.
+                        if (conflictCtx.isMerge()) {
+                            writeObj = conflictCtx.mergeValue();
+                            valBytes = null;
 
-                        drVer = null; // Update will be considered as local.
+                            conflict.clearVersion(); // Update will be considered as local.
+                        }
+                        else
+                            assert conflictCtx.isUseNew();
 
+                        // Update value is known at this point, so update operation type.
                         op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
                     }
 
-                    newTtl = drRes.ttl();
-                    newExpireTime = drRes.expireTime();
-
-                    // Explicit DR expire time will be passed to remote node only in that case.
-                    if (!drRes.explicitTtl() && !drRes.isMerge()) {
-                        if (drRes.isUseNew() && newEntry.dataCenterId() != cctx.dataCenterId() ||
-                            drRes.isUseOld() && oldEntry.dataCenterId() != cctx.dataCenterId())
-                            newDrExpireTime = drRes.expireTime();
-                    }
+                    newTtl = conflictCtx.ttl();
+                    newExpireTime = conflictCtx.expireTime();
                 }
                 else
-                    // Nullify DR version on this update, so that we will use regular version during next updates.
-                    drVer = null;
+                    // Nullify conflict version on this update, so that we will use regular version during next updates.
+                    conflict.clearVersion();
             }
 
-            if (drRes == null) { // Perform version check only in case there will be no explicit conflict resolution.
+            // Perform version check only in case there was no explicit conflict resolution.
+            if (conflictCtx == null) {
                 if (verCheck) {
                     if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer) >= 0) {
                         if (ATOMIC_VER_COMPARATOR.compare(ver, newVer) == 0 && cctx.writeThrough() && primary) {
@@ -1714,14 +1697,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                                     "[entry=" + this + ", newVer=" + newVer + ']');
                         }
 
-                        old = retval ? rawGetOrUnmarshalUnlocked(false) : val;
-
                         return new GridCacheUpdateAtomicResult<>(false,
-                            old,
+                            retval ? rawGetOrUnmarshalUnlocked(false) : null,
+                            null,
                             null,
                             invokeRes,
-                            -1L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
@@ -1732,46 +1714,48 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']';
             }
 
-            // Possibly get old value form store.
-            old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
-
-            GridCacheValueBytes oldBytes = valueBytesUnlocked();
+            // Prepare old value and value bytes.
+            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            GridCacheValueBytes oldValBytes = valueBytesUnlocked();
 
+            // Possibly read value from store.
             boolean readThrough = false;
 
-            if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
-                old = readThrough(null, key, false, subjId, taskName);
+            if (needVal && oldVal == null && (cctx.readThrough() &&
+                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
+                oldVal = readThrough(null, key, false, subjId, taskName);
 
                 readThrough = true;
 
                 // Detach value before index update.
                 if (cctx.portableEnabled())
-                    old = (V)cctx.kernalContext().portable().detachPortable(old);
+                    oldVal = (V)cctx.kernalContext().portable().detachPortable(oldVal);
 
-                long ttl = 0;
-                long expireTime = 0;
+                // Calculate initial TTL and expire time.
+                long initTtl = 0;
+                long initExpireTime = 0;
 
-                if (expiryPlc != null && old != null) {
-                    ttl = expiryPlc.forCreate();
+                if (expiryPlc != null && oldVal != null) {
+                    initTtl = expiryPlc.forCreate();
 
-                    if (ttl == CU.TTL_ZERO) {
-                        ttl = CU.TTL_MINIMUM;
-                        expireTime = CU.expireTimeInPast();
+                    if (initTtl == CU.TTL_ZERO) {
+                        initTtl = CU.TTL_MINIMUM;
+                        initExpireTime = CU.expireTimeInPast();
                     }
-                    else if (ttl == CU.TTL_NOT_CHANGED)
-                        ttl = 0;
+                    else if (initTtl == CU.TTL_NOT_CHANGED)
+                        initTtl = CU.TTL_ETERNAL;
                     else
-                        expireTime = CU.toExpireTime(ttl);
+                        initExpireTime = CU.toExpireTime(initTtl);
                 }
 
-                if (old != null)
-                    updateIndex(old, null, expireTime, ver, null);
+                if (oldVal != null)
+                    updateIndex(oldVal, null, initExpireTime, ver, null);
                 else
                     clearIndex(null);
 
-                update(old, null, expireTime, ttl, ver);
+                update(oldVal, null, initExpireTime, initTtl, ver);
 
-                if (deletedUnlocked() && old != null && !isInternal())
+                if (deletedUnlocked() && oldVal != null && !isInternal())
                     deletedUnlocked(false);
             }
 
@@ -1779,7 +1763,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
                 // PutIfAbsent methods mustn't update hit/miss statistics
                 if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray())
-                    cctx.cache().metrics0().onRead(old != null);
+                    cctx.cache().metrics0().onRead(oldVal != null);
             }
 
             // Check filter inside of synchronization.
@@ -1791,24 +1775,27 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         updateTtl(expiryPlc);
 
                     return new GridCacheUpdateAtomicResult<>(false,
-                        retval ? old : null,
+                        retval ? oldVal : null,
+                        null,
                         null,
                         invokeRes,
-                        -1L,
-                        -1L,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
                         false);
                 }
             }
 
-            // Calculate new value.
+            // Calculate new value in case we met transform.
             if (op == GridCacheOperation.TRANSFORM) {
+                assert conflictCtx == null : "Cannot be TRANSFORM here is conflict resolution was performed earlier.";
+
                 transformClo = writeObj;
 
                 EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)writeObj;
 
-                CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(cctx, key, old);
+                CacheInvokeEntry<K, V> entry = new CacheInvokeEntry<>(cctx, key, oldVal);
 
                 try {
                     Object computed = entryProcessor.process(entry, invokeArgs);
@@ -1823,9 +1810,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 catch (Exception e) {
                     invokeRes = new CacheInvokeResult<>(e);
 
-                    updated = old;
+                    updated = oldVal;
 
-                    valBytes = oldBytes.getIfMarshaled();
+                    valBytes = oldValBytes.getIfMarshaled();
                 }
 
                 if (!entry.modified()) {
@@ -1833,11 +1820,12 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                         updateTtl(expiryPlc);
 
                     return new GridCacheUpdateAtomicResult<>(false,
-                        retval ? old : null,
+                        retval ? oldVal : null,
+                        null,
                         null,
                         invokeRes,
-                        -1L,
-                        -1L,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
                         false);
@@ -1852,30 +1840,29 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
             boolean hadVal = hasValueUnlocked();
 
-            // Incorporate DR version into new version if needed.
-            if (drVer != null && drVer != newVer)
+            // Incorporate conflict version into new version if needed.
+            if (conflict.version() != null && conflict.version() != newVer)
                 newVer = new GridCacheVersionEx(newVer.topologyVersion(),
                     newVer.globalTime(),
                     newVer.order(),
                     newVer.nodeOrder(),
                     newVer.dataCenterId(),
-                    drVer);
+                    conflict.version());
 
-            IgniteBiTuple<Boolean, V> interceptRes = null;
-
-            long ttl0 = newTtl;
 
             if (op == GridCacheOperation.UPDATE) {
-                if (drRes == null) {
+                // Conflict context is null if there were no explicit conflict resolution.
+                if (conflictCtx == null) {
                     // Calculate TTL and expire time for local update.
-                    if (drTtl >= 0L) {
-                        assert drExpireTime >= 0L : drExpireTime;
+                    if (conflict.hasExplicitTtl()) {
+                        // TTL/expireTime was sent to us from node where conflict had been resolved.
+                        assert conflict.hasExplicitExpireTime() : conflict.expireTime();
 
-                        ttl0 = drTtl;
-                        newExpireTime = drExpireTime;
+                        newTtl = conflict.ttl();
+                        newExpireTime = conflict.expireTime();
                     }
                     else {
-                        assert drExpireTime == CU.TTL_NOT_CHANGED : drExpireTime;
+                        assert !conflict.hasExplicitExpireTime() : conflict.expireTime();
 
                         if (expiryPlc != null)
                             newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
@@ -1883,36 +1870,50 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                             newTtl = CU.TTL_NOT_CHANGED;
 
                         if (newTtl == CU.TTL_NOT_CHANGED) {
-                            ttl0 = ttlExtras();
+                            newTtl = ttlExtras();
                             newExpireTime = expireTimeExtras();
                         }
-                        else {
-                            ttl0 = newTtl;
-                            newExpireTime = toExpireTime(ttl0);
+                        else if (newTtl == CU.TTL_ZERO) {
+                            op = GridCacheOperation.DELETE;
+
+                            // This is delete, so make TTL and expire time eternal.
+                            newTtl = CU.TTL_ETERNAL;
+                            newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+
+                            updated = null;
+                            valBytes = null;
                         }
+                        else
+                            newExpireTime = CU.toExpireTime(newTtl);
                     }
                 }
-                else if (newTtl == CU.TTL_NOT_CHANGED)
-                    ttl0 = ttlExtras();
             }
+            else {
+                assert op == GridCacheOperation.DELETE;
 
-            if (ttl0 == CU.TTL_ZERO) {
-                op = GridCacheOperation.DELETE;
-
-                updated = null;
+                newTtl = CU.TTL_ETERNAL;
+                newExpireTime = CU.EXPIRE_TIME_ETERNAL;
             }
 
+            // TTL and expire time must be resolved at this point.
+            assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0;
+            assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0;
+
+            IgniteBiTuple<Boolean, V> interceptRes = null;
+
+            // Actual update.
             if (op == GridCacheOperation.UPDATE) {
                 if (intercept) {
-                    V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated);
+                    V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, oldVal, updated);
 
                     if (interceptorVal == null)
                         return new GridCacheUpdateAtomicResult<>(false,
-                            retval ? old : null,
+                            retval ? oldVal : null,
+                            null,
                             null,
                             invokeRes,
-                            -1L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
@@ -1948,9 +1949,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
-                updateIndex(updated, valBytes, newExpireTime, newVer, old);
+                updateIndex(updated, valBytes, newExpireTime, newVer, oldVal);
 
-                update(updated, valBytes, newExpireTime, ttl0, newVer);
+                update(updated, valBytes, newExpireTime, newTtl, newVer);
 
                 drReplicate(drType, updated, valBytes, newVer);
 
@@ -1960,7 +1961,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     V evtOld = null;
 
                     if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(old);
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null,
                             newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
@@ -1969,7 +1970,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                     if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
                         if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(old);
+                            evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null,
                             newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
@@ -1979,15 +1980,16 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             }
             else {
                 if (intercept) {
-                    interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, old);
+                    interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, oldVal);
 
                     if (cctx.cancelRemove(interceptRes))
                         return new GridCacheUpdateAtomicResult<>(false,
                             cctx.<V>unwrapTemporary(interceptRes.get2()),
                             null,
+                            null,
                             invokeRes,
-                            -1L,
-                            -1L,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
                             false);
@@ -1999,7 +2001,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
-                clearIndex(old);
+                clearIndex(oldVal);
 
                 if (hadVal) {
                     assert !deletedUnlocked();
@@ -2024,7 +2026,10 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 boolean hasValPtr = valPtr != 0;
 
                 // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
-                update(null, null, 0, 0, newVer);
+                assert newTtl == CU.TTL_ETERNAL;
+                assert newExpireTime == CU.EXPIRE_TIME_ETERNAL;
+
+                update(null, null, newTtl, newExpireTime, newVer);
 
                 if (cctx.offheapTiered() && hasValPtr) {
                     boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes());
@@ -2042,7 +2047,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                     V evtOld = null;
 
                     if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(old);
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null,
                             newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
@@ -2051,7 +2056,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                     if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
                         if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(old);
+                            evtOld = cctx.unwrapTemporary(oldVal);
 
                         cctx.events().addEvent(partition(), key, evtNodeId, null, newVer,
                             EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal,
@@ -2060,17 +2065,14 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 }
 
                 res = hadVal;
-
-                // Do not propagate zeroed TTL and expire time.
-                newTtl = -1L;
-                newDrExpireTime = -1L;
             }
 
             if (res)
                 updateMetrics(op, metrics);
 
             if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
+                cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(),
+                    oldVal, oldValBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -2078,24 +2080,31 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
                 if (op == GridCacheOperation.UPDATE)
                     cctx.config().getInterceptor().onAfterPut(key, val);
                 else
-                    cctx.config().getInterceptor().onAfterRemove(key, old);
+                    cctx.config().getInterceptor().onAfterRemove(key, oldVal);
 
                 if (interceptRes != null)
-                    old = cctx.unwrapTemporary(interceptRes.get2());
+                    oldVal = cctx.unwrapTemporary(interceptRes.get2());
             }
         }
 
         if (log.isDebugEnabled())
-            log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']');
+            log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']');
+
+        // Ensure that TTL / expire stuff is not sent over wire when not needed.
+        if (!res || op == GridCacheOperation.DELETE) {
+            newTtl = CU.TTL_NOT_CHANGED;
+            newExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
 
         return new GridCacheUpdateAtomicResult<>(res,
-            old,
+            oldVal,
             updated,
+            valBytes,
             invokeRes,
             newTtl,
-            newDrExpireTime,
+            newExpireTime,
             enqueueVer,
-            drRes,
+            conflictCtx,
             true);
     }
 
@@ -2111,7 +2120,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     private void drReplicate(GridDrType drType, @Nullable V val, @Nullable byte[] valBytes, GridCacheVersion ver)
         throws IgniteCheckedException {
         if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal())
-            cctx.dr().replicate(key, keyBytes, val, valBytes, rawTtl(), rawExpireTime(), ver.drVersion(), drType);
+            cctx.dr().replicate(key, keyBytes, val, valBytes, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType);
     }
 
     /**
@@ -2473,7 +2482,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         GridCacheVersion ver) {
         assert ver != null;
         assert Thread.holdsLock(this);
-        assert ttl >= 0 : ttl;
+        assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl;
 
         long oldExpireTime = expireTimeExtras();
 
@@ -2537,7 +2546,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             expireTime = CU.expireTimeInPast();
         }
         else
-            expireTime = toExpireTime(ttl);
+            expireTime = CU.toExpireTime(ttl);
 
         long oldExpireTime = expireTimeExtras();
 
@@ -2565,22 +2574,6 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     }
 
     /**
-     * @param ttl Time to live.
-     * @return Expiration time.
-     */
-    public static long toExpireTime(long ttl) {
-        assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0;
-
-        long expireTime = ttl == CU.TTL_ETERNAL ? CU.EXPIRE_TIME_ETERNAL : U.currentTimeMillis() + ttl;
-
-        // Account for overflow.
-        if (expireTime < 0)
-            expireTime = CU.EXPIRE_TIME_ETERNAL;
-
-        return expireTime;
-    }
-
-    /**
      * @throws GridCacheEntryRemovedException If entry is obsolete.
      */
     protected void checkObsolete() throws GridCacheEntryRemovedException {
@@ -3066,7 +3059,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
     @Override public synchronized V rawPut(V val, long ttl) {
         V old = this.val;
 
-        update(val, null, toExpireTime(ttl), ttl, nextVersion());
+        update(val, null, CU.toExpireTime(ttl), ttl, nextVersion());
 
         return old;
     }
@@ -3090,7 +3083,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
             checkObsolete();
 
             if (isNew() || (!preload && deletedUnlocked())) {
-                long expTime = expireTime < 0 ? toExpireTime(ttl) : expireTime;
+                long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
 
                 if (cctx.portableEnabled())
                     val = (V)cctx.kernalContext().portable().detachPortable(val);
@@ -3167,7 +3160,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
         boolean isNew = isStartVersion();
 
         return new GridCachePlainVersionedEntry<>(key, isNew ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false),
-            ttlExtras(), expireTimeExtras(), ver.drVersion(), isNew);
+            ttlExtras(), expireTimeExtras(), ver.conflictVersion(), isNew);
     }
 
     /** {@inheritDoc} */
@@ -3184,7 +3177,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V>
 
                 long ttl = ttlExtras();
 
-                long expTime = toExpireTime(ttl);
+                long expTime = CU.toExpireTime(ttl);
 
                 // Detach value before index update.
                 if (cctx.portableEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
index 4e93122..7cb3de5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
@@ -112,7 +112,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If put operation failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException;
+    public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException;
 
     /**
      * Store DR data asynchronously.
@@ -122,7 +122,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If put operation failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException;
+    public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException;
 
     /**
      * Internal method that is called from {@link GridCacheEntryImpl}.
@@ -154,7 +154,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If remove failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException;
+    public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException;
 
     /**
      * Removes DR data asynchronously.
@@ -164,7 +164,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If remove failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException;
+    public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException;
 
     /**
      * Internal method that is called from {@link GridCacheEntryImpl}.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index eb854d5..041d222 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -794,14 +794,14 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
-        cache.putAllDr(drMap);
+    @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
+        cache.putAllConflict(drMap);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap)
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap)
         throws IgniteCheckedException {
-        return cache.putAllDrAsync(drMap);
+        return cache.putAllConflictAsync(drMap);
     }
 
     /** {@inheritDoc} */
@@ -1130,13 +1130,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
-        cache.removeAllDr(drMap);
+    @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+        cache.removeAllConflict(drMap);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
-        return cache.removeAllDrAsync(drMap);
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+        return cache.removeAllConflictAsync(drMap);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 2a653af..2fc59b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -738,11 +738,11 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllDr(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
+    @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            delegate.putAllDr(drMap);
+            delegate.putAllConflict(drMap);
         }
         finally {
             gate.leave(prev);
@@ -750,12 +750,12 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllDrAsync(Map<? extends K, GridCacheDrInfo<V>> drMap)
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap)
         throws IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return delegate.putAllDrAsync(drMap);
+            return delegate.putAllConflictAsync(drMap);
         }
         finally {
             gate.leave(prev);
@@ -1454,11 +1454,11 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllDr(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            delegate.removeAllDr(drMap);
+            delegate.removeAllConflict(drMap);
         }
         finally {
             gate.leave(prev);
@@ -1466,11 +1466,11 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllDrAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return delegate.removeAllDrAsync(drMap);
+            return delegate.removeAllConflictAsync(drMap);
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index cabfaa5..136e1bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -39,11 +39,14 @@ public class GridCacheUpdateAtomicResult<K, V> {
     @GridToStringInclude
     private final V newVal;
 
+    /** New value bytes. */
+    private final byte[] newValBytes;
+
     /** New TTL. */
     private final long newTtl;
 
     /** Explicit DR expire time (if any). */
-    private final long drExpireTime;
+    private final long conflictExpireTime;
 
     /** Version for deferred delete. */
     @GridToStringInclude
@@ -51,7 +54,7 @@ public class GridCacheUpdateAtomicResult<K, V> {
 
     /** DR resolution result. */
     @GridToStringInclude
-    private final GridCacheVersionConflictContext<K, V> drRes;
+    private final GridCacheVersionConflictContext<K, V> conflictRes;
 
     /** Whether update should be propagated to DHT node. */
     private final boolean sndToDht;
@@ -65,30 +68,33 @@ public class GridCacheUpdateAtomicResult<K, V> {
      * @param success Success flag.
      * @param oldVal Old value.
      * @param newVal New value.
+     * @param newValBytes New value bytes.
      * @param res Value computed by the {@link EntryProcessor}.
      * @param newTtl New TTL.
-     * @param drExpireTime Explicit DR expire time (if any).
+     * @param conflictExpireTime Explicit DR expire time (if any).
      * @param rmvVer Version for deferred delete.
-     * @param drRes DR resolution result.
+     * @param conflictRes DR resolution result.
      * @param sndToDht Whether update should be propagated to DHT node.
      */
     public GridCacheUpdateAtomicResult(boolean success,
         @Nullable V oldVal,
         @Nullable V newVal,
+        @Nullable byte[] newValBytes,
         @Nullable EntryProcessorResult<?> res,
         long newTtl,
-        long drExpireTime,
+        long conflictExpireTime,
         @Nullable GridCacheVersion rmvVer,
-        @Nullable GridCacheVersionConflictContext<K, V> drRes,
+        @Nullable GridCacheVersionConflictContext<K, V> conflictRes,
         boolean sndToDht) {
         this.success = success;
         this.oldVal = oldVal;
         this.newVal = newVal;
+        this.newValBytes = newValBytes;
         this.res = res;
         this.newTtl = newTtl;
-        this.drExpireTime = drExpireTime;
+        this.conflictExpireTime = conflictExpireTime;
         this.rmvVer = rmvVer;
-        this.drRes = drRes;
+        this.conflictRes = conflictRes;
         this.sndToDht = sndToDht;
     }
 
@@ -121,17 +127,25 @@ public class GridCacheUpdateAtomicResult<K, V> {
     }
 
     /**
-     * @return {@code -1} if TTL did not change, otherwise new TTL.
+     * @return New value bytes.
+     */
+    @Nullable public byte[] newValueBytes() {
+        return newValBytes;
+    }
+
+    /**
+     * @return {@link GridCacheUtils#TTL_NOT_CHANGED} if TTL did not change, otherwise new TTL.
      */
     public long newTtl() {
         return newTtl;
     }
 
     /**
-     * @return Explicit DR expire time (if any).
+     * @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time
+     * value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}.
      */
-    public long drExpireTime() {
-        return drExpireTime;
+    public long conflictExpireTime() {
+        return conflictExpireTime;
     }
 
     /**
@@ -144,8 +158,8 @@ public class GridCacheUpdateAtomicResult<K, V> {
     /**
      * @return DR conflict resolution context.
      */
-    @Nullable public GridCacheVersionConflictContext<K, V> drResolveResult() {
-        return drRes;
+    @Nullable public GridCacheVersionConflictContext<K, V> conflictResolveResult() {
+        return conflictRes;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index b4f306b..fc5130c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1614,15 +1614,15 @@ public class GridCacheUtils {
      * @return Expire time.
      */
     public static long toExpireTime(long ttl) {
-        assert ttl >= 0L : ttl;
+        assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0;
 
-        if (ttl == 0L)
-            return 0L;
-        else {
-            long expireTime = U.currentTimeMillis() + ttl;
+        long expireTime = ttl == CU.TTL_ETERNAL ? CU.EXPIRE_TIME_ETERNAL : U.currentTimeMillis() + ttl;
 
-            return expireTime > 0L ? expireTime : 0L;
-        }
+        // Account for overflow.
+        if (expireTime < 0)
+            expireTime = CU.EXPIRE_TIME_ETERNAL;
+
+        return expireTime;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java
new file mode 100644
index 0000000..8f210cf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInfo.java
@@ -0,0 +1,53 @@
+package org.apache.ignite.internal.processors.cache.conflict;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Cache conflict info which is passed over the wire.
+ */
+public abstract class GridCacheConflictInfo implements Externalizable {
+    /**
+     * Create conflict info.
+     *
+     * @param ver Version.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     * @return Conflict info.
+     */
+    public static GridCacheConflictInfo create(GridCacheVersion ver, long ttl, long expireTime) {
+        if (ttl == CU.TTL_NOT_CHANGED) {
+            assert expireTime == CU.EXPIRE_TIME_CALCULATE;
+
+            return new GridCacheNoTtlConflictInfo(ver);
+        }
+        else {
+            assert ttl != CU.TTL_ZERO && ttl >= 0;
+            assert expireTime != CU.EXPIRE_TIME_CALCULATE && expireTime >= 0;
+
+            return new GridCacheTtlConflictInfo(ver, ttl, expireTime);
+        }
+    }
+
+    /**
+     * @return Version.
+     */
+    public abstract GridCacheVersion version();
+
+    /**
+     * @return TTL.
+     */
+    public abstract long ttl();
+
+    /**
+     * @return Expire time.
+     */
+    public abstract long expireTime();
+
+    /**
+     * @return {@code True} if has expiration info.
+     */
+    public abstract boolean hasExpirationInfo();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java
new file mode 100644
index 0000000..75b7e3b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheConflictInnerUpdate.java
@@ -0,0 +1,88 @@
+package org.apache.ignite.internal.processors.cache.conflict;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Conflict inner update info.
+ */
+public class GridCacheConflictInnerUpdate {
+    /** Resolve flag. */
+    private final boolean resolve;
+
+    /** Version. */
+    private GridCacheVersion ver;
+
+    /** TTL. */
+    private final long ttl;
+
+    /** Expire time. */
+    private final long expireTime;
+
+    /**
+     * Conflict inner update info.
+     *
+     * @param resolve Resolve flag.
+     * @param ver Version.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     */
+    public GridCacheConflictInnerUpdate(boolean resolve, GridCacheVersion ver, long ttl, long expireTime) {
+        // TODO: IGNITE-283: Add assertion for invariants.
+
+        this.resolve = resolve;
+        this.ver = ver;
+        this.ttl = ttl;
+        this.expireTime = expireTime;
+    }
+
+    /**
+     * @return Resolve flag.
+     */
+    public boolean resolve() {
+        return resolve;
+    }
+
+    /**
+     * @return Version.
+     */
+    @Nullable public GridCacheVersion version() {
+        return ver;
+    }
+
+    /**
+     * Clear version so that update will be considered local.
+     */
+    public void clearVersion() {
+        ver = null;
+    }
+
+    /*
+     * @return TTL.
+     */
+    public long ttl() {
+        return ttl;
+    }
+
+    /**
+     * @return {@code True} if explicit TTL is set.
+     */
+    public boolean hasExplicitTtl() {
+        return ttl != CU.TTL_NOT_CHANGED;
+    }
+
+    /**
+     * @return {@code True} if explicit expire time is set.
+     */
+    public boolean hasExplicitExpireTime() {
+        return expireTime != CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /**
+     * @return Expire time.
+     */
+    public long expireTime() {
+        return expireTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java
new file mode 100644
index 0000000..926feef
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheNoTtlConflictInfo.java
@@ -0,0 +1,60 @@
+package org.apache.ignite.internal.processors.cache.conflict;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Conflict info without TTL.
+ */
+public class GridCacheNoTtlConflictInfo extends GridCacheConflictInfo {
+    /** Version. */
+    private GridCacheVersion ver;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public GridCacheNoTtlConflictInfo() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ver Version.
+     */
+    public GridCacheNoTtlConflictInfo(GridCacheVersion ver) {
+        this.ver = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long ttl() {
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long expireTime() {
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasExpirationInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ver = (GridCacheVersion)in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java
new file mode 100644
index 0000000..f4e6f29
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/conflict/GridCacheTtlConflictInfo.java
@@ -0,0 +1,78 @@
+package org.apache.ignite.internal.processors.cache.conflict;
+
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Conflict info with TTL.
+ */
+public class GridCacheTtlConflictInfo extends GridCacheConflictInfo {
+    /** Version. */
+    private GridCacheVersion ver;
+
+    /** TTL. */
+    private long ttl;
+
+    /** Expire time. */
+    private long expireTime;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public GridCacheTtlConflictInfo() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ver Version.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     */
+    public GridCacheTtlConflictInfo(GridCacheVersion ver, long ttl, long expireTime) {
+        assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED;
+        assert expireTime != CU.EXPIRE_TIME_CALCULATE;
+
+        this.ver = ver;
+        this.ttl = ttl;
+        this.expireTime = expireTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion version() {
+        return ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long ttl() {
+        return ttl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long expireTime() {
+        return expireTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasExpirationInfo() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ver);
+        out.writeLong(ttl);
+        out.writeLong(expireTime);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ver = (GridCacheVersion)in.readObject();
+        ttl = in.readLong();
+        expireTime = in.readLong();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4045ce1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 4f8357a..9c1d85f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -510,8 +510,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (txEntry.ttl() == CU.TTL_ZERO)
                                         op = DELETE;
 
-                                    boolean drNeedResolve =
-                                        cacheCtx.conflictNeedResolve(cached.version(), explicitVer);
+                                    boolean drNeedResolve = cacheCtx.conflictNeedResolve();
 
                                         if (drNeedResolve) {
                                             IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>


Mime
View raw message