Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9E766200C3D for ; Tue, 14 Mar 2017 16:16:06 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9CF35160B7C; Tue, 14 Mar 2017 15:16:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 622F6160B8C for ; Tue, 14 Mar 2017 16:16:04 +0100 (CET) Received: (qmail 76586 invoked by uid 500); 14 Mar 2017 15:16:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 76514 invoked by uid 99); 14 Mar 2017 15:16:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Mar 2017 15:16:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6984BDFE1E; Tue, 14 Mar 2017 15:16:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 14 Mar 2017 15:16:04 -0000 Message-Id: <5d200032757e4eb0b9653e561bb1ccb3@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/5] ignite git commit: ignite-4768 txs archived-at: Tue, 14 Mar 2017 15:16:06 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/5523eac7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b1a4003..d457399 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -18,33 +18,23 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.io.Externalizable; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import javax.cache.Cache; -import javax.cache.CacheException; import javax.cache.expiry.Duration; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheOperationContext; -import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -58,9 +48,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; -import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -68,36 +55,26 @@ import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; -import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.C2; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; @@ -105,8 +82,6 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; -import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; -import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY; import static org.apache.ignite.transactions.TransactionState.COMMITTED; @@ -391,142 +366,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig return null; } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture loadMissing( - final GridCacheContext cacheCtx, - final AffinityTopologyVersion topVer, - final boolean readThrough, - boolean async, - final Collection keys, - boolean skipVals, - boolean needVer, - boolean keepBinary, - final ExpiryPolicy expiryPlc, - final GridInClosure3 c - ) { - assert cacheCtx.isLocal() : cacheCtx.name(); - - if (!readThrough || !cacheCtx.readThrough()) { - for (KeyCacheObject key : keys) - c.apply(key, null, SER_READ_EMPTY_ENTRY_VER); - - return new GridFinishedFuture<>(); - } - - try { - IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ? - accessPolicy(cacheCtx, keys) : - cacheCtx.cache().expiryPolicy(expiryPlc); - - Map misses = null; - - for (KeyCacheObject key : keys) { - while (true) { - IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); - - GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) : - txEntry.cached(); - - if (entry == null) - continue; - - try { - EntryGetResult res = entry.innerGetVersioned( - null, - this, - /*readSwap*/true, - /*unmarshal*/true, - /*update-metrics*/!skipVals, - /*event*/!skipVals, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - expiryPlc0, - txEntry == null ? keepBinary : txEntry.keepBinary(), - null); - - if (res == null) { - if (misses == null) - misses = new LinkedHashMap<>(); - - misses.put(key, entry.version()); - } - else - c.apply(key, skipVals ? true : res.value(), res.version()); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry, will retry: " + key); - - if (txEntry != null) - txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion())); - } - } - } - - if (misses != null) { - final Map misses0 = misses; - - cacheCtx.store().loadAll(this, misses.keySet(), new CI2() { - @Override public void apply(KeyCacheObject key, Object val) { - GridCacheVersion ver = misses0.remove(key); - - assert ver != null : key; - - if (val != null) { - CacheObject cacheVal = cacheCtx.toCacheObject(val); - - while (true) { - GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer); - - try { - EntryGetResult verVal = entry.versionedValue(cacheVal, - ver, - null, - null, - null); - - if (log.isDebugEnabled()) { - log.debug("Set value loaded from store into entry [" + - "oldVer=" + ver + - ", newVer=" + verVal.version() + - ", entry=" + entry + ']'); - } - - ver = verVal.version(); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry, (will retry): " + entry); - } - catch (IgniteCheckedException e) { - // Wrap errors (will be unwrapped). - throw new GridClosureException(e); - } - } - } - else - ver = SER_READ_EMPTY_ENTRY_VER; - - c.apply(key, val, ver); - } - }); - - for (KeyCacheObject key : misses0.keySet()) - c.apply(key, null, SER_READ_EMPTY_ENTRY_VER); - } - - return new GridFinishedFuture<>(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - /** * Gets minimum version present in transaction. * @@ -1103,2484 +942,226 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** - * @param entry Entry. - * @return {@code True} if local node is current primary for given entry. + * @param ctx Cache context. + * @param key Key. + * @param expiryPlc Expiry policy. + * @return Expiry policy wrapper for entries accessed locally in optimistic transaction. */ - private boolean primaryLocal(GridCacheEntryEx entry) { - return entry.context().affinity().primaryByPartition(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE); + protected IgniteCacheExpiryPolicy accessPolicy( + GridCacheContext ctx, + IgniteTxKey key, + @Nullable ExpiryPolicy expiryPlc + ) { + return null; } /** * @param cacheCtx Cache context. - * @param keys Key to enlist. - * @param expiryPlc Explicitly specified expiry policy for entry. - * @param map Return map. - * @param missed Map of missed keys. - * @param keysCnt Keys count (to avoid call to {@code Collection.size()}). - * @param deserializeBinary Deserialize binary flag. - * @param skipVals Skip values flag. - * @param keepCacheObjects Keep cache objects flag. - * @param skipStore Skip store flag. - * @throws IgniteCheckedException If failed. - * @return Enlisted keys. + * @param keys Keys. + * @return Expiry policy. */ - @SuppressWarnings({"RedundantTypeArguments"}) - private Collection enlistRead( - final GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Collection keys, - @Nullable ExpiryPolicy expiryPlc, - Map map, - Map missed, - int keysCnt, - boolean deserializeBinary, - boolean skipVals, - boolean keepCacheObjects, - boolean skipStore, - final boolean needVer - ) throws IgniteCheckedException { - assert !F.isEmpty(keys); - assert keysCnt == keys.size(); - - cacheCtx.checkSecurity(SecurityPermission.CACHE_READ); - - boolean single = keysCnt == 1; - - Collection lockKeys = null; - - AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion(); - - boolean needReadVer = (serializable() && optimistic()) || needVer; - - // In this loop we cover only read-committed or optimistic transactions. - // Transactions that are pessimistic and not read-committed are covered - // outside of this loop. - for (KeyCacheObject key : keys) { - if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals) - addActiveCache(cacheCtx); - - IgniteTxKey txKey = cacheCtx.txKey(key); - - // Check write map (always check writes first). - IgniteTxEntry txEntry = entry(txKey); - - // Either non-read-committed or there was a previous write. - if (txEntry != null) { - CacheObject val = txEntry.value(); - - if (txEntry.hasValue()) { - if (!F.isEmpty(txEntry.entryProcessors())) - val = txEntry.applyEntryProcessors(val); - - if (val != null) { - GridCacheVersion ver = null; - - if (needVer) { - if (txEntry.op() != READ) - ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED; - else { - ver = txEntry.entryReadVersion(); + protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection keys) { + return null; + } - if (ver == null && pessimistic()) { - while (true) { - try { - GridCacheEntryEx cached = txEntry.cached(); + /** + * Post lock processing for put or remove. + * + * @param cacheCtx Context. + * @param keys Keys. + * @param ret Return value. + * @param rmv {@code True} if remove. + * @param retval Flag to return value or not. + * @param read {@code True} if read. + * @param accessTtl TTL for read operation. + * @param filter Filter to check entries. + * @throws IgniteCheckedException If error. + * @param computeInvoke If {@code true} computes return value for invoke operation. + */ + @SuppressWarnings("unchecked") + protected final void postLockWrite( + GridCacheContext cacheCtx, + Iterable keys, + GridCacheReturn ret, + boolean rmv, + boolean retval, + boolean read, + long accessTtl, + CacheEntryPredicate[] filter, + boolean computeInvoke + ) throws IgniteCheckedException { + for (KeyCacheObject k : keys) { + IgniteTxEntry txEntry = entry(cacheCtx.txKey(k)); - ver = cached.isNear() ? - ((GridNearCacheEntry)cached).dhtVersion() : cached.version(); + if (txEntry == null) + throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " + + "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']'); - break; - } - catch (GridCacheEntryRemovedException ignored) { - txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); - } - } - } + while (true) { + GridCacheEntryEx cached = txEntry.cached(); - if (ver == null) { - assert optimistic() && repeatableRead() : this; + try { + assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() : + "Transaction lock is not acquired [entry=" + cached + ", tx=" + this + + ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']'; - ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET; - } - } + if (log.isDebugEnabled()) + log.debug("Post lock write entry: " + cached); - assert ver != null; - } + CacheObject v = txEntry.previousValue(); + boolean hasPrevVal = txEntry.hasPreviousValue(); - cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, - ver, 0, 0); - } - } - else { - assert txEntry.op() == TRANSFORM; + if (onePhaseCommit()) + filter = txEntry.filters(); - while (true) { - try { - GridCacheVersion readVer = null; - EntryGetResult getRes = null; + // If we have user-passed filter, we must read value into entry for peek(). + if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter)) + retval = true; - Object transformClo = - (txEntry.op() == TRANSFORM && - cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? - F.first(txEntry.entryProcessors()) : null; + boolean invoke = txEntry.op() == TRANSFORM; - if (needVer) { - getRes = txEntry.cached().innerGetVersioned( - null, - this, - /*swap*/true, - /*unmarshal*/true, - /*update-metrics*/true, - /*event*/!skipVals, - CU.subjectId(this, cctx), - transformClo, - resolveTaskName(), - null, - txEntry.keepBinary(), - null); + if (retval || invoke) { + if (!cacheCtx.isNear()) { + if (!hasPrevVal) { + // For non-local cache should read from store after lock on primary. + boolean readThrough = cacheCtx.isLocal() && + (invoke || cacheCtx.loadPreviousValue()) && + !txEntry.skipStore(); - if (getRes != null) { - val = getRes.value(); - readVer = getRes.version(); - } - } - else { - val = txEntry.cached().innerGet( + v = cached.innerGet( null, this, /*swap*/true, - /*read-through*/false, - /*metrics*/true, - /*event*/!skipVals, + readThrough, + /*metrics*/!invoke, + /*event*/!invoke && !dht(), /*temporary*/false, CU.subjectId(this, cctx), - transformClo, + null, resolveTaskName(), null, txEntry.keepBinary()); } - - if (val != null) { - if (!readCommitted() && !skipVals) - txEntry.readValue(val); - - if (!F.isEmpty(txEntry.entryProcessors())) - val = txEntry.applyEntryProcessors(val); - - cacheCtx.addResult(map, - key, - val, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - getRes, - readVer, - 0, - 0, - needVer); - } - else - missed.put(key, txEntry.cached().version()); - - break; } - catch (GridCacheEntryRemovedException ignored) { - txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); + else { + if (!hasPrevVal) + v = cached.rawGetOrUnmarshal(false); } - } - } - } - // First time access within transaction. - else { - if (lockKeys == null && !skipVals) - lockKeys = single ? Collections.singleton(key) : new ArrayList(keysCnt); - if (!single && !skipVals) - lockKeys.add(key); + if (txEntry.op() == TRANSFORM) { + if (computeInvoke) { + GridCacheVersion ver; + + try { + ver = cached.version(); + } + catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; - while (true) { - GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer); + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - try { - GridCacheVersion ver = entry.version(); - - CacheObject val = null; - GridCacheVersion readVer = null; - EntryGetResult getRes = null; - - if (!pessimistic() || readCommitted() && !skipVals) { - IgniteCacheExpiryPolicy accessPlc = - optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; - - if (needReadVer) { - getRes = primaryLocal(entry) ? - entry.innerGetVersioned( - null, - this, - /*swap*/true, - /*unmarshal*/true, - /*metrics*/true, - /*event*/true, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - accessPlc, - !deserializeBinary, - null) : null; - - if (getRes != null) { - val = getRes.value(); - readVer = getRes.version(); + ver = null; } - } - else { - val = entry.innerGet( - null, - this, - /*swap*/true, - /*read-through*/false, - /*metrics*/true, - /*event*/true, - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - accessPlc, - !deserializeBinary); - } - if (val != null) { - cacheCtx.addResult(map, - key, - val, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - getRes, - readVer, - 0, - 0, - needVer); + addInvokeResult(txEntry, v, ret, ver); } - else - missed.put(key, ver); } else - // We must wait for the lock in pessimistic mode. - missed.put(key, ver); - - if (!readCommitted() && !skipVals) { - txEntry = addEntry(READ, - val, - null, - null, - entry, - expiryPlc, - null, - true, - -1L, - -1L, - null, - skipStore, - !deserializeBinary); - - // As optimization, mark as checked immediately - // for non-pessimistic if value is not null. - if (val != null && !pessimistic()) { - txEntry.markValid(); - - if (needReadVer) { - assert readVer != null; - - txEntry.entryReadVersion(readVer); - } - } - } - - break; // While. + ret.value(cacheCtx, v, txEntry.keepBinary()); } - catch (GridCacheEntryRemovedException ignored) { + + boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter); + + // For remove operation we return true only if we are removing s/t, + // i.e. cached value is not null. + ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null)); + + if (onePhaseCommit()) + txEntry.filtersPassed(pass); + + boolean updateTtl = read; + + if (pass) { + txEntry.markValid(); + if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); + log.debug("Filter passed in post lock for key: " + k); } - finally { - if (entry != null && readCommitted()) { - if (cacheCtx.isNear()) { - if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) { - if (entry.markObsolete(xidVer)) - cacheCtx.cache().removeEntry(entry); - } - } - else - entry.context().evicts().touch(entry, topVer); + else { + // Revert operation to previous. (if no - NOOP, so entry will be unlocked). + txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value())); + txEntry.filters(CU.empty0()); + txEntry.filtersSet(false); + + updateTtl = !cacheCtx.putIfAbsentFilter(filter); + } + + if (updateTtl) { + if (!read) { + ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry); + + if (expiryPlc != null) + txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess())); } + else + txEntry.ttl(accessTtl); } + + break; // While. + } + // If entry cached within transaction got removed before lock. + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in putAllAsync method (will retry): " + cached); + + txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion())); } } } - - return lockKeys != null ? lockKeys : Collections.emptyList(); } /** - * @param ctx Cache context. - * @param key Key. - * @param expiryPlc Expiry policy. - * @return Expiry policy wrapper for entries accessed locally in optimistic transaction. + * @param txEntry Entry. + * @param cacheVal Value. + * @param ret Return value to update. + * @param ver Entry version. */ - protected IgniteCacheExpiryPolicy accessPolicy( - GridCacheContext ctx, - IgniteTxKey key, - @Nullable ExpiryPolicy expiryPlc - ) { - return null; - } - - /** - * @param cacheCtx Cache context. - * @param keys Keys. - * @return Expiry policy. - */ - protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection keys) { - return null; - } - - /** - * @param cacheCtx Cache context. - * @param topVer Topology version. - * @param map Return map. - * @param missedMap Missed keys. - * @param deserializeBinary Deserialize binary flag. - * @param skipVals Skip values flag. - * @param keepCacheObjects Keep cache objects flag. - * @param skipStore Skip store flag. - * @param expiryPlc Expiry policy. - * @return Loaded key-value pairs. - */ - private IgniteInternalFuture> checkMissed( - final GridCacheContext cacheCtx, - final AffinityTopologyVersion topVer, - final Map map, - final Map missedMap, - final boolean deserializeBinary, - final boolean skipVals, - final boolean keepCacheObjects, - final boolean skipStore, - final boolean needVer, - final ExpiryPolicy expiryPlc - - ) { - if (log.isDebugEnabled()) - log.debug("Loading missed values for missed map: " + missedMap); - - final boolean needReadVer = (serializable() && optimistic()) || needVer; - - return new GridEmbeddedFuture<>( - new C2>() { - @Override public Map apply(Void v, Exception e) { - if (e != null) { - setRollbackOnly(); - - throw new GridClosureException(e); - } - - return map; - } - }, - loadMissing( - cacheCtx, - topVer, - !skipStore, - false, - missedMap.keySet(), - skipVals, - needReadVer, - !deserializeBinary, - expiryPlc, - new GridInClosure3() { - @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { - if (isRollbackOnly()) { - if (log.isDebugEnabled()) - log.debug("Ignoring loaded value for read because transaction was rolled back: " + - IgniteTxLocalAdapter.this); - - return; - } - - CacheObject cacheVal = cacheCtx.toCacheObject(val); - - CacheObject visibleVal = cacheVal; - - IgniteTxKey txKey = cacheCtx.txKey(key); - - IgniteTxEntry txEntry = entry(txKey); - - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(cacheVal); - - if (!F.isEmpty(txEntry.entryProcessors())) - visibleVal = txEntry.applyEntryProcessors(visibleVal); - } - - assert txEntry != null || readCommitted() || skipVals; - - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached(); - - if (readCommitted() || skipVals) { - cacheCtx.evicts().touch(e, topologyVersion()); - - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - needVer ? loadVer : null, - 0, - 0); - } - } - else { - assert txEntry != null; - - txEntry.setAndMarkValid(cacheVal); - - if (needReadVer) { - assert loadVer != null; - - txEntry.entryReadVersion(loadVer); - } - - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - needVer ? loadVer : null, - 0, - 0); - } - } - } - }) - ); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture> getAllAsync( - final GridCacheContext cacheCtx, - @Nullable final AffinityTopologyVersion entryTopVer, - Collection keys, - final boolean deserializeBinary, - final boolean skipVals, - final boolean keepCacheObjects, - final boolean skipStore, - final boolean needVer) { - if (F.isEmpty(keys)) - return new GridFinishedFuture<>(Collections.emptyMap()); - - init(); - - int keysCnt = keys.size(); - - boolean single = keysCnt == 1; - - try { - checkValid(); - - final Map retMap = new GridLeanMap<>(keysCnt); - - final Map missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0); - - CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - - ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null; - - final Collection lockKeys = enlistRead(cacheCtx, - entryTopVer, - keys, - expiryPlc, - retMap, - missed, - keysCnt, - deserializeBinary, - skipVals, - keepCacheObjects, - skipStore, - needVer); - - if (single && missed.isEmpty()) - return new GridFinishedFuture<>(retMap); - - // Handle locks. - if (pessimistic() && !readCommitted() && !skipVals) { - if (expiryPlc == null) - expiryPlc = cacheCtx.expiry(); - - long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED; - long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED; - - long timeout = remainingTime(); - - if (timeout == -1) - return new GridFinishedFuture<>(timeoutException()); - - IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(lockKeys, - timeout, - this, - true, - true, - isolation, - isInvalidate(), - createTtl, - accessTtl); - - final ExpiryPolicy expiryPlc0 = expiryPlc; - - PLC2> plc2 = new PLC2>() { - @Override public IgniteInternalFuture> postLock() throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Acquired transaction lock for read on keys: " + lockKeys); - - // Load keys only after the locks have been acquired. - for (KeyCacheObject cacheKey : lockKeys) { - K keyVal = (K) - (keepCacheObjects ? cacheKey : - cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary)); - - if (retMap.containsKey(keyVal)) - // We already have a return value. - continue; - - IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - - IgniteTxEntry txEntry = entry(txKey); - - assert txEntry != null; - - // Check if there is cached value. - while (true) { - GridCacheEntryEx cached = txEntry.cached(); - - CacheObject val = null; - GridCacheVersion readVer = null; - EntryGetResult getRes = null; - - try { - Object transformClo = - (!F.isEmpty(txEntry.entryProcessors()) && - cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? - F.first(txEntry.entryProcessors()) : null; - - if (needVer) { - getRes = cached.innerGetVersioned( - null, - IgniteTxLocalAdapter.this, - /*swap*/cacheCtx.isSwapOrOffheapEnabled(), - /*unmarshal*/true, - /*update-metrics*/true, - /*event*/!skipVals, - CU.subjectId(IgniteTxLocalAdapter.this, cctx), - transformClo, - resolveTaskName(), - null, - txEntry.keepBinary(), - null); - - if (getRes != null) { - val = getRes.value(); - readVer = getRes.version(); - } - } - else{ - val = cached.innerGet( - null, - IgniteTxLocalAdapter.this, - cacheCtx.isSwapOrOffheapEnabled(), - /*read-through*/false, - /*metrics*/true, - /*events*/!skipVals, - /*temporary*/false, - CU.subjectId(IgniteTxLocalAdapter.this, cctx), - transformClo, - resolveTaskName(), - null, - txEntry.keepBinary()); - } - - // If value is in cache and passed the filter. - if (val != null) { - missed.remove(cacheKey); - - txEntry.setAndMarkValid(val); - - if (!F.isEmpty(txEntry.entryProcessors())) - val = txEntry.applyEntryProcessors(val); - - cacheCtx.addResult(retMap, - cacheKey, - val, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - getRes, - readVer, - 0, - 0, - needVer); - - if (readVer != null) - txEntry.entryReadVersion(readVer); - } - - // Even though we bring the value back from lock acquisition, - // we still need to recheck primary node for consistent values - // in case of concurrent transactional locks. - - break; // While. - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed exception in get postLock (will retry): " + - cached); - - txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion())); - } - } - } - - if (!missed.isEmpty() && cacheCtx.isLocal()) { - AffinityTopologyVersion topVer = topologyVersionSnapshot(); - - if (topVer == null) - topVer = entryTopVer; - - return checkMissed(cacheCtx, - topVer != null ? topVer : topologyVersion(), - retMap, - missed, - deserializeBinary, - skipVals, - keepCacheObjects, - skipStore, - needVer, - expiryPlc0); - } - - return new GridFinishedFuture<>(Collections.emptyMap()); - } - }; - - FinishClosure> finClos = new FinishClosure>() { - @Override Map finish(Map loaded) { - retMap.putAll(loaded); - - return retMap; - } - }; - - if (fut.isDone()) { - try { - IgniteInternalFuture> fut1 = plc2.apply(fut.get(), null); - - return fut1.isDone() ? - new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) : - new GridEmbeddedFuture<>(finClos, fut1); - } - catch (GridClosureException e) { - return new GridFinishedFuture<>(e.unwrap()); - } - catch (IgniteCheckedException e) { - try { - return plc2.apply(false, e); - } - catch (Exception e1) { - return new GridFinishedFuture<>(e1); - } - } - } - else { - return new GridEmbeddedFuture<>( - fut, - plc2, - finClos); - } - } - else { - assert optimistic() || readCommitted() || skipVals; - - if (!missed.isEmpty()) { - if (!readCommitted()) - for (Iterator it = missed.keySet().iterator(); it.hasNext(); ) { - KeyCacheObject cacheKey = it.next(); - - K keyVal = - (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false)); - - if (retMap.containsKey(keyVal)) - it.remove(); - } - - if (missed.isEmpty()) - return new GridFinishedFuture<>(retMap); - - AffinityTopologyVersion topVer = topologyVersionSnapshot(); - - if (topVer == null) - topVer = entryTopVer; - - return checkMissed(cacheCtx, - topVer != null ? topVer : topologyVersion(), - retMap, - missed, - deserializeBinary, - skipVals, - keepCacheObjects, - skipStore, - needVer, - expiryPlc); - } - - return new GridFinishedFuture<>(retMap); - } - } - catch (IgniteCheckedException e) { - setRollbackOnly(); - - return new GridFinishedFuture<>(e); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture putAllAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Map map, - boolean retval - ) { - return (IgniteInternalFuture)putAllAsync0(cacheCtx, - entryTopVer, - map, - null, - null, - null, - retval); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - K key, - V val, - boolean retval, - CacheEntryPredicate filter) { - return putAsync0(cacheCtx, - entryTopVer, - key, - val, - null, - null, - retval, - filter); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture invokeAsync(GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - K key, - EntryProcessor entryProcessor, - Object... invokeArgs) { - return (IgniteInternalFuture)putAsync0(cacheCtx, - entryTopVer, - key, - null, - entryProcessor, - invokeArgs, - true, - null); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllDrAsync( - GridCacheContext cacheCtx, - Map drMap - ) { - Map map = F.viewReadOnly(drMap, new IgniteClosure() { - @Override public Object apply(GridCacheDrInfo val) { - return val.value(); - } - }); - - return this.putAllAsync0(cacheCtx, - null, - map, - null, - null, - drMap, - false); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture invokeAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - @Nullable Map> map, - Object... invokeArgs - ) { - return (IgniteInternalFuture)putAllAsync0(cacheCtx, - entryTopVer, - null, - map, - invokeArgs, - null, - true); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllDrAsync( - GridCacheContext cacheCtx, - Map drMap - ) { - return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false); - } - - /** - * Checks filter for non-pessimistic transactions. - * - * @param cctx Cache context. - * @param key Key. - * @param val Value. - * @param filter Filter to check. - * @return {@code True} if passed or pessimistic. - */ - private boolean filter( - GridCacheContext cctx, - KeyCacheObject key, - CacheObject val, - CacheEntryPredicate[] filter) { - return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter); - } - - /** - * @param cacheCtx Cache context. - * @param cacheKey Key to enlist. - * @param val Value. - * @param expiryPlc Explicitly specified expiry policy for entry. - * @param entryProcessor Entry processor (for invoke operation). - * @param invokeArgs Optional arguments for EntryProcessor. - * @param retval Flag indicating whether a value should be returned. - * @param lockOnly If {@code true}, then entry will be enlisted as noop. - * @param filter User filters. - * @param ret Return value. - * @param skipStore Skip store flag. - * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. - * @return Future for entry values loading. - */ - private IgniteInternalFuture enlistWrite( - final GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - KeyCacheObject cacheKey, - Object val, - @Nullable ExpiryPolicy expiryPlc, - @Nullable EntryProcessor entryProcessor, - @Nullable Object[] invokeArgs, - final boolean retval, - boolean lockOnly, - final CacheEntryPredicate[] filter, - final GridCacheReturn ret, - boolean skipStore, - final boolean singleRmv, - boolean keepBinary, - Byte dataCenterId) { - try { - addActiveCache(cacheCtx); - - final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); - final boolean needVal = singleRmv || retval || hasFilters; - final boolean needReadVer = needVal && (serializable() && optimistic()); - - if (entryProcessor != null) - transform = true; - - GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null; - - boolean loadMissed = enlistWriteEntry(cacheCtx, - entryTopVer, - cacheKey, - val, - entryProcessor, - invokeArgs, - expiryPlc, - retval, - lockOnly, - filter, - /*drVer*/drVer, - /*drTtl*/-1L, - /*drExpireTime*/-1L, - ret, - /*enlisted*/null, - skipStore, - singleRmv, - hasFilters, - needVal, - needReadVer, - keepBinary); - - if (loadMissed) { - AffinityTopologyVersion topVer = topologyVersionSnapshot(); - - if (topVer == null) - topVer = entryTopVer; - - return loadMissing(cacheCtx, - topVer != null ? topVer : topologyVersion(), - Collections.singleton(cacheKey), - filter, - ret, - needReadVer, - singleRmv, - hasFilters, - /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, - retval, - keepBinary, - expiryPlc); - } - - return new GridFinishedFuture<>(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - - /** - * Internal routine for putAll(..) - * - * @param cacheCtx Cache context. - * @param keys Keys to enlist. - * @param expiryPlc Explicitly specified expiry policy for entry. - * @param lookup Value lookup map ({@code null} for remove). - * @param invokeMap Map with entry processors for invoke operation. - * @param invokeArgs Optional arguments for EntryProcessor. - * @param retval Flag indicating whether a value should be returned. - * @param lockOnly If {@code true}, then entry will be enlisted as noop. - * @param filter User filters. - * @param ret Return value. - * @param enlisted Collection of keys enlisted into this transaction. - * @param drPutMap DR put map (optional). - * @param drRmvMap DR remove map (optional). - * @param skipStore Skip store flag. - * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. - * @param keepBinary Keep binary flag. - * @param dataCenterId Optional data center ID. - * @return Future for missing values loading. - */ - private IgniteInternalFuture enlistWrite( - final GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Collection keys, - @Nullable ExpiryPolicy expiryPlc, - @Nullable Map lookup, - @Nullable Map> invokeMap, - @Nullable Object[] invokeArgs, - final boolean retval, - boolean lockOnly, - final CacheEntryPredicate[] filter, - final GridCacheReturn ret, - Collection enlisted, - @Nullable Map drPutMap, - @Nullable Map drRmvMap, - boolean skipStore, - final boolean singleRmv, - final boolean keepBinary, - Byte dataCenterId - ) { - assert retval || invokeMap == null; - - try { - addActiveCache(cacheCtx); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - - boolean rmv = lookup == null && invokeMap == null; - - final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); - final boolean needVal = singleRmv || retval || hasFilters; - final boolean needReadVer = needVal && (serializable() && optimistic()); - - try { - // Set transform flag for transaction. - if (invokeMap != null) - transform = true; - - Set missedForLoad = null; - - for (Object key : keys) { - if (key == null) { - rollback(); - - throw new NullPointerException("Null key."); - } - - Object val = rmv || lookup == null ? null : lookup.get(key); - EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key); - - GridCacheVersion drVer; - long drTtl; - long drExpireTime; - - if (drPutMap != null) { - GridCacheDrInfo info = drPutMap.get(key); - - assert info != null; - - drVer = info.version(); - drTtl = info.ttl(); - drExpireTime = info.expireTime(); - } - else if (drRmvMap != null) { - assert drRmvMap.get(key) != null; - - drVer = drRmvMap.get(key); - drTtl = -1L; - drExpireTime = -1L; - } - else if (dataCenterId != null) { - drVer = cctx.versions().next(dataCenterId); - drTtl = -1L; - drExpireTime = -1L; - } - else { - drVer = null; - drTtl = -1L; - drExpireTime = -1L; - } - - if (!rmv && val == null && entryProcessor == null) { - setRollbackOnly(); - - throw new NullPointerException("Null value."); - } - - KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); - - boolean loadMissed = enlistWriteEntry(cacheCtx, - entryTopVer, - cacheKey, - val, - entryProcessor, - invokeArgs, - expiryPlc, - retval, - lockOnly, - filter, - drVer, - drTtl, - drExpireTime, - ret, - enlisted, - skipStore, - singleRmv, - hasFilters, - needVal, - needReadVer, - keepBinary); - - if (loadMissed) { - if (missedForLoad == null) - missedForLoad = new HashSet<>(); - - missedForLoad.add(cacheKey); - } - } - - if (missedForLoad != null) { - AffinityTopologyVersion topVer = topologyVersionSnapshot(); - - if (topVer == null) - topVer = entryTopVer; - - return loadMissing(cacheCtx, - topVer != null ? topVer : topologyVersion(), - missedForLoad, - filter, - ret, - needReadVer, - singleRmv, - hasFilters, - /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, - retval, - keepBinary, - expiryPlc); - } - - return new GridFinishedFuture<>(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - } - - /** - * @param cacheCtx Cache context. - * @param keys Keys to load. - * @param filter Filter. - * @param ret Return value. - * @param needReadVer Read version flag. - * @param singleRmv {@code True} for single remove operation. - * @param hasFilters {@code True} if filters not empty. - * @param readThrough Read through flag. - * @param retval Return value flag. - * @param expiryPlc Expiry policy. - * @return Load future. - */ - private IgniteInternalFuture loadMissing( - final GridCacheContext cacheCtx, - final AffinityTopologyVersion topVer, - final Set keys, - final CacheEntryPredicate[] filter, - final GridCacheReturn ret, - final boolean needReadVer, - final boolean singleRmv, - final boolean hasFilters, - final boolean readThrough, - final boolean retval, - final boolean keepBinary, - final ExpiryPolicy expiryPlc) { - GridInClosure3 c = - new GridInClosure3() { - @Override public void apply(KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer) { - if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); - - IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); - - assert e != null; - - if (needReadVer) { - assert loadVer != null; - - e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); - } - - if (singleRmv) { - assert !hasFilters && !retval; - assert val == null || Boolean.TRUE.equals(val) : val; - - ret.set(cacheCtx, null, val != null, keepBinary); - } - else { - CacheObject cacheVal = cacheCtx.toCacheObject(val); - - if (e.op() == TRANSFORM) { - GridCacheVersion ver; - - e.readValue(cacheVal); - - try { - ver = e.cached().version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : e; - - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); - - ver = null; - } - - addInvokeResult(e, cacheVal, ret, ver); - } - else { - boolean success; - - if (hasFilters) { - success = isAll(e.context(), key, cacheVal, filter); - - if (!success) - e.value(cacheVal, false, false); - } - else - success = true; - - ret.set(cacheCtx, cacheVal, success, keepBinary); - } - } - } - }; - - return loadMissing( - cacheCtx, - topVer, - readThrough, - /*async*/true, - keys, - /*skipVals*/singleRmv, - needReadVer, - keepBinary, - expiryPlc, - c); - } - - /** - * @param cacheCtx Cache context. - * @param cacheKey Key. - * @param val Value. - * @param entryProcessor Entry processor. - * @param invokeArgs Optional arguments for EntryProcessor. - * @param expiryPlc Explicitly specified expiry policy for entry. - * @param retval Return value flag. - * @param lockOnly Lock only flag. - * @param filter Filter. - * @param drVer DR version. - * @param drTtl DR ttl. - * @param drExpireTime DR expire time. - * @param ret Return value. - * @param enlisted Enlisted keys collection. - * @param skipStore Skip store flag. - * @param singleRmv {@code True} for single remove operation. - * @param hasFilters {@code True} if filters not empty. - * @param needVal {@code True} if value is needed. - * @param needReadVer {@code True} if need read entry version. - * @return {@code True} if entry value should be loaded. - * @throws IgniteCheckedException If failed. - */ - private boolean enlistWriteEntry(GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - final KeyCacheObject cacheKey, - @Nullable final Object val, - @Nullable final EntryProcessor entryProcessor, - @Nullable final Object[] invokeArgs, - @Nullable final ExpiryPolicy expiryPlc, - final boolean retval, - final boolean lockOnly, - final CacheEntryPredicate[] filter, - final GridCacheVersion drVer, - final long drTtl, - long drExpireTime, - final GridCacheReturn ret, - @Nullable final Collection enlisted, - boolean skipStore, - boolean singleRmv, - boolean hasFilters, - final boolean needVal, - boolean needReadVer, - boolean keepBinary - ) throws IgniteCheckedException { - boolean loadMissed = false; - - final boolean rmv = val == null && entryProcessor == null; - - IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - - IgniteTxEntry txEntry = entry(txKey); - - // First time access. - if (txEntry == null) { - while (true) { - GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion()); - - try { - entry.unswap(false); - - // Check if lock is being explicitly acquired by the same thread. - if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && - entry.lockedByThread(threadId, xidVer)) { - throw new IgniteCheckedException("Cannot access key within transaction if lock is " + - "externally held [key=" + CU.value(cacheKey, cacheCtx, false) + - ", entry=" + entry + - ", xidVer=" + xidVer + - ", threadId=" + threadId + - ", locNodeId=" + cctx.localNodeId() + ']'); - } - - CacheObject old = null; - GridCacheVersion readVer = null; - - if (optimistic() && !implicit()) { - try { - if (needReadVer) { - EntryGetResult res = primaryLocal(entry) ? - entry.innerGetVersioned( - null, - this, - /*swap*/false, - /*unmarshal*/retval || needVal, - /*metrics*/retval, - /*events*/retval, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null, - keepBinary, - null) : null; - - if (res != null) { - old = res.value(); - readVer = res.version(); - } - } - else { - old = entry.innerGet( - null, - this, - /*swap*/false, - /*read-through*/false, - /*metrics*/retval, - /*events*/retval, - /*temporary*/false, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null, - keepBinary); - } - } - catch (ClusterTopologyCheckedException e) { - entry.context().evicts().touch(entry, topologyVersion()); - - throw e; - } - } - else - old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); - - final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : - entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; - - if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { - ret.set(cacheCtx, old, false, keepBinary); - - if (!readCommitted()) { - if (optimistic() && serializable()) { - txEntry = addEntry(op, - old, - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore, - keepBinary); - } - else { - txEntry = addEntry(READ, - old, - null, - null, - entry, - null, - CU.empty0(), - false, - -1L, - -1L, - null, - skipStore, - keepBinary); - } - - txEntry.markValid(); - - if (needReadVer) { - assert readVer != null; - - txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } - } - - if (readCommitted()) - cacheCtx.evicts().touch(entry, topologyVersion()); - - break; // While. - } - - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore, - keepBinary); - - if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) - cacheCtx.evicts().touch(entry, topologyVersion()); - - if (enlisted != null) - enlisted.add(cacheKey); - - if (!pessimistic() && !implicit()) { - txEntry.markValid(); - - if (old == null) { - if (needVal) - loadMissed = true; - else { - assert !implicit() || !transform : this; - assert txEntry.op() != TRANSFORM : txEntry; - - if (retval) - ret.set(cacheCtx, null, true, keepBinary); - else - ret.success(true); - } - } - else { - if (needReadVer) { - assert readVer != null; - - txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } - - if (retval && !transform) - ret.set(cacheCtx, old, true, keepBinary); - else { - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; - - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : txEntry; - - if (log.isDebugEnabled()) - log.debug("Failed to get entry version " + - "[err=" + ex.getMessage() + ']'); - - ver = null; - } - - addInvokeResult(txEntry, old, ret, ver); - } - else - ret.success(true); - } - } - } - // Pessimistic. - else { - if (retval && !transform) - ret.set(cacheCtx, old, true, keepBinary); - else - ret.success(true); - } - - break; // While. - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction putAll0 method: " + entry); - } - } - } - else { - if (entryProcessor == null && txEntry.op() == TRANSFORM) - throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + - "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false)); - - GridCacheEntryEx entry = txEntry.cached(); - - CacheObject v = txEntry.value(); - - boolean del = txEntry.op() == DELETE && rmv; - - if (!del) { - if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { - ret.set(cacheCtx, v, false, keepBinary); - - return loadMissed; - } - - GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : - v != null ? UPDATE : CREATE; - - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore, - keepBinary); - - if (enlisted != null) - enlisted.add(cacheKey); - - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; - - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException e) { - assert optimistic() : txEntry; - - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - - ver = null; - } - - addInvokeResult(txEntry, txEntry.value(), ret, ver); - } - } - - if (!pessimistic()) { - txEntry.markValid(); - - if (retval && !transform) - ret.set(cacheCtx, v, true, keepBinary); - else - ret.success(true); - } - } - - return loadMissed; - } - - /** - * @param cctx Cache context. - * @param key Key. - * @param val Value. - * @param filter Filter. - * @return {@code True} if filter passed. - */ - private boolean isAll(GridCacheContext cctx, - KeyCacheObject key, - CacheObject val, - CacheEntryPredicate[] filter) { - GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) { - @Nullable @Override public CacheObject peekVisibleValue() { - return rawGet(); - } - }; - - for (CacheEntryPredicate p0 : filter) { - if (p0 != null && !p0.apply(e)) - return false; - } - - return true; - } - - /** - * Post lock processing for put or remove. - * - * @param cacheCtx Context. - * @param keys Keys. - * @param ret Return value. - * @param rmv {@code True} if remove. - * @param retval Flag to return value or not. - * @param read {@code True} if read. - * @param accessTtl TTL for read operation. - * @param filter Filter to check entries. - * @throws IgniteCheckedException If error. - * @param computeInvoke If {@code true} computes return value for invoke operation. - */ - @SuppressWarnings("unchecked") - protected final void postLockWrite( - GridCacheContext cacheCtx, - Iterable keys, - GridCacheReturn ret, - boolean rmv, - boolean retval, - boolean read, - long accessTtl, - CacheEntryPredicate[] filter, - boolean computeInvoke - ) throws IgniteCheckedException { - for (KeyCacheObject k : keys) { - IgniteTxEntry txEntry = entry(cacheCtx.txKey(k)); - - if (txEntry == null) - throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " + - "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']'); - - while (true) { - GridCacheEntryEx cached = txEntry.cached(); - - try { - assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() : - "Transaction lock is not acquired [entry=" + cached + ", tx=" + this + - ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']'; - - if (log.isDebugEnabled()) - log.debug("Post lock write entry: " + cached); - - CacheObject v = txEntry.previousValue(); - boolean hasPrevVal = txEntry.hasPreviousValue(); - - if (onePhaseCommit()) - filter = txEntry.filters(); - - // If we have user-passed filter, we must read value into entry for peek(). - if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter)) - retval = true; - - boolean invoke = txEntry.op() == TRANSFORM; - - if (retval || invoke) { - if (!cacheCtx.isNear()) { - if (!hasPrevVal) { - // For non-local cache should read from store after lock on primary. - boolean readThrough = cacheCtx.isLocal() && - (invoke || cacheCtx.loadPreviousValue()) && - !txEntry.skipStore(); - - v = cached.innerGet( - null, - this, - /*swap*/true, - readThrough, - /*metrics*/!invoke, - /*event*/!invoke && !dht(), - /*temporary*/false, - CU.subjectId(this, cctx), - null, - resolveTaskName(), - null, - txEntry.keepBinary()); - } - } - else { - if (!hasPrevVal) - v = cached.rawGetOrUnmarshal(false); - } - - if (txEntry.op() == TRANSFORM) { - if (computeInvoke) { - GridCacheVersion ver; - - try { - ver = cached.version(); - } - catch (GridCacheEntryRemovedException e) { - assert optimistic() : txEntry; - - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - - ver = null; - } - - addInvokeResult(txEntry, v, ret, ver); - } - } - else - ret.value(cacheCtx, v, txEntry.keepBinary()); - } - - boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter); - - // For remove operation we return true only if we are removing s/t, - // i.e. cached value is not null. - ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null)); - - if (onePhaseCommit()) - txEntry.filtersPassed(pass); - - boolean updateTtl = read; - - if (pass) { - txEntry.markValid(); - - if (log.isDebugEnabled()) - log.debug("Filter passed in post lock for key: " + k); - } - else { - // Revert operation to previous. (if no - NOOP, so entry will be unlocked). - txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value())); - txEntry.filters(CU.empty0()); - txEntry.filtersSet(false); - - updateTtl = !cacheCtx.putIfAbsentFilter(filter); - } - - if (updateTtl) { - if (!read) { - ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry); - - if (expiryPlc != null) - txEntry.ttl(CU.toTtl(expiryPlc.getExpiryForAccess())); - } - else - txEntry.ttl(accessTtl); - } - - break; // While. - } - // If entry cached within transaction got removed before lock. - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in putAllAsync method (will retry): " + cached); - - txEntry.cached(entryEx(cached.context(), txEntry.txK