Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0657810F32 for ; Mon, 2 Mar 2015 13:30:45 +0000 (UTC) Received: (qmail 81909 invoked by uid 500); 2 Mar 2015 13:30:29 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 81876 invoked by uid 500); 2 Mar 2015 13:30:29 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 81867 invoked by uid 99); 2 Mar 2015 13:30:29 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Mar 2015 13:30:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 02 Mar 2015 13:29:56 +0000 Received: (qmail 81327 invoked by uid 99); 2 Mar 2015 13:29:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Mar 2015 13:29:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DB9F1E0B41; Mon, 2 Mar 2015 13:29:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 02 Mar 2015 13:29:53 -0000 Message-Id: <3af82e9fb8b14749883eb278e4005af0@git.apache.org> In-Reply-To: <9c57cc6004fb4f63b10d29c033b2f881@git.apache.org> References: <9c57cc6004fb4f63b10d29c033b2f881@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] incubator-ignite git commit: # ignite-51 X-Virus-Checked: Checked by ClamAV on apache.org # ignite-51 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a040311d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a040311d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a040311d Branch: refs/heads/ignite-51 Commit: a040311d2eb88d4d3e49aa9c885007c85a6a442b Parents: bdb0f55 Author: sboikov Authored: Mon Mar 2 10:13:46 2015 +0300 Committer: sboikov Committed: Mon Mar 2 16:03:57 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheObjectAdapter.java | 1 + .../processors/cache/GridCacheAdapter.java | 46 ++--- .../processors/cache/GridCacheContext.java | 45 ++-- .../processors/cache/GridCacheEntryEx.java | 15 +- .../processors/cache/GridCacheMapEntry.java | 206 +++++++++---------- .../processors/cache/GridCacheMessage.java | 28 ++- .../processors/cache/GridCacheSwapManager.java | 44 ++-- .../processors/cache/KeyCacheObjectImpl.java | 5 +- .../cache/UserKeyCacheObjectImpl.java | 31 ++- .../GridDistributedCacheAdapter.java | 14 +- .../GridDistributedTxPrepareRequest.java | 110 +++++++--- .../GridDistributedTxRemoteAdapter.java | 25 ++- .../distributed/dht/GridDhtCacheAdapter.java | 2 +- .../distributed/dht/GridDhtCacheEntry.java | 30 ++- .../cache/distributed/dht/GridDhtGetFuture.java | 29 ++- .../distributed/dht/GridDhtLockFuture.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 14 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 38 +--- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../dht/GridDhtTxPrepareRequest.java | 50 ++--- .../dht/GridPartitionedGetFuture.java | 20 +- .../dht/atomic/GridDhtAtomicCache.java | 42 ++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 13 +- .../dht/colocated/GridDhtColocatedCache.java | 68 +++--- .../colocated/GridDhtColocatedLockFuture.java | 16 +- .../colocated/GridDhtDetachedCacheEntry.java | 12 +- .../dht/preloader/GridDhtForceKeysFuture.java | 1 - .../preloader/GridDhtPartitionDemandPool.java | 1 - .../distributed/near/GridNearAtomicCache.java | 6 +- .../distributed/near/GridNearCacheAdapter.java | 2 +- .../distributed/near/GridNearCacheEntry.java | 29 +-- .../distributed/near/GridNearLockFuture.java | 4 +- .../near/GridNearTransactionalCache.java | 13 +- .../cache/distributed/near/GridNearTxLocal.java | 126 ++++++------ .../near/GridNearTxPrepareFuture.java | 4 +- .../near/GridNearTxPrepareRequest.java | 42 ++-- .../near/GridNearTxPrepareResponse.java | 22 +- .../processors/cache/local/GridLocalCache.java | 10 +- .../cache/local/GridLocalLockFuture.java | 2 +- .../local/atomic/GridLocalAtomicCache.java | 14 +- .../cache/transactions/IgniteTxAdapter.java | 18 +- .../cache/transactions/IgniteTxEntry.java | 3 +- .../transactions/IgniteTxLocalAdapter.java | 170 ++++++++------- .../cache/transactions/IgniteTxLocalEx.java | 7 +- .../dataload/IgniteDataLoaderImpl.java | 9 +- .../portable/GridPortableProcessor.java | 2 +- .../portable/os/GridOsPortableProcessor.java | 7 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 12 +- .../processors/cache/GridCacheTestEntryEx.java | 5 +- .../GridCacheReplicatedFullApiSelfTest.java | 1 + 50 files changed, 725 insertions(+), 695 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 8edc6c8..5cf3521 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -43,6 +43,7 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable } /** + * @param ctx Context. * @return {@code True} need to copy value returned to user. */ protected boolean needCopy(GridCacheContext ctx) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 7cb0b1f..27d0065 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -558,7 +558,7 @@ public abstract class GridCacheAdapter implements GridCache, * @return Locks future. */ public abstract IgniteInternalFuture txLockAsync( - Collection keys, + Collection keys, long timeout, IgniteTxLocalEx tx, boolean isRead, @@ -2233,16 +2233,7 @@ public abstract class GridCacheAdapter implements GridCache, if (keyCheck) validateCacheKeys(keys); - Collection keys0 = F.viewReadOnly(keys, new C1() { - @Override public KeyCacheObject apply(K key) { - if (key == null) - throw new NullPointerException("Null key."); - - return ctx.toCacheKeyObject(key); - } - }); - - return getAllAsync0(keys0, + return getAllAsync0(ctx.cacheKeysView(keys), readThrough, checkTx, subjId, @@ -2263,7 +2254,7 @@ public abstract class GridCacheAdapter implements GridCache, * @param expiry Expiry policy. * @param skipVals Skip values flag. * @param keepCacheObjects Keep cache objects - * @return + * @return Future. */ public IgniteInternalFuture> getAllAsync0(@Nullable final Collection keys, boolean readThrough, @@ -2502,13 +2493,11 @@ public abstract class GridCacheAdapter implements GridCache, } } else { - return null; -// TODO IGNITE-51. -// return asyncOp(tx, new AsyncOp>(keys) { -// @Override public IgniteInternalFuture> op(IgniteTxLocalAdapter tx) { -// return ctx.wrapCloneMap(tx.getAllAsync(ctx, keys, cached0, deserializePortable, skipVals)); -// } -// }); + return asyncOp(tx, new AsyncOp>(keys) { + @Override public IgniteInternalFuture> op(IgniteTxLocalAdapter tx) { + return tx.getAllAsync(ctx, keys, null, deserializePortable, skipVals, false); + } + }); } } @@ -4166,7 +4155,7 @@ public abstract class GridCacheAdapter implements GridCache, GridCacheEntryEx entry = entryEx(key, false); try { - entry.initialValue(cacheVal, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer, + entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer, replicate ? DR_LOAD : DR_NONE); } catch (IgniteCheckedException e) { @@ -4615,8 +4604,12 @@ public abstract class GridCacheAdapter implements GridCache, GridCacheEntryEx entry = peekEx(cacheKey); try { - if (entry == null || entry.obsolete() || entry.isNewLocked()) + if (entry == null || entry.obsolete() || entry.isNewLocked()) { + if (entry != null) + cacheKey = entry.key(); + unswap.add(cacheKey); + } } catch (GridCacheEntryRemovedException ignored) { // No-op. @@ -5856,7 +5849,7 @@ public abstract class GridCacheAdapter implements GridCache, private final boolean single; /** Keys. */ - private final Collection keys; + private final Collection keys; /** * @param key Key. @@ -5870,7 +5863,7 @@ public abstract class GridCacheAdapter implements GridCache, /** * @param keys Keys involved. */ - protected AsyncOp(Collection keys) { + protected AsyncOp(Collection keys) { this.keys = keys; single = keys.size() == 1; @@ -5884,13 +5877,6 @@ public abstract class GridCacheAdapter implements GridCache, } /** - * @return Keys. - */ - Collection keys() { - return keys; - } - - /** * @param tx Transaction. * @return Operation return value. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 3b4e3df..3cbe8a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1322,26 +1322,6 @@ public class GridCacheContext implements Externalizable { } /** - * @param f Target future. - * @return Wrapped future that is aware of cloning behaviour. - */ - public IgniteInternalFuture> wrapCloneMap(IgniteInternalFuture> f) { - if (!hasFlag(CLONE)) - return f; - - return f.chain(new CX1>, Map>() { - @Override public Map applyx(IgniteInternalFuture> f) throws IgniteCheckedException { - Map map = new GridLeanMap<>(); - - for (Map.Entry e : f.get().entrySet()) - map.put(e.getKey(), cloneValue(e.getValue())); - - return map; - } - }); - } - - /** * Creates Runnable that can be executed safely in a different thread inheriting * the same thread local projection as for the current thread. If no projection is * set for current thread then there's no need to create new object and method simply @@ -1780,7 +1760,15 @@ public class GridCacheContext implements Externalizable { * @return Cache object. */ @Nullable public CacheObject toCacheObject(@Nullable Object obj) { - return portable().toCacheObject(cacheObjCtx, obj); + return portable().toCacheObject(cacheObjCtx, obj, null); + } + + /** + * @param obj Object. + * @return Cache object. + */ + @Nullable public CacheObject toCacheObject(@Nullable Object obj, byte[] bytes) { + return portable().toCacheObject(cacheObjCtx, obj, bytes); } /** @@ -1946,6 +1934,21 @@ public class GridCacheContext implements Externalizable { mgr.printMemoryStats(); } + /** + * @param keys Keys. + * @return Co + */ + public Collection cacheKeysView(Collection keys) { + return F.viewReadOnly(keys, new C1() { + @Override public KeyCacheObject apply(Object key) { + if (key == null) + throw new NullPointerException("Null key."); + + return toCacheKeyObject(key); + } + }); + }; + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, gridName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 8ed070e..30df242 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -244,7 +244,6 @@ public interface GridCacheEntryEx { * @return Swap entry if this entry was marked obsolete, {@code null} if entry was not evicted. * @throws IgniteCheckedException If failed. */ - // TODO IGNITE-51 public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer) throws IgniteCheckedException; /** @@ -333,7 +332,6 @@ public interface GridCacheEntryEx { * @param evtNodeId ID of node responsible for this change. * @param affNodeId Partitioned node iD. * @param val Value to set. - * @param valBytes Value bytes to set. * @param writeThrough If {@code true} then persist to storage. * @param retval {@code True} if value should be returned (and unmarshalled if needed). * @param ttl Time to live. @@ -356,7 +354,6 @@ public interface GridCacheEntryEx { UUID evtNodeId, UUID affNodeId, @Nullable CacheObject val, - @Nullable byte[] valBytes, boolean writeThrough, boolean retval, long ttl, @@ -412,7 +409,6 @@ public interface GridCacheEntryEx { * @param affNodeId Affinity node ID. * @param op Update operation. * @param val Value. Type depends on operation. - * @param valBytes Value bytes. Can be non-null only if operation is UPDATE. * @param invokeArgs Optional arguments for entry processor. * @param writeThrough Write through flag. * @param retval Return value flag. @@ -445,7 +441,6 @@ public interface GridCacheEntryEx { UUID affNodeId, GridCacheOperation op, @Nullable Object val, - @Nullable byte[] valBytes, @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @@ -485,7 +480,7 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If update failed. * @throws GridCacheEntryRemovedException If entry is obsolete. */ - public GridTuple3> innerUpdateLocal( + public GridTuple3> innerUpdateLocal( GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, @@ -501,7 +496,6 @@ public interface GridCacheEntryEx { String taskName ) throws IgniteCheckedException, GridCacheEntryRemovedException; - /** * Marks entry as obsolete and, if possible or required, removes it * from swap storage. @@ -662,7 +656,6 @@ public interface GridCacheEntryEx { * Sets new value if current version is 0 * * @param val New value. - * @param valBytes Value bytes. * @param ver Version to use. * @param ttl Time to live. * @param expireTime Expiration time. @@ -674,7 +667,6 @@ public interface GridCacheEntryEx { * @throws GridCacheEntryRemovedException If entry was removed. */ public boolean initialValue(CacheObject val, - @Nullable byte[] valBytes, GridCacheVersion ver, long ttl, long expireTime, @@ -692,7 +684,6 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException In case of error. * @throws GridCacheEntryRemovedException If entry was removed. */ - // TODO IGNITE-51 public boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped) throws IgniteCheckedException, GridCacheEntryRemovedException; @@ -715,7 +706,9 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException If index could not be updated. * @throws GridCacheEntryRemovedException If entry was removed. */ - public boolean versionedValue(CacheObject val, @Nullable GridCacheVersion curVer, @Nullable GridCacheVersion newVer) + public boolean versionedValue(CacheObject val, + @Nullable GridCacheVersion curVer, + @Nullable GridCacheVersion newVer) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 74f337b..77a7343 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -120,7 +120,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { private final int hash; /** Off-heap value pointer. */ - private long valPtr; + protected long valPtr; /** Extras */ @GridToStringInclude @@ -162,7 +162,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); synchronized (this) { - value(val, null); + value(val); } next(hdrId, next); @@ -181,9 +181,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * Sets entry value. If off-heap value storage is enabled, will serialize value to off-heap. * * @param val Value to store. - * @param valBytes Value bytes to store. */ - protected void value(@Nullable CacheObject val, @Nullable byte[] valBytes) { + protected void value(@Nullable CacheObject val) { assert Thread.holdsLock(this); // In case we deal with IGFS cache, count updated data @@ -205,17 +204,18 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { else { try { if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) { - if (val != null || valBytes != null) { - if (val == null) - val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); + Object val0 = null; - if (val != null) - cctx.gridDeploy().deploy(val.getClass(), val.getClass().getClassLoader()); + if (val != null) { + val0 = val.value(cctx, false); + + if (val0 != null) + cctx.gridDeploy().deploy(val0.getClass(), val0.getClass().getClassLoader()); } - if (U.p2pLoader(val)) { + if (U.p2pLoader(val0)) { cctx.deploy().addDeploymentContext( - new GridDeploymentInfoBean((GridDeploymentInfo)val.getClass().getClassLoader())); + new GridDeploymentInfoBean((GridDeploymentInfo)val0.getClass().getClassLoader())); } } @@ -527,7 +527,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); // Set unswapped value. - update(val, e.valueBytes(), e.expireTime(), e.ttl(), e.version()); + update(val, e.expireTime(), e.ttl(), e.version()); // Must update valPtr again since update() will reset it. if (cctx.offheapTiered() && e.offheapPointer() > 0) @@ -598,7 +598,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** * @return Value bytes and flag indicating whether value is byte array. */ - private IgniteBiTuple valueBytes0() { + protected IgniteBiTuple valueBytes0() { if (valPtr != 0) { assert isOffHeapValuesOnly() || cctx.offheapTiered(); @@ -784,7 +784,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (expired) { expiredVal = val; - value(null, null); + value(null); } if (old == null && !hasOldBytes) { @@ -879,12 +879,12 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (loadedFromStore) // Update indexes before actual write to entry. - updateIndex(ret, null, expTime, nextVer, prevVal); + updateIndex(ret, expTime, nextVer, prevVal); boolean hadValPtr = valPtr != 0; // Don't change version for read-through. - update(ret, null, expTime, ttl, nextVer); + update(ret, expTime, ttl, nextVer); if (hadValPtr && cctx.offheapTiered()) cctx.swap().removeOffheap(key); @@ -956,7 +956,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Update indexes. if (ret != null) { - updateIndex(ret, null, expTime, nextVer, old); + updateIndex(ret, expTime, nextVer, old); if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked()) deletedUnlocked(false); @@ -968,7 +968,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { deletedUnlocked(true); } - update(ret, null, expTime, ttl, nextVer); + update(ret, expTime, ttl, nextVer); touch = true; @@ -1000,7 +1000,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { UUID evtNodeId, UUID affNodeId, CacheObject val, - @Nullable byte[] valBytes, boolean writeThrough, boolean retval, long ttl, @@ -1065,11 +1064,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (interceptorVal == null) return new GridCacheUpdateTxResult(false, (CacheObject)cctx.unwrapTemporary(old)); - else if (interceptorVal != val0) { + else if (interceptorVal != val0) val = cctx.toCacheKeyObject(cctx.unwrapTemporary(interceptorVal)); - - valBytes = null; - } } // Determine new ttl and expire time. @@ -1097,16 +1093,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - if (val != null || valBytes != null) { - updateIndex(val, valBytes, expireTime, newVer, old); + if (val != null) { + updateIndex(val, expireTime, newVer, old); if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); } - update(val, valBytes, expireTime, ttl, newVer); + update(val, expireTime, ttl, newVer); - drReplicate(drType, val, valBytes, newVer); + drReplicate(drType, val, newVer); recordNodeId(affNodeId); @@ -1231,7 +1227,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean hadValPtr = valPtr != 0; - update(null, null, 0, 0, newVer); + update(null, 0, 0, newVer); if (cctx.offheapTiered() && hadValPtr) { boolean rmv = cctx.swap().removeOffheap(key); @@ -1254,7 +1250,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } } - drReplicate(drType, null, null, newVer); + drReplicate(drType, null, newVer); if (metrics && cctx.cache().configuration().isStatisticsEnabled()) cctx.cache().metrics0().onRemove(); @@ -1343,7 +1339,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridTuple3> innerUpdateLocal( + @Override public GridTuple3> innerUpdateLocal( GridCacheVersion ver, GridCacheOperation op, @Nullable Object writeObj, @@ -1412,11 +1408,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { old = (CacheObject)cctx.kernalContext().portable().detachPortable(old, cctx); if (old != null) - updateIndex(old, null, expireTime, ver, null); + updateIndex(old, expireTime, ver, null); else clearIndex(null); - update(old, null, expireTime, ttl, ver); + update(old, expireTime, ttl, ver); } // Apply metrics. @@ -1434,7 +1430,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked()) updateTtl(expiryPlc); - return new T3<>(false, retval ? old : null, null); + return new T3<>(false, retval ? CU.value(old, cctx, false) : null, null); } } @@ -1442,6 +1438,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { CacheObject updated; + Object key0 = null; + Object updated0 = null; + // Calculate new value. if (op == GridCacheOperation.TRANSFORM) { transformCloClsName = writeObj.getClass().getName(); @@ -1450,16 +1449,19 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert entryProcessor != null; - // TODO IGNITE-51. - CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, - key.value(cctx, false), - old.value(cctx, false)); + key0 = key.value(cctx, false); + old0 = value(old0, old, false); + + CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, key0, old0); try { Object computed = entryProcessor.process(entry, invokeArgs); - if (entry.modified()) - updated = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())); + if (entry.modified()) { + updated0 = cctx.unwrapTemporary(entry.getValue()); + + updated = cctx.toCacheObject(updated0); + } else updated = old; @@ -1484,19 +1486,26 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE; if (intercept) { -// TODO IGNITE-51. -// if (op == GridCacheOperation.UPDATE) { -// updated = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); -// -// if (updated == null) -// return new GridTuple3<>(false, cctx.unwrapTemporary(old), invokeRes); -// } -// else { -// interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key, old); -// -// if (cctx.cancelRemove(interceptorRes)) -// return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes); -// } + if (op == GridCacheOperation.UPDATE) { + key0 = value(key0, key, false); + updated0 = value(updated0, updated, false); + old0 = value(old0, old, false); + + Object interceptorVal + = cctx.config().getInterceptor().onBeforePut(key0, old0, updated0); + + if (interceptorVal == null) + return new GridTuple3<>(false, cctx.unwrapTemporary(old0), invokeRes); + } + else { + key0 = value(key0, key, false); + old0 = value(old0, old, false); + + interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key0, old0); + + if (cctx.cancelRemove(interceptorRes)) + return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes); + } } boolean hadVal = hasValueUnlocked(); @@ -1535,11 +1544,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - updateIndex(updated, null, expireTime, ver, old); + updateIndex(updated, expireTime, ver, old); assert ttl != CU.TTL_ZERO; - update(updated, null, expireTime, ttl, ver); + update(updated, expireTime, ttl, ver); if (evt) { CacheObject evtOld = null; @@ -1571,7 +1580,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // in load methods without actually holding entry lock. clearIndex(old); - update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); + update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); if (evt) { CacheObject evtOld = null; @@ -1603,15 +1612,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (intercept) { if (op == GridCacheOperation.UPDATE) - cctx.config().getInterceptor().onAfterPut(key, val); + cctx.config().getInterceptor().onAfterPut(key0, val); else - cctx.config().getInterceptor().onAfterRemove(key, old); + cctx.config().getInterceptor().onAfterRemove(key0, old0); } } - // TODO IGNITE-51. return new GridTuple3<>(res, - cctx.unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : old), + cctx.unwrapTemporary(interceptorRes != null ? interceptorRes.get2() : CU.value(old, cctx, false)), invokeRes); } @@ -1623,7 +1631,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { UUID affNodeId, GridCacheOperation op, @Nullable Object writeObj, - @Nullable byte[] valBytes, @Nullable Object[] invokeArgs, boolean writeThrough, boolean retval, @@ -1814,7 +1821,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { old0 = readThrough(null, key, false, subjId, taskName); - oldVal = cctx.toCacheObject(oldVal); + oldVal = cctx.toCacheObject(old0); readThrough = true; @@ -1837,11 +1844,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } if (oldVal != null) - updateIndex(oldVal, null, initExpireTime, ver, null); + updateIndex(oldVal, initExpireTime, ver, null); else clearIndex(null); - update(oldVal, null, initExpireTime, initTtl, ver); + update(oldVal, initExpireTime, initTtl, ver); if (deletedUnlocked() && oldVal != null && !isInternal()) deletedUnlocked(false); @@ -1856,7 +1863,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Check filter inside of synchronization. if (!F.isEmptyOrNulls(filter)) { - // TODO IGNITE-51 can get key/value only once. boolean pass = cctx.isAll(wrapFilterLocked(), filter); if (!pass) { @@ -1903,15 +1909,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (computed != null) invokeRes = new CacheInvokeDirectResult(key, cctx.toCacheObject(cctx.unwrapTemporary(computed))); - - valBytes = null; } catch (Exception e) { invokeRes = new CacheInvokeDirectResult(key, e); updated = oldVal; - - valBytes = oldValBytes.getIfMarshaled(); } if (!entry.modified()) { @@ -1981,7 +1983,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { newExpireTime = CU.EXPIRE_TIME_ETERNAL; updated = null; - valBytes = null; } else { newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; @@ -2030,10 +2031,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { null, null, false); - else if (interceptorVal != updated0) { + else if (interceptorVal != updated0) updated = cctx.toCacheObject(cctx.unwrapTemporary(updated0)); - valBytes = null; - } } // Try write-through. @@ -2061,11 +2060,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - updateIndex(updated, valBytes, newExpireTime, newVer, oldVal); + updateIndex(updated, newExpireTime, newVer, oldVal); - update(updated, valBytes, newExpireTime, newTtl, newVer); + update(updated, newExpireTime, newTtl, newVer); - drReplicate(drType, updated, valBytes, newVer); + drReplicate(drType, updated, newVer); recordNodeId(affNodeId); @@ -2140,7 +2139,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { boolean hasValPtr = valPtr != 0; // Clear value on backup. Entry will be removed from cache when it got evicted from queue. - update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer); + update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer); assert newSysTtl == CU.TTL_NOT_CHANGED; assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; @@ -2155,7 +2154,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { recordNodeId(affNodeId); - drReplicate(drType, null, null, newVer); + drReplicate(drType, null, newVer); if (evt) { CacheObject evtOld = null; @@ -2304,11 +2303,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * * @param drType DR type. * @param val Value. - * @param valBytes Value bytes. * @param ver Version. * @throws IgniteCheckedException In case of exception. */ - private void drReplicate(GridDrType drType, @Nullable CacheObject val, @Nullable byte[] valBytes, GridCacheVersion ver) + private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { // TODO IGNITE-51. // if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal()) @@ -2456,7 +2454,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (cctx.deferredDelete() && !isStartVersion() && !detached() && !isInternal()) { if (!deletedUnlocked()) { - update(null, null, 0L, 0L, ver); + update(null, 0L, 0L, ver); deletedUnlocked(true); @@ -2528,7 +2526,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { obsoleteVersionExtras(obsoleteVer); if (clear) - value(null, null); + value(null); } return obsoleteVer != null; @@ -2562,7 +2560,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (curVer == null || ver.equals(curVer)) { CacheObject val = saveValueForIndexUnlocked(); - value(null, null); + value(null); ver = newVer; @@ -2666,13 +2664,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** * * @param val New value. - * @param valBytes New value bytes. * @param expireTime Expiration time. * @param ttl Time to live. * @param ver Update version. */ - protected final void update(@Nullable CacheObject val, @Nullable byte[] valBytes, long expireTime, long ttl, - GridCacheVersion ver) { + protected final void update(@Nullable CacheObject val, long expireTime, long ttl, GridCacheVersion ver) { assert ver != null; assert Thread.holdsLock(this); assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl; @@ -2682,7 +2678,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (oldExpireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl()) cctx.ttl().removeTrackedEntry(this); - value(val, valBytes); + value(val); ttlAndExpireTimeExtras(ttl, expireTime); @@ -2986,9 +2982,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); - updateIndex(val, null, expireTime, nextVer, old); + updateIndex(val, expireTime, nextVer, old); - update(val, null, expireTime, ttlExtras(), nextVer); + update(val, expireTime, ttlExtras(), nextVer); } if (log.isDebugEnabled()) @@ -3238,7 +3234,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { @Override public synchronized CacheObject rawPut(CacheObject val, long ttl) { CacheObject old = this.val; - update(val, null, CU.toExpireTime(ttl), ttl, nextVersion()); + update(val, CU.toExpireTime(ttl), ttl, nextVersion()); return old; } @@ -3247,7 +3243,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { @SuppressWarnings({"RedundantTypeArguments"}) @Override public boolean initialValue( CacheObject val, - byte[] valBytes, GridCacheVersion ver, long ttl, long expireTime, @@ -3267,15 +3262,15 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); - if (val != null || valBytes != null) - updateIndex(val, valBytes, expTime, ver, null); + if (val != null) + updateIndex(val, expTime, ver, null); // Version does not change for load ops. - update(val, valBytes, expTime, ttl, ver); + update(val, expTime, ttl, ver); boolean skipQryNtf = false; - if (val == null && valBytes == null) { + if (val == null) { skipQryNtf = true; if (cctx.deferredDelete() && !isInternal()) { @@ -3287,7 +3282,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { else if (deletedUnlocked()) deletedUnlocked(false); - drReplicate(drType, val, valBytes, ver); + drReplicate(drType, val, ver); if (!skipQryNtf) { if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) @@ -3322,7 +3317,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Version does not change for load ops. update(val, - unswapped.valueBytes(), unswapped.expireTime(), unswapped.ttl(), unswapped.version() @@ -3365,14 +3359,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); if (val != null) { - updateIndex(val, null, expTime, newVer, old); + updateIndex(val, expTime, newVer, old); if (deletedUnlocked()) deletedUnlocked(false); } // Version does not change for load ops. - update(val, null, expTime, ttl, newVer); + update(val, expTime, ttl, newVer); } return true; @@ -3584,7 +3578,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (expired) { if (cctx.deferredDelete() && !detached() && !isInternal()) { if (!deletedUnlocked()) { - update(null, null, 0L, 0L, ver); + update(null, 0L, 0L, ver); deletedUnlocked(true); @@ -3785,25 +3779,23 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * Updates cache index. * * @param val Value. - * @param valBytes Value bytes. * @param expireTime Expire time. * @param ver New entry version. * @param prevVal Previous value. * @throws IgniteCheckedException If update failed. */ protected void updateIndex(@Nullable CacheObject val, - @Nullable byte[] valBytes, long expireTime, GridCacheVersion ver, @Nullable CacheObject prevVal) throws IgniteCheckedException { assert Thread.holdsLock(this); - assert val != null || valBytes != null : "null values in update index for key: " + key; + assert val != null : "null values in update index for key: " + key; try { GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr != null) - qryMgr.store(key, null, val, valBytes, ver, expireTime); + qryMgr.store(key, null, val, null, ver, expireTime); } catch (IgniteCheckedException e) { throw new GridCacheIndexUpdateException(e); @@ -3920,7 +3912,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { clearIndex(prev); // Nullify value after swap. - value(null, null); + value(null); marked = true; @@ -3963,7 +3955,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { clearIndex(prevVal); // Nullify value after swap. - value(null, null); + value(null); marked = true; @@ -4018,7 +4010,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { valClsLdrId); } - value(null, null); + value(null); } } catch (GridCacheEntryRemovedException ignored) { @@ -4491,7 +4483,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } if (detached()) - return rawGet().value(cctx, false); + return CU.value(rawGet(), cctx, false); for (;;) { GridCacheEntryEx e = cctx.cache().peekEx(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index a4d7384..fffa35f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -587,8 +587,12 @@ public abstract class GridCacheMessage implements Message { int size = col.size(); - for (int i = 0 ; i < size; i++) - col.get(i).prepareMarshal(ctx.cacheObjectContext()); + for (int i = 0 ; i < size; i++) { + CacheObject obj = col.get(i); + + if (obj != null) + obj.prepareMarshal(ctx.cacheObjectContext()); + } } /** @@ -601,8 +605,10 @@ public abstract class GridCacheMessage implements Message { if (col == null) return; - for (CacheObject obj : col) - obj.prepareMarshal(ctx.cacheObjectContext()); + for (CacheObject obj : col) { + if (obj != null) + obj.prepareMarshal(ctx.cacheObjectContext()); + } } /** @@ -622,8 +628,12 @@ public abstract class GridCacheMessage implements Message { int size = col.size(); - for (int i = 0 ; i < size; i++) - col.get(i).finishUnmarshal(ctx, ldr); + for (int i = 0 ; i < size; i++) { + CacheObject obj = col.get(i); + + if (obj != null) + obj.finishUnmarshal(ctx, ldr); + } } /** @@ -640,8 +650,10 @@ public abstract class GridCacheMessage implements Message { if (col == null) return; - for (CacheObject obj : col) - obj.finishUnmarshal(ctx, ldr); + for (CacheObject obj : col) { + if (obj != null) + obj.finishUnmarshal(ctx, ldr); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 4b21f8c..012d393 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -511,7 +511,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First check off-heap store. if (readOffheap && offheapEnabled) { - byte[] bytes = offheap.get(spaceName, part, key, key.valueBytes(cctx)); + byte[] bytes = offheap.get(spaceName, part, key, keyBytes); if (bytes != null) return swapEntry(unmarshalSwapEntry(bytes)); @@ -522,7 +522,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { assert key != null; - byte[] bytes = swapMgr.read(spaceName, new SwapKey(key, part, keyBytes), cctx.deploy().globalLoader()); + byte[] bytes = swapMgr.read(spaceName, + new SwapKey(key.value(cctx, false), part, keyBytes), + cctx.deploy().globalLoader()); if (bytes == null && lsnr != null) return lsnr.entry; @@ -606,7 +608,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { final GridTuple t = F.t1(); final GridTuple err = F.t1(); - swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), new CI1() { + swapMgr.remove(spaceName, new SwapKey(key.value(cctx, false), part, key.valueBytes(cctx)), new CI1() { @Override public void apply(byte[] rmv) { if (rmv != null) { try { @@ -727,7 +729,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - return read(key, CU.marshal(cctx.shared(), key), part, false, readOffheap, readSwap); + return read(key, key.valueBytes(cctx), part, false, readOffheap, readSwap); } /** @@ -801,8 +803,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (unprocessedKeys == null) unprocessedKeys = new ArrayList<>(keys.size()); - unprocessedKeys.add( - new SwapKey(key, cctx.affinity().partition(key), CU.marshal(cctx.shared(), key))); + SwapKey swapKey = new SwapKey(key.value(cctx, false), + cctx.affinity().partition(key), + key.valueBytes(cctx)); + + unprocessedKeys.add(swapKey); } } @@ -812,8 +817,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { else { unprocessedKeys = new ArrayList<>(keys.size()); - for (KeyCacheObject key : keys) - unprocessedKeys.add(new SwapKey(key, cctx.affinity().partition(key), CU.marshal(cctx.shared(), key))); + for (KeyCacheObject key : keys) { + SwapKey swapKey = new SwapKey(key.value(cctx, false), + cctx.affinity().partition(key), + key.valueBytes(cctx)); + + unprocessedKeys.add(swapKey); + } } assert swapEnabled; @@ -960,7 +970,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try offheap. if (offheapEnabled) { - byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx)); + byte[] val = offheap.remove(spaceName, part, key.value(cctx, false), key.valueBytes(cctx)); if (val != null) { if (c != null) @@ -970,9 +980,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } } - if (swapEnabled) - swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), c, + if (swapEnabled) { + swapMgr.remove(spaceName, + new SwapKey(key.value(cctx, false), part, key.valueBytes(cctx)), + c, cctx.deploy().globalLoader()); + } } /** @@ -1060,8 +1073,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { else { Map batch = new LinkedHashMap<>(); - for (GridCacheBatchSwapEntry entry : swapped) - batch.put(new SwapKey(entry.key(), entry.partition(), entry.key().valueBytes(cctx)), entry.marshal()); + for (GridCacheBatchSwapEntry entry : swapped) { + SwapKey swapKey = new SwapKey(entry.key().value(cctx, false), + entry.partition(), + entry.key().valueBytes(cctx)); + + batch.put(swapKey, entry.marshal()); + } swapMgr.writeAll(spaceName, batch, cctx.deploy().globalLoader()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index ad146fe..8ab6dec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -41,6 +41,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb */ public KeyCacheObjectImpl(Object val, byte[] valBytes) { assert val != null; + assert valBytes != null || this instanceof UserKeyCacheObjectImpl : this; this.val = val; this.valBytes = valBytes; @@ -61,8 +62,8 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb } /** {@inheritDoc} */ - @Override public byte[] valueBytes(GridCacheContext ctx) { - assert valBytes != null; + @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException { + assert valBytes != null : this; return valBytes; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java index 5c635a6..0b7bc2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; /** * Cache object wrapping key provided by user. Need to be copied before stored in cache. @@ -38,19 +39,29 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { } /** {@inheritDoc} */ + @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException { + if (valBytes == null) + valBytes = CU.marshal(ctx.shared(), val); + + return valBytes; + } + + /** {@inheritDoc} */ @Override public CacheObject prepareForCache(GridCacheContext ctx) { - if (needCopy(ctx)) { - try { - if (valBytes == null) - valBytes = ctx.marshaller().marshal(val); + try { + if (valBytes == null) + valBytes = ctx.marshaller().marshal(val); - return new KeyCacheObjectImpl(ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader()), valBytes); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal object: " + val, e); + if (needCopy(ctx)) { + Object val = ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader()); + + return new KeyCacheObjectImpl(val, valBytes); } - } - else + return new KeyCacheObjectImpl(val, valBytes); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object: " + val, e); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 497af0a..8dbabc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -74,7 +74,7 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter /** {@inheritDoc} */ @Override public IgniteInternalFuture txLockAsync( - Collection keys, + Collection keys, long timeout, IgniteTxLocalEx tx, boolean isRead, @@ -95,7 +95,15 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter IgniteTxLocalEx tx = ctx.tm().userTxx(); // Return value flag is true because we choose to bring values for explicit locks. - return lockAllAsync(keys, timeout, tx, false, false, /*retval*/true, null, -1L, filter); + return lockAllAsync(ctx.cacheKeysView(keys), + timeout, + tx, + false, + false, + /*retval*/true, + null, + -1L, + filter); } /** @@ -110,7 +118,7 @@ public abstract class GridDistributedCacheAdapter extends GridCacheAdapter * @param filter Optional filter. * @return Future for locks. */ - protected abstract IgniteInternalFuture lockAllAsync(Collection keys, + protected abstract IgniteInternalFuture lockAllAsync(Collection keys, long timeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 3dc9e45..5648ad4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; @@ -87,9 +88,13 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridDirectTransient private Map dhtVers; - /** Serialized map. */ - @GridToStringExclude - private byte[] dhtVersBytes; + /** */ + @GridDirectCollection(IgniteTxKey.class) + private Collection dhtVerKeys; + + /** */ + @GridDirectCollection(IgniteTxKey.class) + private Collection dhtVerVals; /** Group lock key, if any. */ @GridToStringInclude @@ -317,8 +322,16 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage if (grpLockKey != null && grpLockKeyBytes == null) grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey); - if (dhtVers != null && dhtVersBytes == null) - dhtVersBytes = ctx.marshaller().marshal(dhtVers); + if (dhtVers != null) { + for (IgniteTxKey key : dhtVers.keySet()) { + GridCacheContext cctx = ctx.cacheContext(key.cacheId()); + + key.prepareMarshal(cctx); + } + + dhtVerKeys = dhtVers.keySet(); + dhtVerVals = dhtVers.values(); + } if (txNodes != null) txNodesBytes = ctx.marshaller().marshal(txNodes); @@ -349,8 +362,23 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage if (grpLockKeyBytes != null && grpLockKey == null) grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); - if (dhtVersBytes != null && dhtVers == null) - dhtVers = ctx.marshaller().unmarshal(dhtVersBytes, ldr); + if (dhtVerKeys != null && dhtVers == null) { + assert dhtVerVals != null; + assert dhtVerKeys.size() == dhtVerVals.size(); + + Iterator keyIt = dhtVerKeys.iterator(); + Iterator verIt = dhtVerVals.iterator(); + + dhtVers = U.newHashMap(dhtVerKeys.size()); + + while (keyIt.hasNext()) { + IgniteTxKey key = keyIt.next(); + + key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); + + dhtVers.put(key, verIt.next()); + } + } if (txNodesBytes != null) txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr); @@ -436,84 +464,90 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 9: - if (!writer.writeByteArray("dhtVersBytes", dhtVersBytes)) + if (!writer.writeCollection("dhtVerKeys", dhtVerKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 10: - if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes)) + if (!writer.writeCollection("dhtVerVals", dhtVerVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("invalidate", invalidate)) + if (!writer.writeByteArray("grpLockKeyBytes", grpLockKeyBytes)) return false; writer.incrementState(); case 12: - if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) + if (!writer.writeBoolean("invalidate", invalidate)) return false; writer.incrementState(); case 13: - if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) + if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) return false; writer.incrementState(); case 14: - if (!writer.writeBoolean("partLock", partLock)) + if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) return false; writer.incrementState(); case 15: - if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeBoolean("partLock", partLock)) return false; writer.incrementState(); case 16: - if (!writer.writeBoolean("sys", sys)) + if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); case 17: - if (!writer.writeLong("threadId", threadId)) + if (!writer.writeBoolean("sys", sys)) return false; writer.incrementState(); case 18: - if (!writer.writeLong("timeout", timeout)) + if (!writer.writeLong("threadId", threadId)) return false; writer.incrementState(); case 19: - if (!writer.writeByteArray("txNodesBytes", txNodesBytes)) + if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); case 20: - if (!writer.writeInt("txSize", txSize)) + if (!writer.writeByteArray("txNodesBytes", txNodesBytes)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("writeVer", writeVer)) + if (!writer.writeInt("txSize", txSize)) return false; writer.incrementState(); case 22: + if (!writer.writeMessage("writeVer", writeVer)) + return false; + + writer.incrementState(); + + case 23: if (!writer.writeCollection("writesBytes", writesBytes, MessageCollectionItemType.BYTE_ARR)) return false; @@ -548,7 +582,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 9: - dhtVersBytes = reader.readByteArray("dhtVersBytes"); + dhtVerKeys = reader.readCollection("dhtVerKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -556,7 +590,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 10: - grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes"); + dhtVerVals = reader.readCollection("dhtVerVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -564,7 +598,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 11: - invalidate = reader.readBoolean("invalidate"); + grpLockKeyBytes = reader.readByteArray("grpLockKeyBytes"); if (!reader.isLastRead()) return false; @@ -572,6 +606,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 12: + invalidate = reader.readBoolean("invalidate"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -583,7 +625,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 13: + case 14: onePhaseCommit = reader.readBoolean("onePhaseCommit"); if (!reader.isLastRead()) @@ -591,7 +633,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 14: + case 15: partLock = reader.readBoolean("partLock"); if (!reader.isLastRead()) @@ -599,7 +641,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 15: + case 16: readsBytes = reader.readCollection("readsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) @@ -607,7 +649,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 16: + case 17: sys = reader.readBoolean("sys"); if (!reader.isLastRead()) @@ -615,7 +657,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 17: + case 18: threadId = reader.readLong("threadId"); if (!reader.isLastRead()) @@ -623,7 +665,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 18: + case 19: timeout = reader.readLong("timeout"); if (!reader.isLastRead()) @@ -631,7 +673,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 19: + case 20: txNodesBytes = reader.readByteArray("txNodesBytes"); if (!reader.isLastRead()) @@ -639,7 +681,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 20: + case 21: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -647,7 +689,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 21: + case 22: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -655,7 +697,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 22: + case 23: writesBytes = reader.readCollection("writesBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) @@ -675,7 +717,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 24; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index c1000ea..3ce5cd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -500,12 +500,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) txEntry.cached().unswap(true, false); - GridTuple3 res = + IgniteBiTuple res = applyTransformClosures(txEntry, false); GridCacheOperation op = res.get1(); CacheObject val = res.get2(); - byte[] valBytes = res.get3(); GridCacheVersion explicitVer = txEntry.conflictVersion(); @@ -556,7 +555,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); else { - cached.innerSet(this, eventNodeId(), nodeId, val, valBytes, false, false, + cached.innerSet(this, eventNodeId(), nodeId, val, false, false, txEntry.ttl(), true, true, topVer, txEntry.filters(), replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(), near() ? null : explicitVer, CU.subjectId(this, cctx), @@ -565,7 +564,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Keep near entry up to date. if (nearCached != null) { CacheObject val0 = null; - byte[] valBytes0 = null; GridCacheValueBytes valBytesTuple = cached.valueBytes(); @@ -579,8 +577,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter else val0 = cached.rawGet(); - nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(), - cached.ttl(), nodeId); + nearCached.updateOrEvict(xidVer, + val0, + cached.expireTime(), + cached.ttl(), + nodeId); } } } @@ -591,7 +592,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Keep near entry up to date. if (nearCached != null) - nearCached.updateOrEvict(xidVer, null, null, 0, 0, nodeId); + nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId); } else if (op == RELOAD) { CacheObject reloaded = cached.innerReload(); @@ -599,7 +600,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (nearCached != null) { nearCached.innerReload(); - nearCached.updateOrEvict(cached.version(), reloaded, null, + nearCached.updateOrEvict(cached.version(), reloaded, cached.expireTime(), cached.ttl(), nodeId); } } @@ -621,7 +622,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (nearCached != null) { CacheObject val0 = null; - byte[] valBytes0 = null; GridCacheValueBytes valBytesTuple = cached.valueBytes(); @@ -635,8 +635,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter else val0 = cached.rawGet(); - nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(), - cached.ttl(), nodeId); + nearCached.updateOrEvict(xidVer, + val0, + cached.expireTime(), + cached.ttl(), + nodeId); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 298e522..5febfcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -458,7 +458,7 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap entry = entryEx(key, false); - entry.initialValue(cacheVal, null, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer, + entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer, replicate ? DR_LOAD : DR_NONE); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 09f1629..6e4eac8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -311,22 +311,18 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { if (isNew() || !valid(-1) || deletedUnlocked()) return null; else { - CacheObject val0 = null; - byte[] valBytes0 = null; - -// TODO IGNITE-51. -// GridCacheValueBytes valBytesTuple = valueBytesUnlocked(); -// -// if (!valBytesTuple.isNull()) { -// if (valBytesTuple.isPlain()) -// val0 = (V)valBytesTuple.get(); -// else -// valBytes0 = valBytesTuple.get(); -// } -// else -// val0 = val; - - return F.t(ver, val0, valBytes0); + CacheObject val0 = val; + + if (val0 == null && valPtr != 0) { + IgniteBiTuple t = valueBytes0(); + + if (t.get2()) + val0 = cctx.toCacheObject(t.get1(), null); + else + val0 = cctx.toCacheObject(null, t.get1()); + } + + return F.t(ver, val0, null); } } @@ -563,7 +559,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { clearIndex(prev); // Give to GC. - update(null, null, 0L, 0L, ver); + update(null, 0L, 0L, ver); if (swap) { releaseSwap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a040311d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 516b2bb..e9674c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -357,14 +357,14 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridDhtCach /** {@inheritDoc} */ @Override public IgniteInternalFuture lockAllAsync( - @Nullable Collection keys, + @Nullable Collection keys, long timeout, IgniteTxLocalEx txx, boolean isInvalidate, @@ -583,7 +583,7 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach * @param filter Optional filter. * @return Lock future. */ - public GridDhtFuture lockAllAsyncInternal(@Nullable Collection keys, + public GridDhtFuture lockAllAsyncInternal(@Nullable Collection keys, long timeout, IgniteTxLocalEx txx, boolean isInvalidate, @@ -612,16 +612,10 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach accessTtl, filter); - for (K key : keys) { - if (key == null) - continue; - - // TODO IGNITE-51. - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - + for (KeyCacheObject key : keys) { try { while (true) { - GridDhtCacheEntry entry = entryExx(cacheKey, tx.topologyVersion()); + GridDhtCacheEntry entry = entryExx(key, tx.topologyVersion()); try { fut.addEntry(entry);