ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/3] incubator-ignite git commit: # ignite-51
Date Thu, 05 Mar 2015 14:16:04 GMT
# ignite-51


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7253addd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7253addd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7253addd

Branch: refs/heads/ignite-51
Commit: 7253adddfa9f64c77a5a36b01658f31cbd35884e
Parents: 814fae3
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Mar 5 09:28:25 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Mar 5 16:15:26 2015 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  10 +
 .../internal/processors/cache/CacheObject.java  |  27 +-
 .../processors/cache/CacheObjectAdapter.java    |   6 -
 .../processors/cache/GridCacheAdapter.java      |  30 ++-
 .../processors/cache/GridCacheContext.java      |  28 +--
 .../processors/cache/GridCacheMapEntry.java     | 177 +++++++------
 .../processors/cache/GridCacheProjectionEx.java |   9 +-
 .../cache/GridCacheProjectionImpl.java          |   9 +-
 .../processors/cache/GridCacheProxyImpl.java    |  11 +-
 .../cache/GridCacheSharedContext.java           |   6 +-
 .../processors/cache/GridCacheTtlManager.java   |  18 +-
 .../cache/KeyCacheObjectTransferImpl.java       |   5 -
 .../GridDistributedTxRemoteAdapter.java         |  44 ++--
 .../dht/atomic/GridDhtAtomicCache.java          |  32 ++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  10 +-
 .../distributed/near/CacheVersionedValue.java   |  42 ++--
 .../distributed/near/GridNearAtomicCache.java   |  11 +-
 .../near/GridNearTxPrepareFuture.java           |   4 +-
 .../near/GridNearTxPrepareResponse.java         |  10 +-
 .../cache/dr/GridCacheDrExpirationInfo.java     |  21 +-
 .../processors/cache/dr/GridCacheDrInfo.java    |  29 ++-
 .../processors/cache/dr/GridCacheDrManager.java |  10 +-
 .../cache/dr/os/GridOsCacheDrManager.java       |  10 +-
 .../cache/jta/CacheJtaManagerAdapter.java       |   2 +-
 .../cache/jta/CacheNoopJtaManager.java          |   2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  25 +-
 .../transactions/IgniteTxLocalAdapter.java      |  93 ++++---
 .../cache/transactions/IgniteTxLocalEx.java     |   8 +-
 .../version/GridCacheRawVersionedEntry.java     | 248 +++++++++++++++----
 .../cache/version/GridCacheVersionEx.java       |  63 +++++
 .../cache/version/GridCacheVersionManager.java  |   2 +-
 .../dataload/GridDataLoadUpdateJob.java         |   1 +
 .../dataload/IgniteDataLoaderEntry.java         |   4 +-
 .../dataload/IgniteDataLoaderImpl.java          |   7 +
 .../dr/GridDrDataLoadCacheUpdater.java          |  25 +-
 .../cache/GridCacheTtlManagerLoadTest.java      |   2 +-
 .../loadtests/hashmap/GridCacheTestContext.java |   8 +-
 .../processors/cache/jta/CacheJtaManager.java   |   2 +-
 38 files changed, 628 insertions(+), 423 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5b606c4..f7f0fc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -559,6 +559,16 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 103:
+                msg = new GridCacheRawVersionedEntry<>();
+
+                break;
+
+            case 104:
+                msg = new GridCacheVersionEx();
+
+                break;
+
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index b231d89..686f7b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -33,18 +33,6 @@ public interface CacheObject extends Message {
     @Nullable public <T> T value(CacheObjectContext ctx, boolean cpy);
 
     /**
-     * @param name Field name.
-     * @return Field value.
-     */
-    @Nullable public <T> T getField(String name);
-
-    /**
-     * @param ctx Context.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException;
-
-    /**
      * @return {@code True} if value is byte array.
      */
     public boolean byteArray();
@@ -57,6 +45,14 @@ public interface CacheObject extends Message {
     public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException;
 
     /**
+     * Prepares cache object for cache (e.g. copies user-provided object if needed).
+     *
+     * @param ctx Cache context.
+     * @return Instance to store in cache.
+     */
+    public CacheObject prepareForCache(CacheObjectContext ctx);
+
+    /**
      * @param ctx Context.
      * @param ldr Class loader.
      * @throws IgniteCheckedException If failed.
@@ -64,9 +60,8 @@ public interface CacheObject extends Message {
     public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException;
 
     /**
-     * @param ctx Cache context.
-     *
-     * @return Instance to store in cache.
+     * @param ctx Context.
+     * @throws IgniteCheckedException If failed.
      */
-    public CacheObject prepareForCache(CacheObjectContext ctx);
+    public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index 4d8572a..0e63506 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -36,12 +36,6 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
     /** */
     protected byte[] valBytes;
 
-    /** {@inheritDoc} */
-    @Nullable @Override public <T> T getField(String name) {
-        // TODO IGNITE-51.
-        return null;
-    }
-
     /**
      * @param ctx Context.
      * @return {@code True} need to copy value returned to user.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 44890f7..8037961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2570,7 +2570,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllConflict(final Map<? extends K, GridCacheDrInfo<V>> drMap)
+    @Override public void putAllConflict(final Map<KeyCacheObject, GridCacheDrInfo> drMap)
         throws IgniteCheckedException {
         if (F.isEmpty(drMap))
             return;
@@ -2591,7 +2591,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllConflictAsync(final Map<? extends K, GridCacheDrInfo<V>> drMap)
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(final Map<KeyCacheObject, GridCacheDrInfo> drMap)
         throws IgniteCheckedException {
         if (F.isEmpty(drMap))
             return new GridFinishedFuture<Object>(ctx.kernalContext());
@@ -3483,7 +3483,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllConflict(final Map<? extends K, GridCacheVersion> drMap)
+    @Override public void removeAllConflict(final Map<KeyCacheObject, GridCacheVersion> drMap)
         throws IgniteCheckedException {
         ctx.denyOnLocalRead();
 
@@ -3504,7 +3504,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllConflictAsync(final Map<? extends K, GridCacheVersion> drMap)
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(final Map<KeyCacheObject, GridCacheVersion> drMap)
         throws IgniteCheckedException {
         ctx.denyOnLocalRead();
 
@@ -3977,10 +3977,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex());
+            IgniteDataLoaderImpl ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex());
 
             try {
-                ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
+                ldr.updater(new GridDrDataLoadCacheUpdater());
 
                 LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc);
 
@@ -4193,10 +4193,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         });
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex());
+            IgniteDataLoaderImpl ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex());
 
             try {
-                ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
+                ldr.updater(new GridDrDataLoadCacheUpdater());
 
                 LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0);
 
@@ -5760,7 +5760,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         /**
          * @param keys Keys involved.
          */
-        protected AsyncInOp(Collection<? extends K> keys) {
+        protected AsyncInOp(Collection<?> keys) {
             super(keys);
         }
 
@@ -6229,7 +6229,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final IgniteBiPredicate<K, V> p;
 
         /** */
-        final Collection<Map.Entry<K, V>> col;
+        final Collection<GridCacheRawVersionedEntry> col;
 
         /** */
         final IgniteDataLoaderImpl<K, V> ldr;
@@ -6272,20 +6272,18 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
                     ttl = 0;
             }
 
-            GridCacheRawVersionedEntry e = new GridCacheRawVersionedEntry<>(ctx.toCacheKeyObject(key),
-                null,
+            GridCacheRawVersionedEntry e = new GridCacheRawVersionedEntry(ctx.toCacheKeyObject(key),
                 ctx.toCacheObject(val),
-                null,
                 ttl,
                 0,
                 ver);
 
-            e.marshal(ctx.marshaller());
+            e.prepareDirectMarshal(ctx.cacheObjectContext());
 
             col.add(e);
 
             if (col.size() == ldr.perNodeBufferSize()) {
-                ldr.addData(col);
+                ldr.addDataInternal(col);
 
                 col.clear();
             }
@@ -6296,7 +6294,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
          */
         void onDone() {
             if (!col.isEmpty())
-                ldr.addData(col);
+                ldr.addDataInternal(col);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 0a386eb..78c0800 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -124,19 +124,19 @@ public class GridCacheContext<K, V> implements Externalizable {
     private CacheDataStructuresManager<K, V> dataStructuresMgr;
 
     /** Eager TTL manager. */
-    private GridCacheTtlManager<K, V> ttlMgr;
+    private GridCacheTtlManager ttlMgr;
 
     /** Store manager. */
     private GridCacheStoreManager storeMgr;
 
     /** Replication manager. */
-    private GridCacheDrManager<K, V> drMgr;
+    private GridCacheDrManager drMgr;
 
     /** Serialization manager. */
     private IgniteCacheSerializationManager<K, V> serMgr;
 
     /** JTA manager. */
-    private CacheJtaManagerAdapter<K, V> jtaMgr;
+    private CacheJtaManagerAdapter jtaMgr;
 
     /** Managers. */
     private List<GridCacheManager<K, V>> mgrs = new LinkedList<>();
@@ -238,9 +238,9 @@ public class GridCacheContext<K, V> implements Externalizable {
         CacheContinuousQueryManager<K, V> contQryMgr,
         GridCacheAffinityManager<K, V> affMgr,
         CacheDataStructuresManager<K, V> dataStructuresMgr,
-        GridCacheTtlManager<K, V> ttlMgr,
-        GridCacheDrManager<K, V> drMgr,
-        CacheJtaManagerAdapter<K, V> jtaMgr) {
+        GridCacheTtlManager ttlMgr,
+        GridCacheDrManager drMgr,
+        CacheJtaManagerAdapter jtaMgr) {
         assert ctx != null;
         assert sharedCtx != null;
         assert cacheCfg != null;
@@ -839,7 +839,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     /**
      * @return Lock order manager.
      */
-    public GridCacheVersionManager<K, V> versions() {
+    public GridCacheVersionManager versions() {
         return sharedCtx.versions();
     }
 
@@ -930,21 +930,21 @@ public class GridCacheContext<K, V> implements Externalizable {
     /**
      * @return DR manager.
      */
-    public GridCacheDrManager<K, V> dr() {
+    public GridCacheDrManager dr() {
         return drMgr;
     }
 
     /**
      * @return TTL manager.
      */
-    public GridCacheTtlManager<K, V> ttl() {
+    public GridCacheTtlManager ttl() {
         return ttlMgr;
     }
 
     /**
      * @return JTA manager.
      */
-    public CacheJtaManagerAdapter<K, V> jta() {
+    public CacheJtaManagerAdapter jta() {
         return jtaMgr;
     }
 
@@ -958,7 +958,7 @@ public class GridCacheContext<K, V> implements Externalizable {
 
         for (CacheEntryPredicate p0 : p) {
             if ((p0 instanceof CacheEntrySerializablePredicate) &&
-               ((CacheEntrySerializablePredicate) p0).predicate() instanceof CacheEntryPredicateNoValue)
+               ((CacheEntrySerializablePredicate)p0).predicate() instanceof CacheEntryPredicateNoValue)
             return true;
         }
 
@@ -1589,15 +1589,15 @@ public class GridCacheContext<K, V> implements Externalizable {
      *
      * @param oldEntry Old entry.
      * @param newEntry New entry.
-     * @param atomicVerComparator Whether to use atomic version comparator.
+     * @param atomicVerComp Whether to use atomic version comparator.
      * @return Conflict resolution result.
      * @throws IgniteCheckedException In case of exception.
      */
     public GridCacheVersionConflictContext<K, V> conflictResolve(GridCacheVersionedEntryEx<K, V> oldEntry,
-        GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) throws IgniteCheckedException {
+        GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComp) throws IgniteCheckedException {
         assert conflictRslvr != null : "Should not reach this place.";
 
-        GridCacheVersionConflictContext<K, V> ctx = conflictRslvr.resolve(oldEntry, newEntry, atomicVerComparator);
+        GridCacheVersionConflictContext<K, V> ctx = conflictRslvr.resolve(oldEntry, newEntry, atomicVerComp);
 
         if (ctx.isManualResolve())
             drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0caf1f9..a1c7ecf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.EventType.*;
 import static org.apache.ignite.internal.processors.cache.CacheFlag.*;
+import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 import static org.apache.ignite.transactions.TransactionState.*;
 
 /**
@@ -142,8 +143,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      * @param ttl Time to live.
      * @param hdrId Header id.
      */
-    protected GridCacheMapEntry(GridCacheContext<?, ?> cctx, KeyCacheObject key, int hash, CacheObject val,
-        GridCacheMapEntry next, long ttl, int hdrId) {
+    protected GridCacheMapEntry(GridCacheContext<?, ?> cctx,
+        KeyCacheObject key,
+        int hash,
+        CacheObject val,
+        GridCacheMapEntry next,
+        long ttl,
+        int hdrId)
+    {
         log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class);
 
         key = (KeyCacheObject)cctx.kernalContext().portable().prepareForCache(key, cctx);
@@ -1654,79 +1661,83 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
                 // Cache is conflict-enabled.
                 if (cctx.conflictNeedResolve()) {
-// TODO IGNITE-51.
-//                    // Get new value, optionally unmarshalling and/or transforming it.
-//                    if (writeObj == null && valBytes != null)
-//                        writeObj = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
-//
-//                    if (op == GridCacheOperation.TRANSFORM) {
-//                        transformClo = writeObj;
-//
-//                        writeObj = ((IgniteClosure<V, V>)writeObj).apply(rawGetOrUnmarshalUnlocked(true));
-//                        valBytes = null;
-//                    }
-//
-//                    GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc, explicitTtl,
-//                        explicitExpireTime);
-//
-//                    // Prepare old and new entries for conflict resolution.
-//                    GridCacheVersionedEntryEx oldEntry = versionedEntry();
-//                    GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry<>(key, (V)writeObj,
-//                        expiration.get1(), expiration.get2(), conflictVer != null ? conflictVer : newVer);
-//
-//                    // Resolve conflict.
-//                    conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
-//
-//                    assert conflictCtx != null;
-//
-//                    // Use old value?
-//                    if (conflictCtx.isUseOld()) {
-//                        GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
-//
-//                        // Handle special case with atomic comparator.
-//                        if (!isNew() &&                                                           // Not initial value,
-//                            verCheck &&                                                           // and atomic version check,
-//                            oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&     // and data centers are equal,
-//                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer) == 0 && // and both versions are equal,
-//                            cctx.writeThrough() &&                                                // and store is enabled,
-//                            primary)                                                              // and we are primary.
-//                        {
-//                            V val = rawGetOrUnmarshalUnlocked(false);
-//
-//                            if (val == null) {
-//                                assert deletedUnlocked();
-//
-//                                cctx.store().removeFromStore(null, key());
-//                            }
-//                            else
-//                                cctx.store().putToStore(null, key(), val, ver);
-//                        }
-//
-//                        return new GridCacheUpdateAtomicResult<>(false,
-//                            retval ? rawGetOrUnmarshalUnlocked(false) : null,
-//                            null,
-//                            invokeRes,
-//                            CU.TTL_ETERNAL,
-//                            CU.EXPIRE_TIME_ETERNAL,
-//                            null,
-//                            null,
-//                            false);
-//                    }
-//                    // Will update something.
-//                    else {
-//                        // Merge is a local update which override passed value bytes.
-//                        if (conflictCtx.isMerge()) {
-//                            writeObj = conflictCtx.mergeValue();
-//                            valBytes = null;
-//
-//                            conflictVer = null;
-//                        }
-//                        else
-//                            assert conflictCtx.isUseNew();
-//
-//                        // Update value is known at this point, so update operation type.
-//                        op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
-//                    }
+                    // Get new value, optionally unmarshalling and/or transforming it.
+                    Object writeObj0;
+
+                    if (op == GridCacheOperation.TRANSFORM) {
+                        transformClo = writeObj;
+
+                        // TODO IGNITE-51
+                        writeObj0 = ((IgniteClosure)writeObj).apply(rawGetOrUnmarshalUnlocked(true));
+                    }
+                    else
+                        writeObj0 = CU.value((CacheObject)writeObj, cctx, false);
+
+                    GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
+                        explicitTtl,
+                        explicitExpireTime);
+
+                    // Prepare old and new entries for conflict resolution.
+                    GridCacheVersionedEntryEx oldEntry = versionedEntry();
+                    GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry<>(
+                        oldEntry.key(),
+                        writeObj0,
+                        expiration.get1(),
+                        expiration.get2(),
+                        conflictVer != null ? conflictVer : newVer);
+
+                    // Resolve conflict.
+                    conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+
+                    assert conflictCtx != null;
+
+                    // Use old value?
+                    if (conflictCtx.isUseOld()) {
+                        GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+
+                        // Handle special case with atomic comparator.
+                        if (!isNew() &&                                                           // Not initial value,
+                            verCheck &&                                                           // and atomic version check,
+                            oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&     // and data centers are equal,
+                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer) == 0 && // and both versions are equal,
+                            cctx.writeThrough() &&                                                // and store is enabled,
+                            primary)                                                              // and we are primary.
+                        {
+                            CacheObject val = rawGetOrUnmarshalUnlocked(false);
+
+                            if (val == null) {
+                                assert deletedUnlocked();
+
+                                cctx.store().removeFromStore(null, key());
+                            }
+                            else
+                                cctx.store().putToStore(null, key(), val, ver);
+                        }
+
+                        return new GridCacheUpdateAtomicResult(false,
+                            retval ? rawGetOrUnmarshalUnlocked(false) : null,
+                            null,
+                            invokeRes,
+                            CU.TTL_ETERNAL,
+                            CU.EXPIRE_TIME_ETERNAL,
+                            null,
+                            null,
+                            false);
+                    }
+                    // Will update something.
+                    else {
+                        // Merge is a local update which override passed value bytes.
+                        if (conflictCtx.isMerge()) {
+                            writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
+
+                            conflictVer = null;
+                        }
+                        else
+                            assert conflictCtx.isUseNew();
+
+                        // Update value is known at this point, so update operation type.
+                        op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+                    }
                 }
                 else
                     // Nullify conflict version on this update, so that we will use regular version during next updates.
@@ -1915,7 +1926,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
                     newVer.dataCenterId(),
                     conflictVer);
 
-
             if (op == GridCacheOperation.UPDATE) {
                 // Conflict context is null if there were no explicit conflict resolution.
                 if (conflictCtx == null) {
@@ -2274,9 +2284,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
      */
     private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver)
         throws IgniteCheckedException {
-// TODO IGNITE-51.
-//        if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal())
-//            cctx.dr().replicate(key, null, val, valBytes, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType);
+        if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal())
+            cctx.dr().replicate(key, val, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType);
     }
 
     /**
@@ -3292,8 +3301,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
     @Override public synchronized GridCacheVersionedEntryEx versionedEntry() throws IgniteCheckedException {
         boolean isNew = isStartVersion();
 
-        return new GridCachePlainVersionedEntry<>(key, isNew ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false),
-            ttlExtras(), expireTimeExtras(), ver.conflictVersion(), isNew);
+        CacheObject val = isNew ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false);
+
+        return new GridCachePlainVersionedEntry<>(key.value(cctx.cacheObjectContext(), true),
+            CU.value(val, cctx, true),
+            ttlExtras(),
+            expireTimeExtras(),
+            ver.conflictVersion(),
+            isNew);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
index 08a3fb1..a417525 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java
@@ -112,7 +112,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If put operation failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException;
+    public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> drMap) throws IgniteCheckedException;
 
     /**
      * Store DR data asynchronously.
@@ -122,7 +122,8 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If put operation failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException;
+    public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> drMap)
+        throws IgniteCheckedException;
 
     /**
      * Internal method that is called from {@link GridCacheEntryImpl}.
@@ -154,7 +155,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If remove failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException;
+    public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> drMap) throws IgniteCheckedException;
 
     /**
      * Removes DR data asynchronously.
@@ -164,7 +165,7 @@ public interface GridCacheProjectionEx<K, V> extends CacheProjection<K, V> {
      * @throws IgniteCheckedException If remove failed.
      * @throws CacheFlagException If projection flags validation failed.
      */
-    public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException;
+    public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> drMap) throws IgniteCheckedException;
 
     /**
      * Internal method that is called from {@link GridCacheEntryImpl}.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 7a27cff..19aa08e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -620,12 +620,12 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
+    @Override public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> drMap) throws IgniteCheckedException {
         cache.putAllConflict(drMap);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap)
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> drMap)
         throws IgniteCheckedException {
         return cache.putAllConflictAsync(drMap);
     }
@@ -962,12 +962,13 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> drMap) throws IgniteCheckedException {
         cache.removeAllConflict(drMap);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> drMap)
+        throws IgniteCheckedException {
         return cache.removeAllConflictAsync(drMap);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 88200bb..88af3dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -709,7 +709,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
+    @Override public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> drMap) throws IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -721,7 +721,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap)
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> drMap)
         throws IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -1425,7 +1425,8 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> drMap)
+        throws IgniteCheckedException {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -1437,7 +1438,9 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> drMap)
+        throws IgniteCheckedException
+    {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 44775ae..99e1285 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -56,7 +56,7 @@ public class GridCacheSharedContext<K, V> {
     private GridCachePartitionExchangeManager<K, V> exchMgr;
 
     /** Version manager. */
-    private GridCacheVersionManager<K, V> verMgr;
+    private GridCacheVersionManager verMgr;
 
     /** Lock manager. */
     private GridCacheMvccManager mvccMgr;
@@ -84,7 +84,7 @@ public class GridCacheSharedContext<K, V> {
     public GridCacheSharedContext(
         GridKernalContext kernalCtx,
         IgniteTxManager txMgr,
-        GridCacheVersionManager<K, V> verMgr,
+        GridCacheVersionManager verMgr,
         GridCacheMvccManager mvccMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
@@ -254,7 +254,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @return Lock order manager.
      */
-    public GridCacheVersionManager<K, V> versions() {
+    public GridCacheVersionManager versions() {
         return verMgr;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 335ac63..7bfdbfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -32,9 +32,9 @@ import java.util.*;
  * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set.
  */
 @SuppressWarnings("NakedNotify")
-public class GridCacheTtlManager<K, V> extends GridCacheManagerAdapter<K, V> {
+public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
-    private final GridConcurrentSkipListSet<EntryWrapper<K, V>> pendingEntries = new GridConcurrentSkipListSet<>();
+    private final GridConcurrentSkipListSet<EntryWrapper> pendingEntries = new GridConcurrentSkipListSet<>();
 
     /** Cleanup worker thread. */
     private CleanupWorker cleanupWorker;
@@ -68,7 +68,7 @@ public class GridCacheTtlManager<K, V> extends GridCacheManagerAdapter<K, V> {
      * @param entry Entry to add.
      */
     public void addTrackedEntry(GridCacheMapEntry entry) {
-        EntryWrapper<K, V> wrapper = new EntryWrapper<>(entry);
+        EntryWrapper wrapper = new EntryWrapper(entry);
 
         pendingEntries.add(wrapper);
 
@@ -86,7 +86,7 @@ public class GridCacheTtlManager<K, V> extends GridCacheManagerAdapter<K, V> {
     public void removeTrackedEntry(GridCacheMapEntry entry) {
         // Remove must be called while holding lock on entry before updating expire time.
         // No need to wake up waiting thread in this case.
-        pendingEntries.remove(new EntryWrapper<>(entry));
+        pendingEntries.remove(new EntryWrapper(entry));
     }
 
     /** {@inheritDoc} */
@@ -114,8 +114,8 @@ public class GridCacheTtlManager<K, V> extends GridCacheManagerAdapter<K, V> {
 
                 GridCacheVersion obsoleteVer = null;
 
-                for (Iterator<EntryWrapper<K, V>> it = pendingEntries.iterator(); it.hasNext(); ) {
-                    EntryWrapper<K, V> wrapper = it.next();
+                for (Iterator<EntryWrapper> it = pendingEntries.iterator(); it.hasNext(); ) {
+                    EntryWrapper wrapper = it.next();
 
                     if (wrapper.expireTime <= now) {
                         if (log.isDebugEnabled())
@@ -142,7 +142,7 @@ public class GridCacheTtlManager<K, V> extends GridCacheManagerAdapter<K, V> {
                         // synchronization block, so we don't miss out
                         // on thread notification events sent from
                         // 'addTrackedEntry(..)' method.
-                        EntryWrapper<K, V> first = pendingEntries.firstx();
+                        EntryWrapper first = pendingEntries.firstx();
 
                         if (first != null) {
                             long waitTime = first.expireTime - U.currentTimeMillis();
@@ -163,7 +163,7 @@ public class GridCacheTtlManager<K, V> extends GridCacheManagerAdapter<K, V> {
     /**
      * Entry wrapper.
      */
-    private static class EntryWrapper<K, V> implements Comparable<EntryWrapper<K, V>> {
+    private static class EntryWrapper implements Comparable<EntryWrapper> {
         /** Entry expire time. */
         private final long expireTime;
 
@@ -182,7 +182,7 @@ public class GridCacheTtlManager<K, V> extends GridCacheManagerAdapter<K, V> {
         }
 
         /** {@inheritDoc} */
-        @Override public int compareTo(EntryWrapper<K, V> o) {
+        @Override public int compareTo(EntryWrapper o) {
             if (expireTime == o.expireTime) {
                 if (entry.startVersion() == o.entry.startVersion())
                     return 0;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java
index b243e62..7122f66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java
@@ -50,11 +50,6 @@ public class KeyCacheObjectTransferImpl implements KeyCacheObject {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public <T> T getField(String name) {
-        throw new IllegalStateException();
-    }
-
-    /** {@inheritDoc} */
     @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index a867c2a..9425457 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -519,29 +519,27 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                     GridCacheVersionConflictContext conflictCtx = null;
 
                                     if (conflictNeedResolve) {
-// TODO IGNITE-51.
-//                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
-//                                            drRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, cached);
-//
-//                                        assert drRes != null;
-//
-//                                        conflictCtx = drRes.get2();
-//
-//                                        if (conflictCtx.isUseOld())
-//                                            op = NOOP;
-//                                        else if (conflictCtx.isUseNew()) {
-//                                            txEntry.ttl(conflictCtx.ttl());
-//                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
-//                                        }
-//                                        else if (conflictCtx.isMerge()) {
-//                                            op = drRes.get1();
-//                                            val = conflictCtx.mergeValue();
-//                                            valBytes = null;
-//                                            explicitVer = writeVersion();
-//
-//                                            txEntry.ttl(conflictCtx.ttl());
-//                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
-//                                        }
+                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
+                                            drRes = conflictResolve(op, txEntry, val, explicitVer, cached);
+
+                                        assert drRes != null;
+
+                                        conflictCtx = drRes.get2();
+
+                                        if (conflictCtx.isUseOld())
+                                            op = NOOP;
+                                        else if (conflictCtx.isUseNew()) {
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
+                                        }
+                                        else if (conflictCtx.isMerge()) {
+                                            op = drRes.get1();
+                                            val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
+                                            explicitVer = writeVersion();
+
+                                            txEntry.ttl(conflictCtx.ttl());
+                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
+                                        }
                                     }
                                     else
                                         // Nullify explicit version so that innerSet/innerRemove will work as usual.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index ef5e7cc..9f42e91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -434,7 +434,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) {
         A.notNull(key, "key", val, "val");
 
-        return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsValArray(val));
+        return removeAllAsync0(F.asList(key), null, true, true, ctx.equalsValArray(val));
     }
 
     /** {@inheritDoc} */
@@ -472,13 +472,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> conflictMap)
+    @Override public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap)
         throws IgniteCheckedException {
         putAllConflictAsync(conflictMap).get();
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> conflictMap) {
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
         ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
 
         return updateAllAsync0(null,
@@ -504,7 +504,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable CacheEntryPredicate... filter) {
         A.notNull(key, "key");
 
-        return removeAllAsync0(Collections.singletonList(key), null, entry, true, false, filter);
+        return removeAllAsync0(Collections.singletonList(key), null, true, false, filter);
     }
 
     /** {@inheritDoc} */
@@ -518,7 +518,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CacheEntryPredicate[] filter) {
         A.notNull(keys, "keys");
 
-        return removeAllAsync0(keys, null, null, false, false, filter);
+        return removeAllAsync0(keys, null, false, false, filter);
     }
 
     /** {@inheritDoc} */
@@ -533,7 +533,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable CacheEntryPredicate... filter) {
         A.notNull(key, "key");
 
-        return removeAllAsync0(Collections.singletonList(key), null, entry, false, false, filter);
+        return removeAllAsync0(Collections.singletonList(key), null, false, false, filter);
     }
 
     /** {@inheritDoc} */
@@ -559,16 +559,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> conflictMap)
+    @Override public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> conflictMap)
         throws IgniteCheckedException {
         removeAllConflictAsync(conflictMap).get();
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> conflictMap) {
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
         ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
 
-        return removeAllAsync0(null, conflictMap, null, false, false, null);
+        return removeAllAsync0(null, conflictMap, false, false, null);
     }
 
     /**
@@ -769,8 +769,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final Map<? extends K, ? extends V> map,
         @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
         @Nullable Object[] invokeArgs,
-        @Nullable final Map<? extends K, GridCacheDrInfo<V>> conflictPutMap,
-        @Nullable final Map<? extends K, GridCacheVersion> conflictRmvMap,
+        @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
+        @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
         final boolean retval,
         final boolean rawRetval,
         @Nullable GridCacheEntryEx cached,
@@ -819,7 +819,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      *
      * @param keys Keys to remove.
      * @param conflictMap Conflict map.
-     * @param cached Cached cache entry for key. May be passed if and only if keys size is {@code 1}.
      * @param retval Return value required flag.
      * @param rawRetval Return {@code GridCacheReturn} instance.
      * @param filter Cache entry filter for atomic removes.
@@ -827,8 +826,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private IgniteInternalFuture removeAllAsync0(
         @Nullable final Collection<? extends K> keys,
-        @Nullable final Map<? extends K, GridCacheVersion> conflictMap,
-        @Nullable GridCacheEntryEx cached,
+        @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictMap,
         final boolean retval,
         boolean rawRetval,
         @Nullable final CacheEntryPredicate[] filter
@@ -2249,7 +2247,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             log.debug("Remapping near update request locally: " + req);
 
         Collection<?> vals;
-        Collection<GridCacheDrInfo<?>> drPutVals;
+        Collection<GridCacheDrInfo> drPutVals;
         Collection<GridCacheVersion> drRmvVals;
 
         if (req.conflictVersions() == null) {
@@ -2267,9 +2265,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 long ttl = req.conflictTtl(i);
 
                 if (ttl == CU.TTL_NOT_CHANGED)
-                    drPutVals.add(new GridCacheDrInfo<>(req.value(i), req.conflictVersion(i)));
+                    drPutVals.add(new GridCacheDrInfo(req.value(i), req.conflictVersion(i)));
                 else
-                    drPutVals.add(new GridCacheDrExpirationInfo<>(req.value(i), req.conflictVersion(i), ttl,
+                    drPutVals.add(new GridCacheDrExpirationInfo(req.value(i), req.conflictVersion(i), ttl,
                         req.conflictExpireTime(i)));
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 45e9455..a53a730 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -86,7 +86,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
 
     /** Conflict put values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private Collection<GridCacheDrInfo<?>> conflictPutVals;
+    private Collection<GridCacheDrInfo> conflictPutVals;
 
     /** Conflict remove values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -187,7 +187,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
         Collection<?> keys,
         @Nullable Collection<?> vals,
         @Nullable Object[] invokeArgs,
-        @Nullable Collection<GridCacheDrInfo<?>> conflictPutVals,
+        @Nullable Collection<GridCacheDrInfo> conflictPutVals,
         @Nullable Collection<GridCacheVersion> conflictRmvVals,
         final boolean retval,
         final boolean rawRetval,
@@ -543,7 +543,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
             }
             else if (conflictPutVals != null) {
                 // Conflict PUT.
-                GridCacheDrInfo<?> conflictPutVal =  F.first(conflictPutVals);
+                GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
 
                 val = conflictPutVal.value();
                 conflictVer = conflictPutVal.version();
@@ -631,7 +631,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
         if (vals != null)
             it = vals.iterator();
 
-        Iterator<GridCacheDrInfo<?>> conflictPutValsIt = null;
+        Iterator<GridCacheDrInfo> conflictPutValsIt = null;
 
         if (conflictPutVals != null)
             conflictPutValsIt = conflictPutVals.iterator();
@@ -679,7 +679,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
                     }
                 }
                 else if (conflictPutVals != null) {
-                    GridCacheDrInfo<?> conflictPutVal =  conflictPutValsIt.next();
+                    GridCacheDrInfo conflictPutVal =  conflictPutValsIt.next();
 
                     val = conflictPutVal.value();
                     conflictVer = conflictPutVal.version();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
index 08b43e0..74c83e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java
@@ -28,11 +28,11 @@ import java.nio.*;
  * Cache object and version.
  */
 public class CacheVersionedValue implements Message {
-    /** Cache version. */
-    private GridCacheVersion vers;
+    /** Value. */
+    private CacheObject val;
 
-    /** Cache object. */
-    private CacheObject obj;
+    /** Cache version. */
+    private GridCacheVersion ver;
 
     /** */
     public CacheVersionedValue() {
@@ -40,26 +40,26 @@ public class CacheVersionedValue implements Message {
     }
 
     /**
-     * @param vers Cache version.
-     * @param obj Cache object.
+     * @param val Cache value.
+     * @param ver Cache version.
      */
-    CacheVersionedValue(GridCacheVersion vers, CacheObject obj) {
-        this.vers = vers;
-        this.obj = obj;
+    CacheVersionedValue(CacheObject val, GridCacheVersion ver) {
+        this.val = val;
+        this.ver = ver;
     }
 
     /**
      * @return Cache version.
      */
     public GridCacheVersion version() {
-        return vers;
+        return ver;
     }
 
     /**
      * @return Cache object.
      */
-    public CacheObject cacheObject() {
-        return obj;
+    public CacheObject value() {
+        return val;
     }
 
     /**
@@ -70,12 +70,12 @@ public class CacheVersionedValue implements Message {
      * @throws IgniteCheckedException If failed.
      */
     public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException {
-        if (obj != null)
-            obj.prepareMarshal(ctx);
+        if (val != null)
+            val.prepareMarshal(ctx);
     }
 
     /**
-     * This method is called after the whole message is recived
+     * This method is called after the whole message is received
      * and is responsible for unmarshalling state.
      *
      * @param ctx Context.
@@ -83,8 +83,8 @@ public class CacheVersionedValue implements Message {
      * @throws IgniteCheckedException If failed.
      */
     public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        if (obj != null)
-            obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
+        if (val != null)
+            val.finishUnmarshal(ctx.cacheObjectContext(), ldr);
     }
 
     /** {@inheritDoc} */
@@ -100,13 +100,13 @@ public class CacheVersionedValue implements Message {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeMessage("obj", obj))
+                if (!writer.writeMessage("val", val))
                     return false;
 
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeMessage("vers", vers))
+                if (!writer.writeMessage("ver", ver))
                     return false;
 
                 writer.incrementState();
@@ -125,7 +125,7 @@ public class CacheVersionedValue implements Message {
 
         switch (reader.state()) {
             case 0:
-                obj = reader.readMessage("obj");
+                val = reader.readMessage("obj");
 
                 if (!reader.isLastRead())
                     return false;
@@ -133,7 +133,7 @@ public class CacheVersionedValue implements Message {
                 reader.incrementState();
 
             case 1:
-                vers = reader.readMessage("vers");
+                ver = reader.readMessage("ver");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 9aa407f..a5050e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -517,12 +517,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void putAllConflict(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
+    @Override public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> drMap) throws IgniteCheckedException {
         dht.putAllConflict(drMap);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<? extends K, GridCacheDrInfo<V>> drMap) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> drMap)
+        throws IgniteCheckedException {
         return dht.putAllConflictAsync(drMap);
     }
 
@@ -641,12 +642,14 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> drMap)
+        throws IgniteCheckedException {
         dht.removeAllConflict(drMap);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap) throws IgniteCheckedException {
+    @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> drMap)
+        throws IgniteCheckedException {
         return dht.removeAllConflictAsync(drMap);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 557331c..3a86888 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -948,7 +948,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                                     CacheVersionedValue tup = entry.getValue();
 
-                                    nearEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion(),
+                                    nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
                                         tup.version(), m.node().id());
                                 }
                                 else if (txEntry.cached().detached()) {
@@ -956,7 +956,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                                     CacheVersionedValue tup = entry.getValue();
 
-                                    detachedEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion());
+                                    detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion());
                                 }
 
                                 break;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index f88d363..711c01f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -174,7 +174,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         if (ownedVals == null)
             ownedVals = new HashMap<>();
 
-        CacheVersionedValue oVal = new CacheVersionedValue(ver, val);
+        CacheVersionedValue oVal = new CacheVersionedValue(val, ver);
 
         ownedVals.put(key, oVal);
     }
@@ -266,20 +266,20 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
             Iterator<IgniteTxKey> keyIter = ownedValKeys.iterator();
 
-            Iterator<CacheVersionedValue> valueIter = ownedValVals.iterator();
+            Iterator<CacheVersionedValue> valIter = ownedValVals.iterator();
 
             while (keyIter.hasNext()) {
                 IgniteTxKey key = keyIter.next();
 
                 GridCacheContext cctx = ctx.cacheContext(key.cacheId());
 
-                CacheVersionedValue value = valueIter.next();
+                CacheVersionedValue val = valIter.next();
 
                 key.finishUnmarshal(cctx, ldr);
 
-                value.finishUnmarshal(cctx, ldr);
+                val.finishUnmarshal(cctx, ldr);
 
-                ownedVals.put(key, value);
+                ownedVals.put(key, val);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
index e2feb13..f003e84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.dr;
 
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
@@ -25,7 +26,7 @@ import java.io.*;
 /**
  * Cache DR info used as argument in PUT cache internal interfaces with expiration info added.
  */
-public class GridCacheDrExpirationInfo<V> extends GridCacheDrInfo<V> {
+public class GridCacheDrExpirationInfo extends GridCacheDrInfo {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -50,7 +51,7 @@ public class GridCacheDrExpirationInfo<V> extends GridCacheDrInfo<V> {
      * @param ttl TTL.
      * @param expireTime Expire time.
      */
-    public GridCacheDrExpirationInfo(V val, GridCacheVersion ver, long ttl, long expireTime) {
+    public GridCacheDrExpirationInfo(CacheObject val, GridCacheVersion ver, long ttl, long expireTime) {
         super(val, ver);
 
         this.ttl = ttl;
@@ -71,20 +72,4 @@ public class GridCacheDrExpirationInfo<V> extends GridCacheDrInfo<V> {
     @Override public String toString() {
         return S.toString(GridCacheDrExpirationInfo.class, this);
     }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        super.writeExternal(out);
-
-        out.writeLong(ttl);
-        out.writeLong(expireTime);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        super.readExternal(in);
-
-        ttl = in.readLong();
-        expireTime = in.readLong();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index 6c875d8..b3411e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.dr;
 
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
@@ -25,12 +27,12 @@ import java.io.*;
 /**
  * Cache DR info used as argument in PUT cache internal interfaces.
  */
-public class GridCacheDrInfo<V> implements Externalizable {
+public class GridCacheDrInfo implements Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Value. */
-    private V val;
+    private CacheObject val;
 
     /** DR version. */
     private GridCacheVersion ver;
@@ -48,7 +50,7 @@ public class GridCacheDrInfo<V> implements Externalizable {
      * @param val Value.
      * @param ver Version.
      */
-    public GridCacheDrInfo(V val, GridCacheVersion ver) {
+    public GridCacheDrInfo(CacheObject val, GridCacheVersion ver) {
         assert val != null;
         assert ver != null;
 
@@ -59,7 +61,7 @@ public class GridCacheDrInfo<V> implements Externalizable {
     /**
      * @return Value.
      */
-    public V value() {
+    public CacheObject value() {
         return val;
     }
 
@@ -84,21 +86,18 @@ public class GridCacheDrInfo<V> implements Externalizable {
         return CU.EXPIRE_TIME_ETERNAL;
     }
 
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridCacheDrInfo.class, this);
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        assert false;
     }
 
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(val);
-        CU.writeVersion(out, ver);
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        assert false;
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        val = (V)in.readObject();
-        ver = CU.readVersion(in);
+    @Override public String toString() {
+        return S.toString(GridCacheDrInfo.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
index 85831a8..dbf90c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
@@ -26,7 +26,7 @@ import org.jetbrains.annotations.*;
 /**
  * Replication manager class which processes all replication events.
  */
-public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> {
+public interface GridCacheDrManager extends GridCacheManager {
     /**
      * @return Data center ID.
      */
@@ -41,19 +41,15 @@ public interface GridCacheDrManager<K, V> extends GridCacheManager<K, V> {
      * Performs replication.
      *
      * @param key Key.
-     * @param keyBytes Key bytes.
      * @param val Value.
-     * @param valBytes Value bytes.
      * @param ttl TTL.
      * @param expireTime Expire time.
      * @param ver Version.
      * @param drType Replication type.
      * @throws IgniteCheckedException If failed.
      */
-    public void replicate(K key,
-        @Nullable byte[] keyBytes,
-        @Nullable V val,
-        @Nullable byte[] valBytes,
+    public void replicate(KeyCacheObject key,
+        @Nullable CacheObject val,
         long ttl,
         long expireTime,
         GridCacheVersion ver,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
index 49f617b..7cec4eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
@@ -27,14 +27,14 @@ import org.jetbrains.annotations.*;
 /**
  * No-op implementation for {@link GridCacheDrManager}.
  */
-public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> {
+public class GridOsCacheDrManager implements GridCacheDrManager {
     /** {@inheritDoc} */
     @Override public boolean enabled() {
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public void start(GridCacheContext<K, V> cctx) throws IgniteCheckedException {
+    @Override public void start(GridCacheContext cctx) throws IgniteCheckedException {
         // No-op.
     }
 
@@ -69,10 +69,8 @@ public class GridOsCacheDrManager<K, V> implements GridCacheDrManager<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public void replicate(K key,
-        @Nullable byte[] keyBytes,
-        @Nullable V val,
-        @Nullable byte[] valBytes,
+    @Override public void replicate(KeyCacheObject key,
+        @Nullable CacheObject val,
         long ttl,
         long expireTime,
         GridCacheVersion ver,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java
index 1ff174f..ef9204d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java
@@ -25,7 +25,7 @@ import org.jetbrains.annotations.*;
 /**
  * Provides possibility to integrate cache transactions with JTA.
  */
-public abstract class CacheJtaManagerAdapter<K, V> extends GridCacheManagerAdapter<K, V> {
+public abstract class CacheJtaManagerAdapter extends GridCacheManagerAdapter {
     /**
      * Creates transaction manager finder.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java
index 4590031..451357f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.*;
 /**
  * No-op implementation of {@link CacheJtaManagerAdapter}.
  */
-public class CacheNoopJtaManager<K, V> extends CacheJtaManagerAdapter<K, V> {
+public class CacheNoopJtaManager extends CacheJtaManagerAdapter {
     /** {@inheritDoc} */
     @Override public void checkJta() throws IgniteCheckedException {
         // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7253addd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 0e37349..b46bc59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1298,8 +1298,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
      * @throws GridCacheEntryRemovedException If entry got removed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    protected <K, V> IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>> conflictResolve(
-        GridCacheOperation op, IgniteTxEntry txEntry, V newVal, byte[] newValBytes, GridCacheVersion newVer,
+    protected IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictResolve(
+        GridCacheOperation op,
+        IgniteTxEntry txEntry,
+        CacheObject newVal,
+        GridCacheVersion newVer,
         GridCacheEntryEx old)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert newVer != null;
@@ -1351,19 +1354,19 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         GridCacheVersionedEntryEx oldEntry = old.versionedEntry();
 
         // Construct new entry info.
-        if (newVal == null && newValBytes != null)
-            newVal = cctx.marshaller().unmarshal(newValBytes, cctx.deploy().globalLoader());
-
-        GridCacheVersionedEntryEx newEntry =
-            new GridCachePlainVersionedEntry<>((K)txEntry.key(), newVal, newTtl, newExpireTime, newVer);
+        Object newVal0 = CU.value(newVal, txEntry.context(), false);
 
-        GridCacheVersionConflictContext<K, V> ctx = null;
+        GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry(
+            oldEntry.key(),
+            newVal0,
+            newTtl,
+            newExpireTime,
+            newVer);
 
-        // TODO IGNITE-51.
-        //GridCacheVersionConflictContext<K, V> ctx = old.context().conflictResolve(oldEntry, newEntry, false);
+        GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false);
 
         if (ctx.isMerge()) {
-            V resVal = ctx.mergeValue();
+            Object resVal = ctx.mergeValue();
 
             if ((op == CREATE || op == UPDATE) && resVal == null)
                 op = DELETE;


Mime
View raw message