Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 80064182A5 for ; Wed, 28 Oct 2015 13:15:41 +0000 (UTC) Received: (qmail 45664 invoked by uid 500); 28 Oct 2015 13:15:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 45567 invoked by uid 500); 28 Oct 2015 13:15:20 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 42796 invoked by uid 99); 28 Oct 2015 13:14:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Oct 2015 13:14:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 26399E0AA0; Wed, 28 Oct 2015 13:14:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ntikhonov@apache.org To: commits@ignite.apache.org Date: Wed, 28 Oct 2015 13:15:05 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [29/50] [abbrv] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f22e753..82e5f2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -31,12 +31,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import javax.cache.Cache; import javax.cache.CacheException; import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -57,7 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -85,10 +86,8 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.GPC; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; -import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.plugin.security.SecurityPermission; @@ -105,6 +104,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY; import static org.apache.ignite.transactions.TransactionState.COMMITTED; @@ -424,46 +425,126 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public IgniteInternalFuture loadMissing( + @Override public IgniteInternalFuture loadMissing( final GridCacheContext cacheCtx, final boolean readThrough, boolean async, final Collection keys, - boolean deserializePortable, boolean skipVals, - final IgniteBiInClosure c + boolean needVer, + final GridInClosure3 c ) { - if (!async) { - try { - if (!readThrough || !cacheCtx.readThrough()) { - for (KeyCacheObject key : keys) - c.apply(key, null); + assert cacheCtx.isLocal() : cacheCtx.name(); - return new GridFinishedFuture<>(false); - } + if (!readThrough || !cacheCtx.readThrough()) { + for (KeyCacheObject key : keys) + c.apply(key, null, SER_READ_EMPTY_ENTRY_VER); - return new GridFinishedFuture<>( - cacheCtx.store().loadAll(this, keys, c)); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } + return new GridFinishedFuture<>(); } - else { - return cctx.kernalContext().closure().callLocalSafe( - new GPC() { - @Override public Boolean call() throws Exception { - if (!readThrough || !cacheCtx.readThrough()) { - for (KeyCacheObject key : keys) - c.apply(key, null); - - return false; + + try { + IgniteCacheExpiryPolicy expiryPlc = accessPolicy(cacheCtx, keys); + + Map misses = null; + + for (KeyCacheObject key : keys) { + while (true) { + IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); + + GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) : + txEntry.cached(); + + if (entry == null) + continue; + + try { + T2 res = entry.innerGetVersioned(this, + /*readSwap*/true, + /*unmarshal*/true, + /*update-metrics*/!skipVals, + /*event*/!skipVals, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + expiryPlc); + + if (res == null) { + if (misses == null) + misses = new LinkedHashMap<>(); + + misses.put(key, entry.version()); } + else + c.apply(key, skipVals ? true : res.get1(), res.get2()); - return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c); + break; } - }, - true); + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry, will retry: " + key); + + if (txEntry != null) + txEntry.cached(cacheCtx.cache().entryEx(key)); + } + } + } + + if (misses != null) { + final Map misses0 = misses; + + cacheCtx.store().loadAll(this, misses.keySet(), new CI2() { + @Override public void apply(KeyCacheObject key, Object val) { + GridCacheVersion ver = misses0.remove(key); + + assert ver != null : key; + + if (val != null) { + CacheObject cacheVal = cacheCtx.toCacheObject(val); + + while (true) { + GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); + + try { + GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null); + + boolean set = setVer != null; + + if (set) + ver = setVer; + + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry [set=" + set + + ", curVer=" + ver + ", newVer=" + setVer + ", " + + "entry=" + entry + ']'); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry, (will retry): " + entry); + } + catch (IgniteCheckedException e) { + // Wrap errors (will be unwrapped). + throw new GridClosureException(e); + } + } + } + else + ver = SER_READ_EMPTY_ENTRY_VER; + + c.apply(key, val, ver); + } + }); + + for (KeyCacheObject key : misses0.keySet()) + c.apply(key, null, SER_READ_EMPTY_ENTRY_VER); + } + + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } @@ -834,13 +915,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter IgniteBiTuple res = applyTransformClosures(txEntry, true); + GridCacheVersion dhtVer = null; + // For near local transactions we must record DHT version // in order to keep near entries on backup nodes until // backup remote transaction completes. if (cacheCtx.isNear()) { if (txEntry.op() == CREATE || txEntry.op() == UPDATE || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) - ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion()); + dhtVer = txEntry.dhtVersion(); if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) && txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) { @@ -921,6 +1004,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.conflictVersion(explicitVer); } + if (dhtVer == null) + dhtVer = explicitVer != null ? explicitVer : writeVersion(); + if (op == CREATE || op == UPDATE) { GridCacheUpdateTxResult updRes = cached.innerSet( this, @@ -938,9 +1024,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.conflictExpireTime(), cached.isNear() ? null : explicitVer, CU.subjectId(this, cctx), - resolveTaskName()); + resolveTaskName(), + dhtVer); - if (nearCached != null && updRes.success()) + if (nearCached != null && updRes.success()) { nearCached.innerSet( null, eventNodeId(), @@ -957,7 +1044,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.conflictExpireTime(), null, CU.subjectId(this, cctx), - resolveTaskName()); + resolveTaskName(), + dhtVer); + } } else if (op == DELETE) { GridCacheUpdateTxResult updRes = cached.innerRemove( @@ -973,9 +1062,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cached.detached() ? DR_NONE : drType, cached.isNear() ? null : explicitVer, CU.subjectId(this, cctx), - resolveTaskName()); + resolveTaskName(), + dhtVer); - if (nearCached != null && updRes.success()) + if (nearCached != null && updRes.success()) { nearCached.innerRemove( null, eventNodeId(), @@ -989,7 +1079,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter DR_NONE, null, CU.subjectId(this, cctx), - resolveTaskName()); + resolveTaskName(), + dhtVer); + } } else if (op == RELOAD) { cached.innerReload(); @@ -1180,14 +1272,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter commitErr.get()); } + if (near()) { + // Must evict near entries before rolling back from + // transaction manager, so they will be removed from cache. + for (IgniteTxEntry e : allEntries()) + evictNearEntry(e, false); + } + if (doneFlag.compareAndSet(false, true)) { try { - if (near()) - // Must evict near entries before rolling back from - // transaction manager, so they will be removed from cache. - for (IgniteTxEntry e : allEntries()) - evictNearEntry(e, false); - cctx.tm().rollbackTx(this); if (!internal()) { @@ -1228,12 +1321,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param entry Entry. + * @return {@code True} if local node is current primary for given entry. + */ + private boolean primaryLocal(GridCacheEntryEx entry) { + return entry.context().affinity().primary(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE); + } + + /** * Checks if there is a cached or swapped value for - * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method. + * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method. * * @param cacheCtx Cache context. * @param keys Key to enlist. - * @param cached Cached entry, if called from entry wrapper. * @param expiryPlc Explicitly specified expiry policy for entry. * @param map Return map. * @param missed Map of missed keys. @@ -1249,7 +1349,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter private Collection enlistRead( final GridCacheContext cacheCtx, Collection keys, - @Nullable GridCacheEntryEx cached, @Nullable ExpiryPolicy expiryPlc, Map map, Map missed, @@ -1261,7 +1360,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ) throws IgniteCheckedException { assert !F.isEmpty(keys); assert keysCnt == keys.size(); - assert cached == null || F.first(keys).equals(cached.key()); cacheCtx.checkSecurity(SecurityPermission.CACHE_READ); @@ -1271,11 +1369,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter AffinityTopologyVersion topVer = topologyVersion(); + boolean needReadVer = serializable() && optimistic(); + // In this loop we cover only read-committed or optimistic transactions. // Transactions that are pessimistic and not read-committed are covered // outside of this loop. for (KeyCacheObject key : keys) { - if (pessimistic() && !readCommitted() && !skipVals) + if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals) addActiveCache(cacheCtx); IgniteTxKey txKey = cacheCtx.txKey(key); @@ -1337,13 +1437,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter break; } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + txEntry); - - if (!readCommitted()) - txEntry.readValue(e.value()); - } catch (GridCacheEntryRemovedException ignored) { txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); } @@ -1359,38 +1452,49 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter lockKeys.add(key); while (true) { - GridCacheEntryEx entry; - - if (cached != null) { - entry = cached; - - cached = null; - } - else - entry = entryEx(cacheCtx, txKey, topVer); + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer); try { GridCacheVersion ver = entry.version(); CacheObject val = null; + GridCacheVersion readVer = null; if (!pessimistic() || readCommitted() && !skipVals) { IgniteCacheExpiryPolicy accessPlc = optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; - // This call will check for filter. - val = entry.innerGet(this, - /*swap*/true, - /*no read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*event*/true, - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - accessPlc); + if (needReadVer) { + T2 res = primaryLocal(entry) ? + entry.innerGetVersioned(this, + /*swap*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + accessPlc) : null; + + if (res != null) { + val = res.get1(); + readVer = res.get2(); + } + } + else { + val = entry.innerGet(this, + /*swap*/true, + /*no read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + /*temporary*/false, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + accessPlc); + } if (val != null) { cacheCtx.addResult(map, @@ -1424,8 +1528,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // As optimization, mark as checked immediately // for non-pessimistic if value is not null. - if (val != null && !pessimistic()) + if (val != null && !pessimistic()) { txEntry.markValid(); + + if (needReadVer) { + assert readVer != null; + + txEntry.serializableReadVersion(readVer); + } + } } break; // While. @@ -1434,34 +1545,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + entry); - - if (!readCommitted()) { - // Value for which failure occurred. - CacheObject val = e.value(); - - txEntry = addEntry(READ, - val, - null, - null, - entry, - expiryPlc, - CU.empty0(), - false, - -1L, - -1L, - null, - skipStore); - - // Mark as checked immediately for non-pessimistic. - if (val != null && !pessimistic()) - txEntry.markValid(); - } - - break; // While loop. - } finally { if (cacheCtx.isNear() && entry != null && readCommitted()) { if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) { @@ -1492,6 +1575,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @param keys Keys. + * @return Expiry policy. + */ + protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection keys) { + return null; + } + + /** * Adds skipped key. * * @param skipped Skipped set (possibly {@code null}). @@ -1512,12 +1604,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** * Loads all missed keys for - * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method. + * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method. * * @param cacheCtx Cache context. * @param map Return map. * @param missedMap Missed keys. - * @param redos Keys to retry. * @param deserializePortable Deserialize portable flag. * @param skipVals Skip values flag. * @param keepCacheObjects Keep cache objects flag. @@ -1528,55 +1619,25 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final GridCacheContext cacheCtx, final Map map, final Map missedMap, - @Nullable final Collection redos, final boolean deserializePortable, final boolean skipVals, final boolean keepCacheObjects, final boolean skipStore ) { - assert redos != null || pessimistic(); - if (log.isDebugEnabled()) log.debug("Loading missed values for missed map: " + missedMap); - final Collection loaded = new HashSet<>(); + final boolean needReadVer = serializable() && optimistic(); return new GridEmbeddedFuture<>( - new C2>() { - @Override public Map apply(Boolean b, Exception e) { + new C2>() { + @Override public Map apply(Void v, Exception e) { if (e != null) { setRollbackOnly(); throw new GridClosureException(e); } - if (!b && !readCommitted()) { - // There is no store - we must mark the entries. - for (KeyCacheObject key : missedMap.keySet()) { - IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); - - if (txEntry != null) - txEntry.markValid(); - } - } - - if (readCommitted()) { - Collection notFound = new HashSet<>(missedMap.keySet()); - - notFound.removeAll(loaded); - - // In read-committed mode touch entries that have just been read. - for (KeyCacheObject key : notFound) { - IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); - - GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) : - txEntry.cached(); - - if (entry != null) - cacheCtx.evicts().touch(entry, topologyVersion()); - } - } - return map; } }, @@ -1585,13 +1646,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter !skipStore, false, missedMap.keySet(), - deserializePortable, skipVals, - new CI2() { - /** */ - private GridCacheVersion nextVer; - - @Override public void apply(KeyCacheObject key, Object val) { + needReadVer, + new GridInClosure3() { + @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { if (isRollbackOnly()) { if (log.isDebugEnabled()) log.debug("Ignoring loaded value for read because transaction was rolled back: " + @@ -1600,15 +1658,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return; } - GridCacheVersion ver = missedMap.get(key); - - if (ver == null) { - if (log.isDebugEnabled()) - log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); - - return; - } - CacheObject cacheVal = cacheCtx.toCacheObject(val); CacheObject visibleVal = cacheVal; @@ -1625,90 +1674,42 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter visibleVal = txEntry.applyEntryProcessors(visibleVal); } - // In pessimistic mode we hold the lock, so filter validation - // should always be valid. - if (pessimistic()) - ver = null; - - // Initialize next version. - if (nextVer == null) - nextVer = cctx.versions().next(topologyVersion()); - - while (true) { - assert txEntry != null || readCommitted() || skipVals; - - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - - try { - // Must initialize to true since even if filter didn't pass, - // we still record the transaction value. - boolean set; - - try { - set = e.versionedValue(cacheVal, ver, nextVer); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAll method " + - "(will try again): " + e); - - if (pessimistic() && !readCommitted() && !isRollbackOnly()) { - U.error(log, "Inconsistent transaction state (entry got removed while " + - "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); - - setRollbackOnly(); - - return; - } - - if (txEntry != null) - txEntry.cached(entryEx(cacheCtx, txKey)); - - continue; // While loop. - } - - // In pessimistic mode, we should always be able to set. - assert set || !pessimistic(); - - if (readCommitted() || skipVals) { - cacheCtx.evicts().touch(e, topologyVersion()); + assert txEntry != null || readCommitted() || skipVals; - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } - else { - assert txEntry != null; + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); - txEntry.setAndMarkValid(cacheVal); + if (readCommitted() || skipVals) { + cacheCtx.evicts().touch(e, topologyVersion()); - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); + } + } + else { + assert txEntry != null; - loaded.add(key); + txEntry.setAndMarkValid(cacheVal); - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry from transaction [set=" + set + - ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + if (needReadVer) { + assert loadVer != null; - break; // While loop. + txEntry.serializableReadVersion(loadVer); } - catch (IgniteCheckedException ex) { - throw new IgniteException("Failed to put value for cache entry: " + e, ex); + + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializePortable, + false); } } } @@ -1720,7 +1721,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter @Override public IgniteInternalFuture> getAllAsync( final GridCacheContext cacheCtx, Collection keys, - @Nullable GridCacheEntryEx cached, final boolean deserializePortable, final boolean skipVals, final boolean keepCacheObjects, @@ -1747,7 +1747,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final Collection lockKeys = enlistRead(cacheCtx, keys, - cached, expiryPlc, retMap, missed, @@ -1850,20 +1849,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.cached(entryEx(cacheCtx, txKey)); } - catch (GridCacheFilterFailedException e) { - // Failed value for the filter. - CacheObject val = e.value(); - - if (val != null) { - // If filter fails after lock is acquired, we don't reload, - // regardless if value is null or not. - missed.remove(cacheKey); - - txEntry.setAndMarkValid(val); - } - - break; // While. - } } } @@ -1871,7 +1856,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return checkMissed(cacheCtx, retMap, missed, - null, deserializePortable, skipVals, keepCacheObjects, @@ -1920,8 +1904,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter else { assert optimistic() || readCommitted() || skipVals; - final Collection redos = new ArrayList<>(); - if (!missed.isEmpty()) { if (!readCommitted()) for (Iterator it = missed.keySet().iterator(); it.hasNext(); ) { @@ -1937,67 +1919,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (missed.isEmpty()) return new GridFinishedFuture<>(retMap); - IgniteInternalFuture> fut0 = checkMissed(cacheCtx, + return checkMissed(cacheCtx, retMap, missed, - redos, deserializePortable, skipVals, keepCacheObjects, skipStore); - - return new GridEmbeddedFuture<>( - // First future. - fut0, - // Closure that returns another future, based on result from first. - new PMC>() { - @Override public IgniteInternalFuture> postMiss(Map map) { - if (redos.isEmpty()) - return new GridFinishedFuture<>( - Collections.emptyMap()); - - if (log.isDebugEnabled()) - log.debug("Starting to future-recursively get values for keys: " + redos); - - // Future recursion. - return getAllAsync(cacheCtx, - redos, - null, - deserializePortable, - skipVals, - true, - skipStore); - } - }, - // Finalize. - new FinishClosure>() { - @Override Map finish(Map loaded) { - for (Map.Entry entry : loaded.entrySet()) { - KeyCacheObject cacheKey = (KeyCacheObject)entry.getKey(); - - IgniteTxEntry txEntry = entry(cacheCtx.txKey(cacheKey)); - - CacheObject val = (CacheObject)entry.getValue(); - - if (!readCommitted()) - txEntry.readValue(val); - - if (!F.isEmpty(txEntry.entryProcessors())) - val = txEntry.applyEntryProcessors(val); - - cacheCtx.addResult(retMap, - cacheKey, - val, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - - return retMap; - } - } - ); } return new GridFinishedFuture<>(retMap); @@ -2016,8 +1944,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter GridCacheContext cacheCtx, Map map, boolean retval, - @Nullable GridCacheEntryEx cached, - long ttl, CacheEntryPredicate[] filter ) { return (IgniteInternalFuture)putAllAsync0(cacheCtx, @@ -2026,7 +1952,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter null, null, retval, - cached, filter); } @@ -2041,7 +1966,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter null, drMap, false, - null, null); } @@ -2058,7 +1982,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter invokeArgs, null, true, - null, null); } @@ -2067,20 +1990,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter GridCacheContext cacheCtx, Map drMap ) { - return removeAllAsync0(cacheCtx, null, drMap, null, false, null); + return removeAllAsync0(cacheCtx, null, drMap, false, null, false); } /** * Checks filter for non-pessimistic transactions. * - * @param cached Cached entry. + * @param cctx Cache context. + * @param key Key. + * @param val Value. * @param filter Filter to check. * @return {@code True} if passed or pessimistic. - * @throws IgniteCheckedException If failed. */ - private boolean filter(GridCacheEntryEx cached, - CacheEntryPredicate[] filter) throws IgniteCheckedException { - return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter); + private boolean filter( + GridCacheContext cctx, + KeyCacheObject key, + CacheObject val, + CacheEntryPredicate[] filter) { + return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter); } /** @@ -2088,7 +2015,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * * @param cacheCtx Cache context. * @param keys Keys to enlist. - * @param cached Cached entry. * @param expiryPlc Explicitly specified expiry policy for entry. * @param implicit Implicit flag. * @param lookup Value lookup map ({@code null} for remove). @@ -2102,28 +2028,28 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param drPutMap DR put map (optional). * @param drRmvMap DR remove map (optional). * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). */ - protected IgniteInternalFuture> enlistWrite( + private IgniteInternalFuture> enlistWrite( final GridCacheContext cacheCtx, Collection keys, - @Nullable GridCacheEntryEx cached, @Nullable ExpiryPolicy expiryPlc, boolean implicit, @Nullable Map lookup, @Nullable Map> invokeMap, @Nullable Object[] invokeArgs, - boolean retval, + final boolean retval, boolean lockOnly, - CacheEntryPredicate[] filter, + final CacheEntryPredicate[] filter, final GridCacheReturn ret, Collection enlisted, @Nullable Map drPutMap, @Nullable Map drRmvMap, - boolean skipStore + boolean skipStore, + final boolean singleRmv ) { - assert cached == null || keys.size() == 1; - assert cached == null || F.first(keys).equals(cached.key()); + assert retval || invokeMap == null; try { addActiveCache(cacheCtx); @@ -2138,6 +2064,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter Set missedForLoad = null; + final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + final boolean needVal = singleRmv || retval || hasFilters; + final boolean needReadVer = needVal && (serializable() && optimistic()); + try { // Set transform flag for transaction. if (invokeMap != null) @@ -2194,19 +2124,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // First time access. if (txEntry == null) { while (true) { - GridCacheEntryEx entry = null; + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); try { - if (cached != null) { - entry = cached; - - cached = null; - } - else { - entry = entryEx(cacheCtx, txKey, topologyVersion()); - - entry.unswap(false); - } + entry.unswap(false); // Check if lock is being explicitly acquired by the same thread. if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && @@ -2217,45 +2138,57 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ", locNodeId=" + cctx.localNodeId() + ']'); CacheObject old = null; - - boolean readThrough = !skipStore && !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + GridCacheVersion readVer = null; if (optimistic() && !implicit()) { try { - // Should read through if filter is specified. - old = entry.innerGet(this, - /*swap*/false, - /*read-through*/readThrough && cacheCtx.loadPreviousValue(), - /*fail-fast*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - /*temporary*/false, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null); + if (needReadVer) { + T2 res = primaryLocal(entry) ? + entry.innerGetVersioned(this, + /*swap*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null) : null; + + if (res != null) { + old = res.get1(); + readVer = res.get2(); + } + } + else { + old = entry.innerGet(this, + /*swap*/false, + /*read-through*/false, + /*fail-fast*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + /*temporary*/false, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null); + } } catch (ClusterTopologyCheckedException e) { entry.context().evicts().touch(entry, topologyVersion()); throw e; } - catch (GridCacheFilterFailedException e) { - e.printStackTrace(); - - assert false : "Empty filter failed: " + e; - } } else old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); - if (!filter(entry, filter)) { + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { skipped = skip(skipped, cacheKey); ret.set(cacheCtx, old, false); - if (!readCommitted() && old != null) { + if (!readCommitted()) { // Enlist failed filters as reads for non-read-committed mode, // so future ops will get the same values. txEntry = addEntry(READ, @@ -2272,9 +2205,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter skipStore); txEntry.markValid(); + + if (needReadVer) { + assert readVer != null; + + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + } } - if (readCommitted() || old == null) + if (readCommitted()) cacheCtx.evicts().touch(entry, topologyVersion()); break; // While. @@ -2305,9 +2244,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.markValid(); if (old == null) { - boolean load = retval && !readThrough; - - if (load) { + if (needVal) { if (missedForLoad == null) missedForLoad = new HashSet<>(); @@ -2324,6 +2261,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } else { + if (needReadVer) { + assert readVer != null; + + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + } + if (retval && !transform) ret.set(cacheCtx, old, true); else { @@ -2369,7 +2312,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter else { if (entryProcessor == null && txEntry.op() == TRANSFORM) throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + - "transaction after transform closure is applied): " + key); + "transaction after EntryProcessor is applied): " + key); GridCacheEntryEx entry = txEntry.cached(); @@ -2378,7 +2321,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter boolean del = txEntry.op() == DELETE && rmv; if (!del) { - if (!filter(entry, filter)) { + if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { skipped = skip(skipped, cacheKey); ret.set(cacheCtx, v, false); @@ -2439,15 +2382,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (missedForLoad != null) { - IgniteInternalFuture fut = loadMissing( + final boolean skipVals = singleRmv; + + IgniteInternalFuture fut = loadMissing( cacheCtx, /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, /*async*/true, missedForLoad, - deserializePortables(cacheCtx), - /*skip values*/false, - new CI2() { - @Override public void apply(KeyCacheObject key, Object val) { + skipVals, + needReadVer, + new GridInClosure3() { + @Override public void apply(KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { if (log.isDebugEnabled()) log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); @@ -2455,33 +2402,50 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert e != null; - CacheObject cacheVal = cacheCtx.toCacheObject(val); + if (needReadVer) { + assert loadVer != null; - if (e.op() == TRANSFORM) { - GridCacheVersion ver; + e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + } - try { - ver = e.cached().version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : e; + if (singleRmv) { + assert !hasFilters && !retval; + assert val == null || Boolean.TRUE.equals(val) : val; - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + ret.set(cacheCtx, null, val != null); + } + else { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - ver = null; + if (e.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = e.cached().version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; + + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(e, cacheVal, ret, ver); } + else { + boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); - addInvokeResult(e, cacheVal, ret, ver); + ret.set(cacheCtx, cacheVal, success); + } } - else - ret.set(cacheCtx, cacheVal, true); } }); return new GridEmbeddedFuture<>( - new C2>() { - @Override public Set apply(Boolean b, Exception e) { + new C2>() { + @Override public Set apply(Void b, Exception e) { if (e != null) throw new GridClosureException(e); @@ -2495,6 +2459,31 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cctx Cache context. + * @param key Key. + * @param val Value. + * @param filter Filter. + * @return {@code True} if filter passed. + */ + private boolean isAll(GridCacheContext cctx, + KeyCacheObject key, + CacheObject val, + CacheEntryPredicate[] filter) { + GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) { + @Nullable @Override public CacheObject peekVisibleValue() { + return rawGet(); + } + }; + + for (CacheEntryPredicate p0 : filter) { + if (p0 != null && !p0.apply(e)) + return false; + } + + return true; + } + + /** * Post lock processing for put or remove. * * @param cacheCtx Context. @@ -2555,29 +2544,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (retval || invoke) { if (!cacheCtx.isNear()) { - try { - if (!hasPrevVal) { - boolean readThrough = - (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore(); - - v = cached.innerGet(this, - /*swap*/true, - readThrough, - /*failFast*/false, - /*unmarshal*/true, - /*metrics*/!invoke, - /*event*/!invoke && !dht(), - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - null); - } - } - catch (GridCacheFilterFailedException e) { - e.printStackTrace(); - - assert false : "Empty filter failed: " + e; + if (!hasPrevVal) { + boolean readThrough = + (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore(); + + v = cached.innerGet(this, + /*swap*/true, + readThrough, + /*failFast*/false, + /*unmarshal*/true, + /*metrics*/!invoke, + /*event*/!invoke && !dht(), + /*temporary*/false, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + null); } } else { @@ -2725,7 +2707,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param invokeArgs Optional arguments for EntryProcessor. * @param drMap DR map. * @param retval Key-transform value map to store. - * @param cached Cached entry, if any. * @param filter Filter. * @return Operation future. */ @@ -2737,7 +2718,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter @Nullable final Object[] invokeArgs, @Nullable final Map drMap, final boolean retval, - @Nullable GridCacheEntryEx cached, @Nullable final CacheEntryPredicate[] filter ) { assert filter == null || invokeMap == null; @@ -2778,8 +2758,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]"); assert map0 != null || invokeMap0 != null; - assert cached == null || - (map0 != null && map0.size() == 1) || (invokeMap0 != null && invokeMap0.size() == 1); try { checkValid(); @@ -2814,7 +2792,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final IgniteInternalFuture> loadFut = enlistWrite( cacheCtx, keySet, - cached, opCtx != null ? opCtx.expiry() : null, implicit, map0, @@ -2827,7 +2804,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter enlisted, drMap, null, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + false); if (pessimistic()) { // Loose all skipped. @@ -2921,8 +2899,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } else return nonInterruptable(loadFut.chain(new CX1>, GridCacheReturn>() { - @Override - public GridCacheReturn applyx(IgniteInternalFuture> f) throws IgniteCheckedException { + @Override public GridCacheReturn applyx(IgniteInternalFuture> f) throws IgniteCheckedException { f.get(); return ret; @@ -2951,11 +2928,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter @Override public IgniteInternalFuture removeAllAsync( GridCacheContext cacheCtx, Collection keys, - @Nullable GridCacheEntryEx cached, boolean retval, - CacheEntryPredicate[] filter + CacheEntryPredicate[] filter, + boolean singleRmv ) { - return removeAllAsync0(cacheCtx, keys, null, cached, retval, filter); + return removeAllAsync0(cacheCtx, keys, null, retval, filter, singleRmv); } /** @@ -2963,8 +2940,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param keys Keys to remove. * @param drMap DR map. * @param retval Flag indicating whether a value should be returned. - * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1. * @param filter Filter. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. * @return Future for asynchronous remove. */ @SuppressWarnings("unchecked") @@ -2972,9 +2949,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final GridCacheContext cacheCtx, @Nullable final Collection keys, @Nullable Map drMap, - @Nullable GridCacheEntryEx cached, final boolean retval, - @Nullable final CacheEntryPredicate[] filter) { + @Nullable final CacheEntryPredicate[] filter, + boolean singleRmv) { try { checkUpdatesAllowed(cacheCtx); } @@ -2998,7 +2975,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter keys0 = keys; assert keys0 != null; - assert cached == null || keys0.size() == 1; if (log.isDebugEnabled()) log.debug("Called removeAllAsync(...) [tx=" + this + ", keys=" + keys0 + ", implicit=" + implicit + @@ -3043,7 +3019,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final IgniteInternalFuture> loadFut = enlistWrite( cacheCtx, keys0, - /** cached entry */null, plc, implicit, /** lookup map */null, @@ -3056,7 +3031,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter enlisted, null, drMap, - opCtx != null && opCtx.skipStore() + opCtx != null && opCtx.skipStore(), + singleRmv ); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 6f72290..0d83338 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -19,17 +19,17 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; import java.util.Map; +import javax.cache.Cache; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.internal.util.lang.GridInClosure3; import org.jetbrains.annotations.Nullable; /** @@ -64,8 +64,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { /** * @param cacheCtx Cache context. * @param keys Keys to get. - * @param cached Cached entry if this method is called from entry wrapper - * Cached entry is passed if and only if there is only one key in collection of keys. * @param deserializePortable Deserialize portable flag. * @param skipVals Skip values flag. * @param keepCacheObjects Keep cache objects @@ -75,7 +73,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { public IgniteInternalFuture> getAllAsync( GridCacheContext cacheCtx, Collection keys, - @Nullable GridCacheEntryEx cached, boolean deserializePortable, boolean skipVals, boolean keepCacheObjects, @@ -85,17 +82,13 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @param cacheCtx Cache context. * @param map Map to put. * @param retval Flag indicating whether a value should be returned. - * @param cached Cached entry, if any. Will be provided only if map has size 1. * @param filter Filter. - * @param ttl Time to live for entry. If negative, leave unchanged. * @return Future for put operation. */ public IgniteInternalFuture putAllAsync( GridCacheContext cacheCtx, Map map, boolean retval, - @Nullable GridCacheEntryEx cached, - long ttl, CacheEntryPredicate[] filter); /** @@ -113,16 +106,16 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @param cacheCtx Cache context. * @param keys Keys to remove. * @param retval Flag indicating whether a value should be returned. - * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1. * @param filter Filter. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. * @return Future for asynchronous remove. */ public IgniteInternalFuture removeAllAsync( GridCacheContext cacheCtx, Collection keys, - @Nullable GridCacheEntryEx cached, boolean retval, - CacheEntryPredicate[] filter); + CacheEntryPredicate[] filter, + boolean singleRmv); /** * @param cacheCtx Cache context. @@ -161,17 +154,17 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { * @param readThrough Read through flag. * @param async if {@code True}, then loading will happen in a separate thread. * @param keys Keys. - * @param c Closure. - * @param deserializePortable Deserialize portable flag. * @param skipVals Skip values flag. + * @param needVer If {@code true} version is required for loaded values. + * @param c Closure to be applied for loaded values. * @return Future with {@code True} value if loading took place. */ - public IgniteInternalFuture loadMissing( + public IgniteInternalFuture loadMissing( GridCacheContext cacheCtx, boolean readThrough, boolean async, Collection keys, - boolean deserializePortable, boolean skipVals, - IgniteBiInClosure c); + boolean needVer, + GridInClosure3 c); } \ No newline at end of file