ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [09/17] incubator-ignite git commit: # ignite-51
Date Tue, 24 Feb 2015 14:36:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbc8ed32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2da1bb7..79b4c7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -52,22 +52,22 @@ import static org.apache.ignite.transactions.TransactionState.*;
 /**
  * Transaction adapter for cache transactions.
  */
-public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
-    implements IgniteTxLocalEx<K, V> {
+public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
+    implements IgniteTxLocalEx {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Per-transaction read map. */
     @GridToStringExclude
-    protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> txMap;
+    protected Map<IgniteTxKey, IgniteTxEntry> txMap;
 
     /** Read view on transaction map. */
     @GridToStringExclude
-    protected IgniteTxMap<K, V> readView;
+    protected IgniteTxMap readView;
 
     /** Write view on transaction map. */
     @GridToStringExclude
-    protected IgniteTxMap<K, V> writeView;
+    protected IgniteTxMap writeView;
 
     /** Minimal version encountered (either explicit lock or XID of this transaction). */
     protected GridCacheVersion minVer;
@@ -100,7 +100,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     protected boolean needRetVal;
 
     /** Implicit transaction result. */
-    protected GridCacheReturn<V> implicitRes = new GridCacheReturn<>(false);
+    protected GridCacheReturn<CacheObject> implicitRes = new GridCacheReturn<>(false);
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -124,7 +124,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
      */
     protected IgniteTxLocalAdapter(
-        GridCacheSharedContext<K, V> cctx,
+        GridCacheSharedContext cctx,
         GridCacheVersion xidVer,
         boolean implicit,
         boolean implicitSingle,
@@ -186,7 +186,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
+    @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
         assert false;
         return false;
     }
@@ -206,61 +206,61 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasWriteKey(IgniteTxKey<K> key) {
+    @Override public boolean hasWriteKey(IgniteTxKey key) {
         return writeView.containsKey(key);
     }
 
     /**
      * @return Transaction read set.
      */
-    @Override public Set<IgniteTxKey<K>> readSet() {
-        return txMap == null ? Collections.<IgniteTxKey<K>>emptySet() : readView.keySet();
+    @Override public Set<IgniteTxKey> readSet() {
+        return txMap == null ? Collections.<IgniteTxKey>emptySet() : readView.keySet();
     }
 
     /**
      * @return Transaction write set.
      */
-    @Override public Set<IgniteTxKey<K>> writeSet() {
-        return txMap == null ? Collections.<IgniteTxKey<K>>emptySet() : writeView.keySet();
+    @Override public Set<IgniteTxKey> writeSet() {
+        return txMap == null ? Collections.<IgniteTxKey>emptySet() : writeView.keySet();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean removed(IgniteTxKey<K> key) {
+    @Override public boolean removed(IgniteTxKey key) {
         if (txMap == null)
             return false;
 
-        IgniteTxEntry<K, V> e = txMap.get(key);
+        IgniteTxEntry e = txMap.get(key);
 
         return e != null && e.op() == DELETE;
     }
 
     /** {@inheritDoc} */
-    @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap() {
-        return readView == null ? Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap() : readView;
+    @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
+        return readView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : readView;
     }
 
     /** {@inheritDoc} */
-    @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap() {
-        return writeView == null ? Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap() : writeView;
+    @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
+        return writeView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : writeView;
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgniteTxEntry<K, V>> allEntries() {
-        return txMap == null ? Collections.<IgniteTxEntry<K, V>>emptySet() : txMap.values();
+    @Override public Collection<IgniteTxEntry> allEntries() {
+        return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values();
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgniteTxEntry<K, V>> readEntries() {
-        return readView == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : readView.values();
+    @Override public Collection<IgniteTxEntry> readEntries() {
+        return readView == null ? Collections.<IgniteTxEntry>emptyList() : readView.values();
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgniteTxEntry<K, V>> writeEntries() {
-        return writeView == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : writeView.values();
+    @Override public Collection<IgniteTxEntry> writeEntries() {
+        return writeView == null ? Collections.<IgniteTxEntry>emptyList() : writeView.values();
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key) {
+    @Nullable @Override public IgniteTxEntry entry(IgniteTxKey key) {
         return txMap == null ? null : txMap.get(key);
     }
 
@@ -274,14 +274,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheReturn<V> implicitSingleResult() {
+    @Override public GridCacheReturn<CacheObject> implicitSingleResult() {
         return implicitRes;
     }
 
     /**
      * @param ret Result.
      */
-    public void implicitSingleResult(GridCacheReturn<V> ret) {
+    public void implicitSingleResult(GridCacheReturn<CacheObject> ret) {
         if (ret.invokeResult())
             implicitRes.mergeEntryProcessResults(ret);
         else
@@ -319,20 +319,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
     /** {@inheritDoc} */
     @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable @Override public GridTuple<V> peek(
-        GridCacheContext<K, V> cacheCtx,
+    @Nullable @Override public <K, V> GridTuple<CacheObject> peek(
+        GridCacheContext cacheCtx,
         boolean failFast,
-        K key,
+        KeyCacheObject key,
         IgnitePredicate<Cache.Entry<K, V>>[] filter
     ) throws GridCacheFilterFailedException {
-        IgniteTxEntry<K, V> e = txMap == null ? null : txMap.get(cacheCtx.txKey(key));
+        IgniteTxEntry e = txMap == null ? null : txMap.get(cacheCtx.txKey(key));
 
         if (e != null) {
             // We should look at tx entry previous value. If this is a user peek then previous
             // value is the same as value. If this is a filter evaluation peek then previous value holds
             // value visible to filter while value contains value enlisted for write.
-            if (!F.isEmpty(filter) && !F.isAll(e.cached().wrapLazyValue(), filter))
-                return e.hasPreviousValue() ? F.t(CU.<V>failed(failFast, e.previousValue())) : null;
+            if (!F.isEmpty(filter) && !F.isAll(e.cached().<K, V>wrapLazyValue(), filter))
+                return e.hasPreviousValue() ? F.t(CU.<CacheObject>failed(failFast, e.previousValue())) : null;
 
             return e.hasPreviousValue() ? F.t(e.previousValue()) : null;
         }
@@ -342,18 +342,18 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Boolean> loadMissing(
-        final GridCacheContext<K, V> cacheCtx,
+        final GridCacheContext cacheCtx,
         final boolean readThrough,
         boolean async,
-        final Collection<? extends K> keys,
+        final Collection<KeyCacheObject> keys,
         boolean deserializePortable,
         boolean skipVals,
-        final IgniteBiInClosure<K, V> c
+        final IgniteBiInClosure<KeyCacheObject, Object> c
     ) {
         if (!async) {
             try {
                 if (!readThrough || !cacheCtx.readThrough()) {
-                    for (K key : keys)
+                    for (KeyCacheObject key : keys)
                         c.apply(key, null);
 
                     return new GridFinishedFuture<>(cctx.kernalContext(), false);
@@ -371,7 +371,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 new GPC<Boolean>() {
                     @Override public Boolean call() throws Exception {
                         if (!readThrough || !cacheCtx.readThrough()) {
-                            for (K key : keys)
+                            for (KeyCacheObject key : keys)
                                 c.apply(key, null);
 
                             return false;
@@ -443,7 +443,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      *
      * @param entry Cache entry to check.
      */
-    private void checkCommitLocks(GridCacheEntryEx<K, V> entry) {
+    private void checkCommitLocks(GridCacheEntryEx entry) {
         assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry +
             ", tx=" + this + ']';
     }
@@ -455,7 +455,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param key Key.
      * @return Cache entry.
      */
-    protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key) {
+    protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) {
         return cacheCtx.cache().entryEx(key.key());
     }
 
@@ -467,7 +467,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param topVer Topology version.
      * @return Cache entry.
      */
-    protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, long topVer) {
+    protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key, long topVer) {
         return cacheCtx.cache().entryEx(key.key(), topVer);
     }
 
@@ -480,20 +480,20 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @throws IgniteCheckedException If batch update failed.
      */
     @SuppressWarnings({"CatchGenericClass"})
-    protected void batchStoreCommit(Iterable<IgniteTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
-        GridCacheStoreManager<K, V> store = store();
+    protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
+        GridCacheStoreManager store = store();
 
         if (store != null && store.writeThrough() && storeEnabled() &&
             (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
             try {
                 if (writeEntries != null) {
-                    Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
-                    List<K> rmvCol = null;
-                    GridCacheStoreManager<K, V> writeStore = null;
+                    Map<KeyCacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>> putMap = null;
+                    List<KeyCacheObject> rmvCol = null;
+                    GridCacheStoreManager writeStore = null;
 
                     boolean skipNear = near() && store.writeToStoreFromDht();
 
-                    for (IgniteTxEntry<K, V> e : writeEntries) {
+                    for (IgniteTxEntry e : writeEntries) {
                         if (skipNear && e.cached().isNear())
                             continue;
 
@@ -502,13 +502,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         if (intercept || !F.isEmpty(e.entryProcessors()))
                             e.cached().unswap(true, false);
 
-                        GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
+                        // TODO IGNITE-51 (do not need convert to CacheObject to pass to store?).
+                        GridTuple3<GridCacheOperation, CacheObject, byte[]> res = applyTransformClosures(e, false);
 
-                        GridCacheContext<K, V> cacheCtx = e.context();
+                        GridCacheContext cacheCtx = e.context();
 
                         GridCacheOperation op = res.get1();
-                        K key = e.key();
-                        V val = res.get2();
+                        KeyCacheObject key = e.key();
+                        CacheObject val = res.get2();
                         GridCacheVersion ver = writeVersion();
 
                         if (op == CREATE || op == UPDATE) {
@@ -535,14 +536,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             }
 
                             if (intercept) {
-                                V old = e.cached().rawGetOrUnmarshal(true);
+                                Object oldVal = CU.value(e.cached().rawGetOrUnmarshal(true), cacheCtx);
 
-                                val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val);
+                                Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut(key, oldVal, val);
 
-                                if (val == null)
+                                if (interceptorVal == null)
                                     continue;
 
-                                val = cacheCtx.unwrapTemporary(val);
+                                val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal));
                             }
 
                             if (putMap == null)
@@ -575,10 +576,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             }
 
                             if (intercept) {
-                                V old = e.cached().rawGetOrUnmarshal(true);
+                                Object oldVal = CU.value(e.cached().rawGetOrUnmarshal(true), cacheCtx);
 
-                                IgniteBiTuple<Boolean, V> t = cacheCtx.config().getInterceptor()
-                                    .onBeforeRemove(key, old);
+                                IgniteBiTuple<Boolean, Object> t = cacheCtx.config().getInterceptor()
+                                    .onBeforeRemove(key, oldVal);
 
                                 if (cacheCtx.cancelRemove(t))
                                     continue;
@@ -677,8 +678,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 /*
                  * Commit to cache. Note that for 'near' transaction we loop through all the entries.
                  */
-                for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) {
-                    GridCacheContext<K, V> cacheCtx = txEntry.context();
+                for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
+                    GridCacheContext cacheCtx = txEntry.context();
 
                     GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE;
 
@@ -687,7 +688,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     try {
                         while (true) {
                             try {
-                                GridCacheEntryEx<K, V> cached = txEntry.cached();
+                                GridCacheEntryEx cached = txEntry.cached();
 
                                 // Must try to evict near entries before committing from
                                 // transaction manager to make sure locks are held.
@@ -701,7 +702,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (cached.detached())
                                         break;
 
-                                    GridCacheEntryEx<K, V> nearCached = null;
+                                    GridCacheEntryEx nearCached = null;
 
                                     boolean metrics = true;
 
@@ -715,14 +716,14 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters()))
                                         txEntry.cached().unswap(true, false);
 
-                                    GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
+                                    GridTuple3<GridCacheOperation, CacheObject, byte[]> res = applyTransformClosures(txEntry,
                                         true);
 
                                     // For near local transactions we must record DHT version
                                     // in order to keep near entries on backup nodes until
                                     // backup remote transaction completes.
                                     if (cacheCtx.isNear()) {
-                                        ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());
+                                        ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
 
                                         if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
                                             txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
@@ -738,7 +739,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     }
 
                                     GridCacheOperation op = res.get1();
-                                    V val = res.get2();
+                                    CacheObject val = res.get2();
                                     byte[] valBytes = res.get3();
 
                                     // Deal with DR conflicts.
@@ -765,35 +766,36 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     boolean drNeedResolve = cacheCtx.conflictNeedResolve();
 
                                     if (drNeedResolve) {
-                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
-                                            drRes = conflictResolve(op, txEntry.key(), val, valBytes, txEntry.ttl(),
-                                                txEntry.conflictExpireTime(), explicitVer, cached);
-
-                                        assert drRes != null;
-
-                                        GridCacheVersionConflictContext<K, V> conflictCtx = drRes.get2();
-
-                                        if (conflictCtx.isUseOld())
-                                            op = NOOP;
-                                        else if (conflictCtx.isUseNew()) {
-                                            txEntry.ttl(conflictCtx.ttl());
-
-                                            if (conflictCtx.newEntry().dataCenterId() != cctx.dataCenterId())
-                                                txEntry.conflictExpireTime(conflictCtx.expireTime());
-                                            else
-                                                txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
-                                        }
-                                        else {
-                                            assert conflictCtx.isMerge();
-
-                                            op = drRes.get1();
-                                            val = conflictCtx.mergeValue();
-                                            valBytes = null;
-                                            explicitVer = writeVersion();
-
-                                            txEntry.ttl(conflictCtx.ttl());
-                                            txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
-                                        }
+// TODO IGNITE-51.
+//                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
+//                                            drRes = conflictResolve(op, txEntry.key(), val, valBytes, txEntry.ttl(),
+//                                                txEntry.conflictExpireTime(), explicitVer, cached);
+//
+//                                        assert drRes != null;
+//
+//                                        GridCacheVersionConflictContext conflictCtx = drRes.get2();
+//
+//                                        if (conflictCtx.isUseOld())
+//                                            op = NOOP;
+//                                        else if (conflictCtx.isUseNew()) {
+//                                            txEntry.ttl(conflictCtx.ttl());
+//
+//                                            if (conflictCtx.newEntry().dataCenterId() != cctx.dataCenterId())
+//                                                txEntry.conflictExpireTime(conflictCtx.expireTime());
+//                                            else
+//                                                txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+//                                        }
+//                                        else {
+//                                            assert conflictCtx.isMerge();
+//
+//                                            op = drRes.get1();
+//                                            val = conflictCtx.mergeValue();
+//                                            valBytes = null;
+//                                            explicitVer = writeVersion();
+//
+//                                            txEntry.ttl(conflictCtx.ttl());
+//                                            txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+//                                        }
                                     }
                                     else
                                         // Nullify explicit version so that innerSet/innerRemove will work as usual.
@@ -803,14 +805,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         assert sndTransformedVals && cacheCtx.isReplicated() || drNeedResolve;
 
                                         txEntry.value(val, true, false);
-                                        txEntry.valueBytes(valBytes);
                                         txEntry.op(op);
                                         txEntry.entryProcessors(null);
                                         txEntry.conflictVersion(explicitVer);
                                     }
 
                                     if (op == CREATE || op == UPDATE) {
-                                        GridCacheUpdateTxResult<V> updRes = cached.innerSet(
+                                        GridCacheUpdateTxResult updRes = cached.innerSet(
                                             this,
                                             eventNodeId(),
                                             txEntry.nodeId(),
@@ -842,7 +843,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                                 false,
                                                 metrics,
                                                 topVer,
-                                                CU.<K, V>empty(),
+                                                CU.empty(),
                                                 DR_NONE,
                                                 txEntry.conflictExpireTime(),
                                                 null,
@@ -850,7 +851,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                                 resolveTaskName());
                                     }
                                     else if (op == DELETE) {
-                                        GridCacheUpdateTxResult<V> updRes = cached.innerRemove(
+                                        GridCacheUpdateTxResult updRes = cached.innerRemove(
                                             this,
                                             eventNodeId(),
                                             txEntry.nodeId(),
@@ -875,7 +876,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                                 false,
                                                 metrics,
                                                 topVer,
-                                                CU.<K, V>empty(),
+                                                CU.empty(),
                                                 DR_NONE,
                                                 null,
                                                 CU.subjectId(this, cctx),
@@ -928,7 +929,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 if (log.isDebugEnabled())
                                     log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
 
-                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), txEntry.keyBytes());
+                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), null);
                             }
                         }
                     }
@@ -978,7 +979,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             }
         }
         else {
-            GridCacheStoreManager<K, V> store = store();
+            GridCacheStoreManager store = store();
 
             if (store != null && (!internal() || groupLock())) {
                 try {
@@ -1078,12 +1079,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 if (near())
                     // Must evict near entries before rolling back from
                     // transaction manager, so they will be removed from cache.
-                    for (IgniteTxEntry<K, V> e : allEntries())
+                    for (IgniteTxEntry e : allEntries())
                         evictNearEntry(e, false);
 
                 cctx.tm().rollbackTx(this);
 
-                GridCacheStoreManager<K, V> store = store();
+                GridCacheStoreManager store = store();
 
                 if (store != null && (near() || store.writeToStoreFromDht())) {
                     if (!internal() || groupLock())
@@ -1115,13 +1116,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @return Enlisted keys.
      */
     @SuppressWarnings({"RedundantTypeArguments"})
-    private Collection<K> enlistRead(
-        final GridCacheContext<K, V> cacheCtx,
+    private <K, V> Collection<KeyCacheObject> enlistRead(
+        final GridCacheContext cacheCtx,
         Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached,
+        @Nullable GridCacheEntryEx cached,
         @Nullable ExpiryPolicy expiryPlc,
         Map<K, V> map,
-        Map<K, GridCacheVersion> missed,
+        Map<KeyCacheObject, GridCacheVersion> missed,
         int keysCnt,
         boolean deserializePortable,
         boolean skipVals
@@ -1136,7 +1137,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
         boolean single = keysCnt == 1;
 
-        Collection<K> lockKeys = null;
+        Collection<KeyCacheObject> lockKeys = null;
 
         long topVer = topologyVersion();
 
@@ -1150,14 +1151,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             if (pessimistic() && !readCommitted() && !skipVals)
                 addActiveCache(cacheCtx);
 
-            IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+            KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+            IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
 
             // Check write map (always check writes first).
-            IgniteTxEntry<K, V> txEntry = entry(txKey);
+            IgniteTxEntry txEntry = entry(txKey);
 
             // Either non-read-committed or there was a previous write.
             if (txEntry != null) {
-                V val = txEntry.value();
+                CacheObject val = txEntry.value();
 
                 // Read value from locked entry in group-lock transaction as well.
                 if (txEntry.hasValue()) {
@@ -1165,10 +1168,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         val = txEntry.applyEntryProcessors(val);
 
                     if (val != null) {
-                        V val0 = val;
+                        Object val0 = val.value(cacheCtx);
 
                         if (cacheCtx.portableEnabled())
-                            val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+                            val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
 
                         map.put(key, (V)CU.skipValue(val0, skipVals));
                     }
@@ -1202,15 +1205,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 if (!F.isEmpty(txEntry.entryProcessors()))
                                     val = txEntry.applyEntryProcessors(val);
 
-                                V val0 = val;
+                                Object val0 = val.value(cacheCtx);
 
                                 if (cacheCtx.portableEnabled())
-                                    val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+                                    val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
 
                                 map.put(key, (V)CU.skipValue(val0, skipVals));
                             }
                             else
-                                missed.put(key, txEntry.cached().version());
+                                missed.put(cacheKey, txEntry.cached().version());
 
                             break;
                         }
@@ -1222,7 +1225,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 txEntry.readValue(e.<V>value());
                         }
                         catch (GridCacheEntryRemovedException ignored) {
-                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), txEntry.keyBytes());
+                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), null);
                         }
                     }
                 }
@@ -1230,13 +1233,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             // First time access within transaction.
             else {
                 if (lockKeys == null && !skipVals)
-                    lockKeys = single ? (Collection<K>)keys : new ArrayList<K>(keysCnt);
+                    lockKeys = single ? Collections.singleton(cacheKey) : new ArrayList<KeyCacheObject>(keysCnt);
 
                 if (!single && !skipVals)
-                    lockKeys.add(key);
+                    lockKeys.add(cacheKey);
 
                 while (true) {
-                    GridCacheEntryEx<K, V> entry;
+                    GridCacheEntryEx entry;
 
                     if (cached != null) {
                         entry = cached;
@@ -1249,7 +1252,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     try {
                         GridCacheVersion ver = entry.version();
 
-                        V val = null;
+                        CacheObject val = null;
 
                         if (!pessimistic() || readCommitted() || groupLock() && !skipVals) {
                             IgniteCacheExpiryPolicy accessPlc =
@@ -1270,19 +1273,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 accessPlc);
 
                             if (val != null) {
-                                V val0 = val;
+                                Object val0 = val.value(cacheCtx);
 
                                 if (cacheCtx.portableEnabled())
-                                    val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+                                    val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
 
                                 map.put(key, (V)CU.skipValue(val0, skipVals));
                             }
                             else
-                                missed.put(key, ver);
+                                missed.put(cacheKey, ver);
                         }
                         else
                             // We must wait for the lock in pessimistic mode.
-                            missed.put(key, ver);
+                            missed.put(cacheKey, ver);
 
                         if (!readCommitted() && !skipVals) {
                             txEntry = addEntry(READ,
@@ -1318,7 +1321,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                         if (!readCommitted()) {
                             // Value for which failure occurred.
-                            V val = e.<V>value();
+                            CacheObject val = e.value();
 
                             txEntry = addEntry(READ,
                                 val,
@@ -1326,7 +1329,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 null,
                                 entry,
                                 expiryPlc,
-                                CU.<K, V>empty(),
+                                CU.empty(),
                                 false,
                                 -1L,
                                 -1L,
@@ -1343,7 +1346,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             }
         }
 
-        return lockKeys != null ? lockKeys : Collections.<K>emptyList();
+        return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
     }
 
     /**
@@ -1354,7 +1357,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      */
     protected IgniteCacheExpiryPolicy accessPolicy(
         GridCacheContext ctx,
-        IgniteTxKey<K> key,
+        IgniteTxKey key,
         @Nullable ExpiryPolicy expiryPlc
     ) {
         return null;
@@ -1367,7 +1370,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param key Key to add.
      * @return Skipped set.
      */
-    private Set<K> skip(Set<K> skipped, K key) {
+    private Set<KeyCacheObject> skip(Set<KeyCacheObject> skipped, KeyCacheObject key) {
         if (skipped == null)
             skipped = new GridLeanSet<>();
 
@@ -1390,10 +1393,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param deserializePortable Deserialize portable flag.
      * @return Loaded key-value pairs.
      */
-    private IgniteInternalFuture<Map<K, V>> checkMissed(
-        final GridCacheContext<K, V> cacheCtx,
+    private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
+        final GridCacheContext cacheCtx,
         final Map<K, V> map,
-        final Map<K, GridCacheVersion> missedMap,
+        final Map<KeyCacheObject, GridCacheVersion> missedMap,
         @Nullable final Collection<K> redos,
         final boolean deserializePortable,
         final boolean skipVals
@@ -1403,16 +1406,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         if (log.isDebugEnabled())
             log.debug("Loading missed values for missed map: " + missedMap);
 
-        final Collection<K> loaded = new HashSet<>();
+        final Collection<KeyCacheObject> loaded = new HashSet<>();
 
         return new GridEmbeddedFuture<>(cctx.kernalContext(),
             loadMissing(
                 cacheCtx,
-                true, false, missedMap.keySet(), deserializePortable, skipVals, new CI2<K, V>() {
+                true, false, missedMap.keySet(), deserializePortable, skipVals, new CI2<KeyCacheObject, Object>() {
                 /** */
                 private GridCacheVersion nextVer;
 
-                @Override public void apply(K key, V val) {
+                @Override public void apply(KeyCacheObject key, Object val) {
                     if (isRollbackOnly()) {
                         if (log.isDebugEnabled())
                             log.debug("Ignoring loaded value for read because transaction was rolled back: " +
@@ -1430,15 +1433,17 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         return;
                     }
 
-                    V visibleVal = val;
+                    CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+                    Object visibleVal = val;
 
-                    IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+                    IgniteTxKey txKey = cacheCtx.txKey(key);
 
-                    IgniteTxEntry<K, V> txEntry = entry(txKey);
+                    IgniteTxEntry txEntry = entry(txKey);
 
                     if (txEntry != null) {
                         if (!readCommitted())
-                            txEntry.readValue(val);
+                            txEntry.readValue(cacheVal);
 
                         if (!F.isEmpty(txEntry.entryProcessors()))
                             visibleVal = txEntry.applyEntryProcessors(visibleVal);
@@ -1456,7 +1461,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     while (true) {
                         assert txEntry != null || readCommitted() || groupLock() || skipVals;
 
-                        GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+                        GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
 
                         try {
                             // Must initialize to true since even if filter didn't pass,
@@ -1464,7 +1469,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             boolean set;
 
                             try {
-                                set = e.versionedValue(val, ver, nextVer);
+                                set = e.versionedValue(cacheVal, ver, nextVer);
                             }
                             catch (GridCacheEntryRemovedException ignore) {
                                 if (log.isDebugEnabled())
@@ -1482,7 +1487,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 }
 
                                 if (txEntry != null)
-                                    txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+                                    txEntry.cached(entryEx(cacheCtx, txKey), null);
 
                                 continue; // While loop.
                             }
@@ -1494,15 +1499,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 cacheCtx.evicts().touch(e, topologyVersion());
 
                                 if (visibleVal != null)
-                                    map.put(key, (V)CU.skipValue(visibleVal, skipVals));
+                                    map.put(key.<K>value(cacheCtx), (V)CU.skipValue(visibleVal, skipVals));
                             }
                             else {
                                 assert txEntry != null;
 
-                                txEntry.setAndMarkValid(val);
+                                txEntry.setAndMarkValid(cacheVal);
 
                                 if (visibleVal != null)
-                                    map.put(key, visibleVal);
+                                    map.put(key.<K>value(cacheCtx), (V)visibleVal);
                             }
 
                             loaded.add(key);
@@ -1529,8 +1534,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                     if (!b && !readCommitted()) {
                         // There is no store - we must mark the entries.
-                        for (K key : missedMap.keySet()) {
-                            IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+                        for (KeyCacheObject key : missedMap.keySet()) {
+                            IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
 
                             if (txEntry != null)
                                 txEntry.markValid();
@@ -1538,15 +1543,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     }
 
                     if (readCommitted()) {
-                        Collection<K> notFound = new HashSet<>(missedMap.keySet());
+                        Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet());
 
                         notFound.removeAll(loaded);
 
                         // In read-committed mode touch entries that have just been read.
-                        for (K key : notFound) {
-                            IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+                        for (KeyCacheObject key : notFound) {
+                            IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
 
-                            GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
+                            GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
                                 txEntry.cached();
 
                             if (entry != null)
@@ -1560,10 +1565,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
-        final GridCacheContext<K, V> cacheCtx,
+    @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
+        final GridCacheContext cacheCtx,
         Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached,
+        @Nullable GridCacheEntryEx cached,
         final boolean deserializePortable,
         final boolean skipVals) {
         if (F.isEmpty(keys))
@@ -1580,13 +1585,13 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
             final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
 
-            final Map<K, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+            final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
 
             GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
 
             ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
 
-            final Collection<K> lockKeys = enlistRead(cacheCtx,
+            final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
                 keys,
                 cached,
                 expiryPlc,
@@ -1614,7 +1619,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     isolation,
                     isInvalidate(),
                     accessTtl,
-                    CU.<K, V>empty());
+                    CU.empty());
 
                 PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
                     @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
@@ -1622,20 +1627,22 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             log.debug("Acquired transaction lock for read on keys: " + lockKeys);
 
                         // Load keys only after the locks have been acquired.
-                        for (K key : lockKeys) {
-                            if (retMap.containsKey(key))
+                        for (KeyCacheObject cacheKey : lockKeys) {
+                            K keyVal = cacheKey.<K>value(cacheCtx);
+
+                            if (retMap.containsKey(keyVal))
                                 // We already have a return value.
                                 continue;
 
-                            IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+                            IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
 
-                            IgniteTxEntry<K, V> txEntry = entry(txKey);
+                            IgniteTxEntry txEntry = entry(txKey);
 
                             assert txEntry != null;
 
                             // Check if there is cached value.
                             while (true) {
-                                GridCacheEntryEx<K, V> cached = txEntry.cached();
+                                GridCacheEntryEx cached = txEntry.cached();
 
                                 try {
                                     Object transformClo =
@@ -1643,7 +1650,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
                                             F.first(txEntry.entryProcessors()) : null;
 
-                                    V val = cached.innerGet(IgniteTxLocalAdapter.this,
+                                    CacheObject val = cached.innerGet(IgniteTxLocalAdapter.this,
                                         cacheCtx.isSwapOrOffheapEnabled(),
                                         /*read-through*/false,
                                         /*fail-fast*/true,
@@ -1658,17 +1665,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                                     // If value is in cache and passed the filter.
                                     if (val != null) {
-                                        missed.remove(key);
+                                        missed.remove(cacheKey);
 
                                         txEntry.setAndMarkValid(val);
 
+                                        Object val0 = val.value(cacheCtx);
+
                                         if (!F.isEmpty(txEntry.entryProcessors()))
-                                            val = txEntry.applyEntryProcessors(val);
+                                            val0 = txEntry.applyEntryProcessors(val0);
 
                                         if (cacheCtx.portableEnabled())
-                                            val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+                                            val0 = cacheCtx.unwrapPortableIfNeeded(val0, !deserializePortable);
 
-                                        retMap.put(key, val);
+                                        retMap.put(keyVal, (V)val0);
                                     }
 
                                     // Even though we bring the value back from lock acquisition,
@@ -1682,16 +1691,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         log.debug("Got removed exception in get postLock (will retry): " +
                                             cached);
 
-                                    txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+                                    txEntry.cached(entryEx(cacheCtx, txKey), null);
                                 }
                                 catch (GridCacheFilterFailedException e) {
                                     // Failed value for the filter.
-                                    V val = e.value();
+                                    CacheObject val = e.value();
 
                                     if (val != null) {
                                         // If filter fails after lock is acquired, we don't reload,
                                         // regardless if value is null or not.
-                                        missed.remove(key);
+                                        missed.remove(cacheKey);
 
                                         txEntry.setAndMarkValid(val);
                                     }
@@ -1751,9 +1760,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                 if (!missed.isEmpty()) {
                     if (!readCommitted())
-                        for (Iterator<K> it = missed.keySet().iterator(); it.hasNext(); )
-                            if (retMap.containsKey(it.next()))
+                        for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
+                            K keyVal = it.next().value(cacheCtx);
+
+                            if (retMap.containsKey(keyVal))
                                 it.remove();
+                        }
 
                     if (missed.isEmpty())
                         return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
@@ -1780,12 +1792,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         new FinishClosure<Map<K, V>>() {
                             @Override Map<K, V> finish(Map<K, V> loaded) {
                                 for (Map.Entry<K, V> entry : loaded.entrySet()) {
-                                    IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(entry.getKey()));
+                                    // TODO IGNITE-51.
+                                    KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(entry.getKey());
+
+                                    IgniteTxEntry txEntry = entry(cacheCtx.txKey(cacheKey));
 
                                     V val = entry.getValue();
 
                                     if (!readCommitted())
-                                        txEntry.readValue(val);
+                                        txEntry.readValue(cacheCtx.toCacheObject(val));
 
                                     if (!F.isEmpty(txEntry.entryProcessors()))
                                         val = txEntry.applyEntryProcessors(val);
@@ -1811,15 +1826,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public IgniteInternalFuture<GridCacheReturn<V>> putAllAsync(
-        GridCacheContext<K, V> cacheCtx,
+    @Override public <K, V> IgniteInternalFuture<GridCacheReturn<CacheObject>> putAllAsync(
+        GridCacheContext cacheCtx,
         Map<? extends K, ? extends V> map,
         boolean retval,
-        @Nullable GridCacheEntryEx<K, V> cached,
+        @Nullable GridCacheEntryEx cached,
         long ttl,
         IgnitePredicate<Cache.Entry<K, V>>[] filter
     ) {
-        return (IgniteInternalFuture<GridCacheReturn<V>>)putAllAsync0(cacheCtx,
+        return (IgniteInternalFuture<GridCacheReturn<CacheObject>>)putAllAsync0(cacheCtx,
             map,
             null,
             null,
@@ -1830,8 +1845,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> putAllDrAsync(
-        GridCacheContext<K, V> cacheCtx,
+    @Override public <K, V> IgniteInternalFuture<?> putAllDrAsync(
+        GridCacheContext cacheCtx,
         Map<? extends K, GridCacheDrInfo<V>> drMap
     ) {
         return putAllAsync0(cacheCtx,
@@ -1846,8 +1861,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public <T> IgniteInternalFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
-        GridCacheContext<K, V> cacheCtx,
+    @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn<Map<K, EntryProcessorResult<T>>>> invokeAsync(
+        GridCacheContext cacheCtx,
         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
         Object... invokeArgs
     ) {
@@ -1862,8 +1877,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> removeAllDrAsync(
-        GridCacheContext<K, V> cacheCtx,
+    @Override public <K> IgniteInternalFuture<?> removeAllDrAsync(
+        GridCacheContext cacheCtx,
         Map<? extends K, GridCacheVersion> drMap
     ) {
         return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
@@ -1877,7 +1892,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @return {@code True} if passed or pessimistic.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean filter(GridCacheEntryEx<K, V> cached,
+    private <K, V> boolean filter(GridCacheEntryEx cached,
         IgnitePredicate<Cache.Entry<K, V>>[] filter) throws IgniteCheckedException {
         return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter);
     }
@@ -1902,10 +1917,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param drRmvMap DR remove map (optional).
      * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
      */
-    protected IgniteInternalFuture<Set<K>> enlistWrite(
-        final GridCacheContext<K, V> cacheCtx,
+    protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
+        final GridCacheContext cacheCtx,
         Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached,
+        @Nullable GridCacheEntryEx cached,
         @Nullable ExpiryPolicy expiryPlc,
         boolean implicit,
         @Nullable Map<? extends K, ? extends V> lookup,
@@ -1913,9 +1928,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         @Nullable Object[] invokeArgs,
         boolean retval,
         boolean lockOnly,
-        IgnitePredicate<Cache.Entry<K, V>>[] filter,
-        final GridCacheReturn<V> ret,
-        Collection<K> enlisted,
+        IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
+        final GridCacheReturn<CacheObject> ret,
+        Collection<KeyCacheObject> enlisted,
         @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap,
         @Nullable Map<? extends K, GridCacheVersion> drRmvMap
     ) {
@@ -1929,11 +1944,11 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             return new GridFinishedFuture<>(cctx.kernalContext(), e);
         }
 
-        Set<K> skipped = null;
+        Set<KeyCacheObject> skipped = null;
 
         boolean rmv = lookup == null && invokeMap == null;
 
-        Set<K> missedForLoad = null;
+        Set<KeyCacheObject> missedForLoad = null;
 
         try {
             // Set transform flag for transaction.
@@ -1984,17 +1999,16 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     throw new NullPointerException("Null value.");
                 }
 
-                if (cacheCtx.portableEnabled())
-                    key = (K)cacheCtx.marshalToPortable(key);
+                KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
 
-                IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+                IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
 
-                IgniteTxEntry<K, V> txEntry = entry(txKey);
+                IgniteTxEntry txEntry = entry(txKey);
 
                 // First time access.
                 if (txEntry == null) {
                     while (true) {
-                        GridCacheEntryEx<K, V> entry;
+                        GridCacheEntryEx entry;
 
                         if (cached != null) {
                             entry = cached;
@@ -2016,7 +2030,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     ", threadId=" + threadId +
                                     ", locNodeId=" + cctx.localNodeId() + ']');
 
-                            V old = null;
+                            CacheObject old = null;
 
                             boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
 
@@ -2051,7 +2065,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
 
                             if (!filter(entry, filter)) {
-                                skipped = skip(skipped, key);
+                                skipped = skip(skipped, cacheKey);
 
                                 ret.set(old, false);
 
@@ -2064,7 +2078,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         null,
                                         entry,
                                         null,
-                                        CU.<K, V>empty(),
+                                        CU.empty(),
                                         false,
                                         -1L,
                                         -1L,
@@ -2083,7 +2097,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                 entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
 
                             txEntry = addEntry(op,
-                                val,
+                                cacheCtx.toCacheObject(val),
                                 entryProcessor,
                                 invokeArgs,
                                 entry,
@@ -2100,7 +2114,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             if (groupLock() && !lockOnly)
                                 txEntry.groupLockEntry(true);
 
-                            enlisted.add(key);
+                            enlisted.add(cacheKey);
 
                             if ((!pessimistic() && !implicit()) || (groupLock() && !lockOnly)) {
                                 txEntry.markValid();
@@ -2112,7 +2126,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         if (missedForLoad == null)
                                             missedForLoad = new HashSet<>();
 
-                                        missedForLoad.add(key);
+                                        missedForLoad.add(cacheKey);
                                     }
                                     else {
                                         assert !transform;
@@ -2156,15 +2170,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
                             "transaction after transform closure is applied): " + key);
 
-                    GridCacheEntryEx<K, V> entry = txEntry.cached();
+                    GridCacheEntryEx entry = txEntry.cached();
 
-                    V v = txEntry.value();
+                    CacheObject v = txEntry.value();
 
                     boolean del = txEntry.op() == DELETE && rmv;
 
                     if (!del) {
                         if (!filter(entry, filter)) {
-                            skipped = skip(skipped, key);
+                            skipped = skip(skipped, cacheKey);
 
                             ret.set(v, false);
 
@@ -2175,7 +2189,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             v != null ? UPDATE : CREATE;
 
                         txEntry = addEntry(op,
-                            val,
+                            cacheCtx.toCacheObject(val),
                             entryProcessor,
                             invokeArgs,
                             entry,
@@ -2186,7 +2200,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             drExpireTime,
                             drVer);
 
-                        enlisted.add(key);
+                        enlisted.add(cacheKey);
 
                         if (txEntry.op() == TRANSFORM)
                             addInvokeResult(txEntry, txEntry.value(), ret);
@@ -2215,27 +2229,29 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 missedForLoad,
                 deserializePortables(cacheCtx),
                 /*skip values*/false,
-                new CI2<K, V>() {
-                    @Override public void apply(K key, V val) {
+                new CI2<KeyCacheObject, Object>() {
+                    @Override public void apply(KeyCacheObject key, Object val) {
                         if (log.isDebugEnabled())
                             log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
 
-                        IgniteTxEntry<K, V> e = entry(new IgniteTxKey<>(key, cacheCtx.cacheId()));
+                        IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
 
                         assert e != null;
 
+                        CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
                         if (e.op() == TRANSFORM)
-                            addInvokeResult(e, val, ret);
+                            addInvokeResult(e, cacheVal, ret);
                         else
-                            ret.set(val, true);
+                            ret.set(cacheVal, true);
                     }
                 });
 
             return new GridEmbeddedFuture<>(
                 cctx.kernalContext(),
                 fut,
-                new C2<Boolean, Exception, Set<K>>() {
-                    @Override public Set<K> apply(Boolean b, Exception e) {
+                new C2<Boolean, Exception, Set<KeyCacheObject>>() {
+                    @Override public Set<KeyCacheObject> apply(Boolean b, Exception e) {
                         if (e != null)
                             throw new GridClosureException(e);
 
@@ -2265,27 +2281,27 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param computeInvoke If {@code true} computes return value for invoke operation.
      */
     @SuppressWarnings("unchecked")
-    protected Set<K> postLockWrite(
-        GridCacheContext<K, V> cacheCtx,
-        Iterable<? extends K> keys,
-        Set<K> failed,
+    protected Set<KeyCacheObject> postLockWrite(
+        GridCacheContext cacheCtx,
+        Iterable<KeyCacheObject> keys,
+        Set<KeyCacheObject> failed,
         GridCacheReturn ret,
         boolean rmv,
         boolean retval,
         boolean read,
         long accessTtl,
-        IgnitePredicate<Cache.Entry<K, V>>[] filter,
+        IgnitePredicate<Cache.Entry<Object, Object>>[] filter,
         boolean computeInvoke
     ) throws IgniteCheckedException {
-        for (K k : keys) {
-            IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k));
+        for (KeyCacheObject k : keys) {
+            IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
 
             if (txEntry == null)
                 throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
                     "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
 
             while (true) {
-                GridCacheEntryEx<K, V> cached = txEntry.cached();
+                GridCacheEntryEx cached = txEntry.cached();
 
                 try {
                     assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
@@ -2295,7 +2311,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     if (log.isDebugEnabled())
                         log.debug("Post lock write entry: " + cached);
 
-                    V v = txEntry.previousValue();
+                    CacheObject v = txEntry.previousValue();
                     boolean hasPrevVal = txEntry.hasPreviousValue();
 
                     if (onePhaseCommit())
@@ -2343,7 +2359,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             ret.value(v);
                     }
 
-                    boolean pass = cacheCtx.isAll(cached, filter);
+                    boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached.wrapLazyValue(), filter);;
 
                     // For remove operation we return true only if we are removing s/t,
                     // i.e. cached value is not null.
@@ -2364,8 +2380,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         failed = skip(failed, k);
 
                         // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
-                        txEntry.setAndMarkValid(txEntry.previousOperation(), (V)ret.value());
-                        txEntry.filters(CU.<K, V>empty());
+                        txEntry.setAndMarkValid(txEntry.previousOperation(), (CacheObject)ret.value());
+                        txEntry.filters(CU.empty());
                         txEntry.filtersSet(false);
 
                         updateTtl = filter != cacheCtx.noPeekArray();
@@ -2389,7 +2405,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
 
-                    txEntry.cached(entryEx(cached.context(), txEntry.txKey()), txEntry.keyBytes());
+                    txEntry.cached(entryEx(cached.context(), txEntry.txKey()), null);
                 }
             }
         }
@@ -2402,17 +2418,23 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
     /**
      * @param txEntry Entry.
-     * @param val Value.
+     * @param cacheVal Value.
      * @param ret Return value to update.
      */
-    private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn<?> ret) {
+    private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn<?> ret) {
+        GridCacheContext ctx = txEntry.context();
+
+        Object keyVal = txEntry.key().value(ctx);
+        Object val = CU.value(cacheVal, ctx);
+
         try {
             Object res = null;
 
-            for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) {
-                CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.context(), txEntry.key(), val);
+            for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
+                CacheInvokeEntry<Object, Object> invokeEntry =
+                    new CacheInvokeEntry<>(txEntry.context(), keyVal, val);
 
-                EntryProcessor<K, V, ?> entryProcessor = t.get1();
+                EntryProcessor<Object, Object, ?> entryProcessor = t.get1();
 
                 res = entryProcessor.process(invokeEntry, t.get2());
 
@@ -2420,10 +2442,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             }
 
             if (res != null)
-                ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res));
+                ret.addEntryProcessResult(keyVal, new CacheInvokeResult<>(res));
         }
         catch (Exception e) {
-            ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e));
+            ret.addEntryProcessResult(keyVal, new CacheInvokeResult(e));
         }
     }
 
@@ -2442,20 +2464,22 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @return Operation future.
      */
     @SuppressWarnings("unchecked")
-    private IgniteInternalFuture putAllAsync0(
-        final GridCacheContext<K, V> cacheCtx,
+    private <K, V> IgniteInternalFuture putAllAsync0(
+        final GridCacheContext cacheCtx,
         @Nullable Map<? extends K, ? extends V> map,
         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
         @Nullable final Object[] invokeArgs,
         @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
         final boolean retval,
-        @Nullable GridCacheEntryEx<K, V> cached,
+        @Nullable GridCacheEntryEx cached,
         @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter
     ) {
         assert filter == null || invokeMap == null;
 
         cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT);
 
+        final IgnitePredicate<Cache.Entry<Object, Object>>[] filter0 = ((IgnitePredicate[])filter);
+
         if (retval)
             needReturnValue(true);
 
@@ -2474,42 +2498,43 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
             invokeMap0 = null;
         }
-        else if (cacheCtx.portableEnabled()) {
-            if (map != null) {
-                map0 = U.newHashMap(map.size());
-
-                try {
-                    for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
-                        K key = (K)cacheCtx.marshalToPortable(e.getKey());
-                        V val = (V)cacheCtx.marshalToPortable(e.getValue());
-
-                        map0.put(key, val);
-                    }
-                }
-                catch (IgniteException e) {
-                    return new GridFinishedFuture<>(cctx.kernalContext(), e);
-                }
-            }
-            else
-                map0 = null;
-
-            if (invokeMap != null) {
-                invokeMap0 = U.newHashMap(invokeMap.size());
-
-                try {
-                    for (Map.Entry<? extends K, ? extends EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) {
-                        K key = (K)cacheCtx.marshalToPortable(e.getKey());
-
-                        invokeMap0.put(key, e.getValue());
-                    }
-                }
-                catch (IgniteException e) {
-                    return new GridFinishedFuture<>(cctx.kernalContext(), e);
-                }
-            }
-            else
-                invokeMap0 = null;
-        }
+// TODO IGNITE-51.
+//        else if (cacheCtx.portableEnabled()) {
+//            if (map != null) {
+//                map0 = U.newHashMap(map.size());
+//
+//                try {
+//                    for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
+//                        K key = (K)cacheCtx.marshalToPortable(e.getKey());
+//                        V val = (V)cacheCtx.marshalToPortable(e.getValue());
+//
+//                        map0.put(key, val);
+//                    }
+//                }
+//                catch (IgniteException e) {
+//                    return new GridFinishedFuture<>(cctx.kernalContext(), e);
+//                }
+//            }
+//            else
+//                map0 = null;
+//
+//            if (invokeMap != null) {
+//                invokeMap0 = U.newHashMap(invokeMap.size());
+//
+//                try {
+//                    for (Map.Entry<? extends K, ? extends EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) {
+//                        K key = (K)cacheCtx.marshalToPortable(e.getKey());
+//
+//                        invokeMap0.put(key, e.getValue());
+//                    }
+//                }
+//                catch (IgniteException e) {
+//                    return new GridFinishedFuture<>(cctx.kernalContext(), e);
+//                }
+//            }
+//            else
+//                invokeMap0 = null;
+//        }
         else {
             map0 = (Map<K, V>)map;
             invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
@@ -2531,7 +2556,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
         init();
 
-        final GridCacheReturn<V> ret = new GridCacheReturn<>(false);
+        final GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(false);
 
         if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
             if (implicit())
@@ -2548,11 +2573,11 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         try {
             Set<? extends K> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
 
-            Collection<K> enlisted = new ArrayList<>();
+            Collection<KeyCacheObject> enlisted = new ArrayList<>();
 
             GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
 
-            final IgniteInternalFuture<Set<K>> loadFut = enlistWrite(
+            final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
                 cacheCtx,
                 keySet,
                 cached,
@@ -2563,7 +2588,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 invokeArgs,
                 retval,
                 false,
-                filter,
+                filter0,
                 ret,
                 enlisted,
                 drMap,
@@ -2571,16 +2596,19 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
             if (pessimistic() && !groupLock()) {
                 // Loose all skipped.
-                final Set<K> loaded = loadFut.get();
+                final Set<KeyCacheObject> loaded = loadFut.get();
 
-                final Collection<K> keys;
+                final Collection<KeyCacheObject> keys;
 
                 if (keySet != null ) {
                     keys = new ArrayList<>(keySet.size());
 
+                    // TODO IGNITE-51.
                     for (K k : keySet) {
-                        if (k != null && (loaded == null || !loaded.contains(k)))
-                            keys.add(k);
+                        KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(k);
+
+                        if (k != null && (loaded == null || !loaded.contains(cacheKey)))
+                            keys.add(cacheKey);
                     }
                 }
                 else
@@ -2599,8 +2627,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     -1L,
                     CU.<K, V>empty());
 
-                PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) {
-                    @Override public GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
+                PLC1<GridCacheReturn<CacheObject>> plc1 = new PLC1<GridCacheReturn<CacheObject>>(ret) {
+                    @Override public GridCacheReturn<CacheObject> postLock(GridCacheReturn<CacheObject> ret)
+                        throws IgniteCheckedException
+                    {
                         if (log.isDebugEnabled())
                             log.debug("Acquired transaction lock for put on keys: " + keys);
 
@@ -2612,7 +2642,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             retval,
                             /*read*/false,
                             -1L,
-                            filter,
+                            filter0,
                             /*computeInvoke*/true);
 
                         return ret;
@@ -2654,8 +2684,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         return new GridFinishedFutureEx<>(new GridCacheReturn<V>(), e);
                     }
 
-                    return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn<V>>() {
-                        @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException {
+                    return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn<CacheObject>>() {
+                        @Override public GridCacheReturn<CacheObject> applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException {
                             txFut.get();
 
                             return implicitRes;
@@ -2663,8 +2693,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     });
                 }
                 else
-                    return loadFut.chain(new CX1<IgniteInternalFuture<Set<K>>, GridCacheReturn<V>>() {
-                        @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<Set<K>> f) throws IgniteCheckedException {
+                    return loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn<CacheObject>>() {
+                        @Override public GridCacheReturn<CacheObject> applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
                             f.get();
 
                             return ret;
@@ -2680,10 +2710,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<GridCacheReturn<V>> removeAllAsync(
-        GridCacheContext<K, V> cacheCtx,
+    @Override public <K, V> IgniteInternalFuture<GridCacheReturn<CacheObject>> removeAllAsync(
+        GridCacheContext cacheCtx,
         Collection<? extends K> keys,
-        @Nullable GridCacheEntryEx<K, V> cached,
+        @Nullable GridCacheEntryEx cached,
         boolean retval,
         IgnitePredicate<Cache.Entry<K, V>>[] filter
     ) {
@@ -2699,15 +2729,18 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param filter Filter.
      * @return Future for asynchronous remove.
      */
-    private IgniteInternalFuture<GridCacheReturn<V>> removeAllAsync0(
-        final GridCacheContext<K, V> cacheCtx,
+    @SuppressWarnings("unchecked")
+    private <K, V> IgniteInternalFuture<GridCacheReturn<CacheObject>> removeAllAsync0(
+        final GridCacheContext cacheCtx,
         @Nullable final Collection<? extends K> keys,
         @Nullable Map<? extends  K, GridCacheVersion> drMap,
-        @Nullable GridCacheEntryEx<K, V> cached,
+        @Nullable GridCacheEntryEx cached,
         final boolean retval,
         @Nullable final IgnitePredicate<Cache.Entry<K, V>>[] filter) {
         cacheCtx.checkSecurity(GridSecurityPermission.CACHE_REMOVE);
 
+        final IgnitePredicate<Cache.Entry<Object, Object>>[] filter0 = ((IgnitePredicate[])filter);
+
         if (retval)
             needReturnValue(true);
 
@@ -2718,23 +2751,24 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
             keys0 = drMap.keySet();
         }
-        else if (cacheCtx.portableEnabled()) {
-            try {
-                if (keys != null) {
-                    Collection<K> pKeys = new ArrayList<>(keys.size());
-
-                    for (K key : keys)
-                        pKeys.add((K)cacheCtx.marshalToPortable(key));
-
-                    keys0 = pKeys;
-                }
-                else
-                    keys0 = null;
-            }
-            catch (IgniteException e) {
-                return new GridFinishedFuture<>(cctx.kernalContext(), e);
-            }
-        }
+// TODO IGNITE-51.
+//        else if (cacheCtx.portableEnabled()) {
+//            try {
+//                if (keys != null) {
+//                    Collection<K> pKeys = new ArrayList<>(keys.size());
+//
+//                    for (K key : keys)
+//                        pKeys.add((K)cacheCtx.marshalToPortable(key));
+//
+//                    keys0 = pKeys;
+//                }
+//                else
+//                    keys0 = null;
+//            }
+//            catch (IgniteException e) {
+//                return new GridFinishedFuture<>(cctx.kernalContext(), e);
+//            }
+//        }
         else
             keys0 = keys;
 
@@ -2752,7 +2786,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             return new GridFinishedFuture<>(cctx.kernalContext(), e);
         }
 
-        final GridCacheReturn<V> ret = new GridCacheReturn<>(false);
+        final GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(false);
 
         if (F.isEmpty(keys0)) {
             if (implicit()) {
@@ -2770,7 +2804,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         init();
 
         try {
-            Collection<K> enlisted = new ArrayList<>();
+            Collection<KeyCacheObject> enlisted = new ArrayList<>();
 
             ExpiryPolicy plc;
 
@@ -2782,7 +2816,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             else
                 plc = null;
 
-            final IgniteInternalFuture<Set<K>> loadFut = enlistWrite(
+            final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
                 cacheCtx,
                 keys0,
                 /** cached entry */null,
@@ -2793,7 +2827,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                 /** invoke arguments */null,
                 retval,
                 /** lock only */false,
-                filter,
+                filter0,
                 ret,
                 enlisted,
                 null,
@@ -2808,7 +2842,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             // to be rolled back.
             if (pessimistic() && !groupLock()) {
                 // Loose all skipped.
-                final Collection<? extends K> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
+                final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
 
                 if (log.isDebugEnabled())
                     log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);
@@ -2823,8 +2857,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     -1L,
                     CU.<K, V>empty());
 
-                PLC1<GridCacheReturn<V>> plc1 = new PLC1<GridCacheReturn<V>>(ret) {
-                    @Override protected GridCacheReturn<V> postLock(GridCacheReturn<V> ret) throws IgniteCheckedException {
+                PLC1<GridCacheReturn<CacheObject>> plc1 = new PLC1<GridCacheReturn<CacheObject>>(ret) {
+                    @Override protected GridCacheReturn<CacheObject> postLock(GridCacheReturn<CacheObject> ret)
+                        throws IgniteCheckedException
+                    {
                         if (log.isDebugEnabled())
                             log.debug("Acquired transaction lock for remove on keys: " + passedKeys);
 
@@ -2836,7 +2872,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             retval,
                             /*read*/false,
                             -1L,
-                            filter,
+                            filter0,
                             /*computeInvoke*/false);
 
                         return ret;
@@ -2871,8 +2907,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     // with prepare response, if required.
                     assert loadFut.isDone();
 
-                    return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn<V>>() {
-                        @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException {
+                    return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn<CacheObject>>() {
+                        @Override public GridCacheReturn<CacheObject> applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+                            throws IgniteCheckedException
+                        {
                             txFut.get();
 
                             return implicitRes;
@@ -2880,8 +2918,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                     });
                 }
                 else
-                    return loadFut.chain(new CX1<IgniteInternalFuture<Set<K>>, GridCacheReturn<V>>() {
-                        @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<Set<K>> f) throws IgniteCheckedException {
+                    return loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn<CacheObject>>() {
+                        @Override public GridCacheReturn<CacheObject> applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
+                            throws IgniteCheckedException
+                        {
                             f.get();
 
                             return ret;
@@ -2902,8 +2942,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param cacheCtx Cache context.
      * @return {@code True} if portables should be deserialized, {@code false} otherwise.
      */
-    private boolean deserializePortables(GridCacheContext<K, V> cacheCtx) {
-        GridCacheProjectionImpl<K, V> prj = cacheCtx.projectionPerCall();
+    private boolean deserializePortables(GridCacheContext cacheCtx) {
+        GridCacheProjectionImpl prj = cacheCtx.projectionPerCall();
 
         return prj == null || prj.deserializePortables();
     }
@@ -2912,7 +2952,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * Adds key mapping to transaction.
      * @param keys Keys to add.
      */
-    protected void addGroupTxMapping(Collection<IgniteTxKey<K>> keys) {
+    protected void addGroupTxMapping(Collection<IgniteTxKey> keys) {
         // No-op. This method is overriden in transactions that store key to remote node mapping
         // for commit.
     }
@@ -2924,7 +2964,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      * @param keys Keys to check.
      * @throws IgniteCheckedException If sanity check failed.
      */
-    private void groupLockSanityCheck(GridCacheContext<K, V> cacheCtx, Iterable<? extends K> keys) throws IgniteCheckedException {
+    private <K> void groupLockSanityCheck(GridCacheContext cacheCtx, Iterable<? extends K> keys)
+        throws IgniteCheckedException
+    {
         if (groupLock() && cctx.kernalContext().config().isCacheSanityCheckEnabled()) {
             // Note that affinity is called without mapper on purpose.
             int affinityPart = cacheCtx.config().getAffinity().partition(grpLockKey.key());
@@ -2939,8 +2981,10 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                             ", part=" + part + ", groupLockKey=" + grpLockKey + ']');
                 }
                 else {
-                    IgniteTxKey affinityKey = cacheCtx.txKey(
-                        (K)cacheCtx.config().getAffinityMapper().affinityKey(key));
+                    KeyCacheObject cacheKey =
+                        cacheCtx.toCacheKeyObject(cacheCtx.config().getAffinityMapper().affinityKey(key));
+
+                    IgniteTxKey affinityKey = cacheCtx.txKey(cacheKey);
 
                     if (!grpLockKey.equals(affinityKey))
                         throw new IgniteCheckedException("Failed to enlist key into group-lock transaction (affinity key was " +
@@ -2955,17 +2999,17 @@ public abstract class IgniteTxLoc

<TRUNCATED>

Mime
View raw message