Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0F0D5200C5D for ; Thu, 23 Mar 2017 08:15:44 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0DC9C160B84; Thu, 23 Mar 2017 07:15:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2F706160B9E for ; Thu, 23 Mar 2017 08:15:39 +0100 (CET) Received: (qmail 16112 invoked by uid 500); 23 Mar 2017 07:15:37 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 15887 invoked by uid 99); 23 Mar 2017 07:15:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Mar 2017 07:15:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 14FDEDFFF0; Thu, 23 Mar 2017 07:15:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Thu, 23 Mar 2017 07:15:40 -0000 Message-Id: In-Reply-To: <1bc61bea256a42b3a0825603bd46da22@git.apache.org> References: <1bc61bea256a42b3a0825603bd46da22@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/51] [abbrv] ignite git commit: Internal cache API cleanup. archived-at: Thu, 23 Mar 2017 07:15:44 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8ed749c..81606d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -18,24 +18,37 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Externalizable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.cache.Cache; +import javax.cache.CacheException; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheOperationContext; +import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; @@ -46,31 +59,52 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; +import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiClosure; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.PREPARED; @@ -83,7 +117,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; * Replicated user transaction. */ @SuppressWarnings("unchecked") -public class GridNearTxLocal extends GridDhtTxLocalAdapter { +public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoCloseable { /** */ private static final long serialVersionUID = 0L; @@ -135,6 +169,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** */ private boolean hasRemoteLocks; + /** If this transaction contains transform entries. */ + protected boolean transform; + + /** */ + @GridToStringExclude + private TransactionProxyImpl proxy; + /** * Empty constructor required for {@link Externalizable}. */ @@ -244,14 +285,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** * Marks transaction to check if commit on backup. */ - public void markForBackupCheck() { + void markForBackupCheck() { needCheckBackup = true; } /** * @return If need to check tx commit on backup. */ - public boolean onNeedCheckBackup() { + boolean onNeedCheckBackup() { Boolean check = needCheckBackup; if (check != null && check) { @@ -260,52 +301,2127 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return true; } - return false; - } + return false; + } + + /** + * @return If backup check was requested. + */ + boolean needCheckBackup() { + return needCheckBackup != null; + } + + /** + * @return {@code True} if transaction contains at least one near cache key mapped to the local node. + */ + public boolean nearLocallyMapped() { + return nearLocallyMapped; + } + + /** + * @param nearLocallyMapped {@code True} if transaction contains near key mapped to the local node. + */ + void nearLocallyMapped(boolean nearLocallyMapped) { + this.nearLocallyMapped = nearLocallyMapped; + } + + /** + * @return {@code True} if transaction contains colocated key mapped to the local node. + */ + public boolean colocatedLocallyMapped() { + return colocatedLocallyMapped; + } + + /** + * @param colocatedLocallyMapped {@code True} if transaction contains colocated key mapped to the local node. + */ + public void colocatedLocallyMapped(boolean colocatedLocallyMapped) { + this.colocatedLocallyMapped = colocatedLocallyMapped; + } + + /** {@inheritDoc} */ + @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) { + return entry.detached() || super.ownsLockUnsafe(entry); + } + + /** {@inheritDoc} */ + @Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException { + return entry.detached() || super.ownsLock(entry); + } + + /** + * @param cacheCtx Cache context. + * @param map Map to put. + * @param retval Flag indicating whether a value should be returned. + * @return Future for put operation. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture putAllAsync( + GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + Map map, + boolean retval + ) { + return (IgniteInternalFuture)putAllAsync0(cacheCtx, + entryTopVer, + map, + null, + null, + null, + retval); + } + + /** + * @param cacheCtx Cache context. + * @param key Key. + * @param val Value. + * @param retval Return value flag. + * @param filter Filter. + * @return Future for put operation. + */ + public final IgniteInternalFuture putAsync( + GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + K key, + V val, + boolean retval, + CacheEntryPredicate filter) { + return putAsync0(cacheCtx, + entryTopVer, + key, + val, + null, + null, + retval, + filter); + } + + /** + * @param cacheCtx Cache context. + * @param key Key. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for entry processor. + * @return Operation future. + */ + public IgniteInternalFuture invokeAsync(GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + K key, + EntryProcessor entryProcessor, + Object... invokeArgs) { + return (IgniteInternalFuture)putAsync0(cacheCtx, + entryTopVer, + key, + null, + entryProcessor, + invokeArgs, + true, + null); + } + + /** + * @param cacheCtx Cache context. + * @param map Entry processors map. + * @param invokeArgs Optional arguments for entry processor. + * @return Operation future. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture invokeAsync( + GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + @Nullable Map> map, + Object... invokeArgs + ) { + return (IgniteInternalFuture)putAllAsync0(cacheCtx, + entryTopVer, + null, + map, + invokeArgs, + null, + true); + } + + /** + * @param cacheCtx Cache context. + * @param drMap DR map to put. + * @return Future for DR put operation. + */ + public IgniteInternalFuture putAllDrAsync( + GridCacheContext cacheCtx, + Map drMap + ) { + Map map = F.viewReadOnly(drMap, new IgniteClosure() { + @Override public Object apply(GridCacheDrInfo val) { + return val.value(); + } + }); + + return this.putAllAsync0(cacheCtx, + null, + map, + null, + null, + drMap, + false); + } + + /** + * @param cacheCtx Cache context. + * @param drMap DR map. + * @return Future for asynchronous remove. + */ + public IgniteInternalFuture removeAllDrAsync( + GridCacheContext cacheCtx, + Map drMap + ) { + return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false); + } + + /** + * @param cacheCtx Cache context. + * @param keys Keys to remove. + * @param retval Flag indicating whether a value should be returned. + * @param filter Filter. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @return Future for asynchronous remove. + */ + public IgniteInternalFuture removeAllAsync( + GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + Collection keys, + boolean retval, + CacheEntryPredicate filter, + boolean singleRmv + ) { + return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv); + } + + /** + * Internal method for single update operation. + * + * @param cacheCtx Cache context. + * @param key Key. + * @param val Value. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param retval Return value flag. + * @param filter Filter. + * @return Operation future. + */ + private IgniteInternalFuture putAsync0( + final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + K key, + @Nullable V val, + @Nullable EntryProcessor entryProcessor, + @Nullable final Object[] invokeArgs, + final boolean retval, + @Nullable final CacheEntryPredicate filter + ) { + assert key != null; + + try { + beforePut(cacheCtx, retval); + + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); + + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null; + + KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); + + boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + + final CacheEntryPredicate[] filters = CU.filterArray(filter); + + final IgniteInternalFuture loadFut = enlistWrite( + cacheCtx, + entryTopVer, + cacheKey, + val, + opCtx != null ? opCtx.expiry() : null, + entryProcessor, + invokeArgs, + retval, + /*lockOnly*/false, + filters, + ret, + opCtx != null && opCtx.skipStore(), + /*singleRmv*/false, + keepBinary, + dataCenterId); + + if (pessimistic()) { + assert loadFut == null || loadFut.isDone() : loadFut; + + if (loadFut != null) + loadFut.get(); + + final Collection enlisted = Collections.singleton(cacheKey); + + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for put on key: " + enlisted); + + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + + IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(enlisted, + timeout, + this, + /*read*/entryProcessor != null, // Needed to force load from store. + retval, + isolation, + isInvalidate(), + -1L, + -1L); + + PLC1 plc1 = new PLC1(ret) { + @Override public GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for put on keys: " + enlisted); + + postLockWrite(cacheCtx, + enlisted, + ret, + /*remove*/false, + retval, + /*read*/false, + -1L, + filters, + /*computeInvoke*/true); + + return ret; + } + }; + + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { + try { + return nonInterruptable(plc1.apply(false, e)); + } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); + } + } + } + else { + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + } + else + return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + catch (RuntimeException e) { + onException(); + + throw e; + } + } + + /** + * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} + * maps must be non-null. + * + * @param cacheCtx Context. + * @param map Key-value map to store. + * @param invokeMap Invoke map. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param drMap DR map. + * @param retval Key-transform value map to store. + * @return Operation future. + */ + @SuppressWarnings("unchecked") + private IgniteInternalFuture putAllAsync0( + final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + @Nullable Map map, + @Nullable Map> invokeMap, + @Nullable final Object[] invokeArgs, + @Nullable Map drMap, + final boolean retval + ) { + try { + beforePut(cacheCtx, retval); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + + final CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + final Byte dataCenterId; + + if (opCtx != null && opCtx.hasDataCenterId()) { + assert drMap == null : drMap; + assert map != null || invokeMap != null; + + dataCenterId = opCtx.dataCenterId(); + } + else + dataCenterId = null; + + // Cached entry may be passed only from entry wrapper. + final Map map0 = map; + final Map> invokeMap0 = (Map>)invokeMap; + + if (log.isDebugEnabled()) + log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]"); + + assert map0 != null || invokeMap0 != null; + + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); + + if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) { + if (implicit()) + try { + commit(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + + return new GridFinishedFuture<>(ret.success(true)); + } + + try { + Set keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); + + final Collection enlisted = new ArrayList<>(keySet.size()); + + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + + final IgniteInternalFuture loadFut = enlistWrite( + cacheCtx, + entryTopVer, + keySet, + opCtx != null ? opCtx.expiry() : null, + map0, + invokeMap0, + invokeArgs, + retval, + false, + CU.filterArray(null), + ret, + enlisted, + drMap, + null, + opCtx != null && opCtx.skipStore(), + false, + keepBinary, + dataCenterId); + + if (pessimistic()) { + assert loadFut == null || loadFut.isDone() : loadFut; + + if (loadFut != null) { + try { + loadFut.get(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + } + + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for put on keys: " + enlisted); + + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + + IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(enlisted, + timeout, + this, + /*read*/invokeMap != null, // Needed to force load from store. + retval, + isolation, + isInvalidate(), + -1L, + -1L); + + PLC1 plc1 = new PLC1(ret) { + @Override public GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for put on keys: " + enlisted); + + postLockWrite(cacheCtx, + enlisted, + ret, + /*remove*/false, + retval, + /*read*/false, + -1L, + CU.filterArray(null), + /*computeInvoke*/true); + + return ret; + } + }; + + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { + try { + return nonInterruptable(plc1.apply(false, e)); + } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); + } + } + } + else { + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + } + else + return optimisticPutFuture(cacheCtx, loadFut, ret, keepBinary); + } + catch (RuntimeException e) { + onException(); + + throw e; + } + } + + /** + * @param cacheCtx Cache context. + * @param cacheKey Key to enlist. + * @param val Value. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param entryProcessor Entry processor (for invoke operation). + * @param invokeArgs Optional arguments for EntryProcessor. + * @param retval Flag indicating whether a value should be returned. + * @param lockOnly If {@code true}, then entry will be enlisted as noop. + * @param filter User filters. + * @param ret Return value. + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @return Future for entry values loading. + */ + private IgniteInternalFuture enlistWrite( + final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + KeyCacheObject cacheKey, + Object val, + @Nullable ExpiryPolicy expiryPlc, + @Nullable EntryProcessor entryProcessor, + @Nullable Object[] invokeArgs, + final boolean retval, + boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + boolean skipStore, + final boolean singleRmv, + boolean keepBinary, + Byte dataCenterId) { + try { + addActiveCache(cacheCtx); + + final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + final boolean needVal = singleRmv || retval || hasFilters; + final boolean needReadVer = needVal && (serializable() && optimistic()); + + if (entryProcessor != null) + transform = true; + + GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null; + + boolean loadMissed = enlistWriteEntry(cacheCtx, + entryTopVer, + cacheKey, + val, + entryProcessor, + invokeArgs, + expiryPlc, + retval, + lockOnly, + filter, + /*drVer*/drVer, + /*drTtl*/-1L, + /*drExpireTime*/-1L, + ret, + /*enlisted*/null, + skipStore, + singleRmv, + hasFilters, + needVal, + needReadVer, + keepBinary); + + if (loadMissed) { + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + + return loadMissing(cacheCtx, + topVer != null ? topVer : topologyVersion(), + Collections.singleton(cacheKey), + filter, + ret, + needReadVer, + singleRmv, + hasFilters, + /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, + retval, + keepBinary, + expiryPlc); + } + + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + /** + * Internal routine for putAll(..) + * + * @param cacheCtx Cache context. + * @param keys Keys to enlist. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param lookup Value lookup map ({@code null} for remove). + * @param invokeMap Map with entry processors for invoke operation. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param retval Flag indicating whether a value should be returned. + * @param lockOnly If {@code true}, then entry will be enlisted as noop. + * @param filter User filters. + * @param ret Return value. + * @param enlisted Collection of keys enlisted into this transaction. + * @param drPutMap DR put map (optional). + * @param drRmvMap DR remove map (optional). + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @param keepBinary Keep binary flag. + * @param dataCenterId Optional data center ID. + * @return Future for missing values loading. + */ + private IgniteInternalFuture enlistWrite( + final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + Collection keys, + @Nullable ExpiryPolicy expiryPlc, + @Nullable Map lookup, + @Nullable Map> invokeMap, + @Nullable Object[] invokeArgs, + final boolean retval, + boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + Collection enlisted, + @Nullable Map drPutMap, + @Nullable Map drRmvMap, + boolean skipStore, + final boolean singleRmv, + final boolean keepBinary, + Byte dataCenterId + ) { + assert retval || invokeMap == null; + + try { + addActiveCache(cacheCtx); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + + boolean rmv = lookup == null && invokeMap == null; + + final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + final boolean needVal = singleRmv || retval || hasFilters; + final boolean needReadVer = needVal && (serializable() && optimistic()); + + try { + // Set transform flag for transaction. + if (invokeMap != null) + transform = true; + + Set missedForLoad = null; + + for (Object key : keys) { + if (key == null) { + rollback(); + + throw new NullPointerException("Null key."); + } + + Object val = rmv || lookup == null ? null : lookup.get(key); + EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key); + + GridCacheVersion drVer; + long drTtl; + long drExpireTime; + + if (drPutMap != null) { + GridCacheDrInfo info = drPutMap.get(key); + + assert info != null; + + drVer = info.version(); + drTtl = info.ttl(); + drExpireTime = info.expireTime(); + } + else if (drRmvMap != null) { + assert drRmvMap.get(key) != null; + + drVer = drRmvMap.get(key); + drTtl = -1L; + drExpireTime = -1L; + } + else if (dataCenterId != null) { + drVer = cctx.versions().next(dataCenterId); + drTtl = -1L; + drExpireTime = -1L; + } + else { + drVer = null; + drTtl = -1L; + drExpireTime = -1L; + } + + if (!rmv && val == null && entryProcessor == null) { + setRollbackOnly(); + + throw new NullPointerException("Null value."); + } + + KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); + + boolean loadMissed = enlistWriteEntry(cacheCtx, + entryTopVer, + cacheKey, + val, + entryProcessor, + invokeArgs, + expiryPlc, + retval, + lockOnly, + filter, + drVer, + drTtl, + drExpireTime, + ret, + enlisted, + skipStore, + singleRmv, + hasFilters, + needVal, + needReadVer, + keepBinary); + + if (loadMissed) { + if (missedForLoad == null) + missedForLoad = new HashSet<>(); + + missedForLoad.add(cacheKey); + } + } + + if (missedForLoad != null) { + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + + return loadMissing(cacheCtx, + topVer != null ? topVer : topologyVersion(), + missedForLoad, + filter, + ret, + needReadVer, + singleRmv, + hasFilters, + /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, + retval, + keepBinary, + expiryPlc); + } + + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + /** + * @param cacheCtx Cache context. + * @param cacheKey Key. + * @param val Value. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param retval Return value flag. + * @param lockOnly Lock only flag. + * @param filter Filter. + * @param drVer DR version. + * @param drTtl DR ttl. + * @param drExpireTime DR expire time. + * @param ret Return value. + * @param enlisted Enlisted keys collection. + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param needVal {@code True} if value is needed. + * @param needReadVer {@code True} if need read entry version. + * @return {@code True} if entry value should be loaded. + * @throws IgniteCheckedException If failed. + */ + private boolean enlistWriteEntry(GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + final KeyCacheObject cacheKey, + @Nullable final Object val, + @Nullable final EntryProcessor entryProcessor, + @Nullable final Object[] invokeArgs, + @Nullable final ExpiryPolicy expiryPlc, + final boolean retval, + final boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheVersion drVer, + final long drTtl, + long drExpireTime, + final GridCacheReturn ret, + @Nullable final Collection enlisted, + boolean skipStore, + boolean singleRmv, + boolean hasFilters, + final boolean needVal, + boolean needReadVer, + boolean keepBinary + ) throws IgniteCheckedException { + boolean loadMissed = false; + + final boolean rmv = val == null && entryProcessor == null; + + IgniteTxKey txKey = cacheCtx.txKey(cacheKey); + + IgniteTxEntry txEntry = entry(txKey); + + // First time access. + if (txEntry == null) { + while (true) { + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion()); + + try { + entry.unswap(false); + + // Check if lock is being explicitly acquired by the same thread. + if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && + entry.lockedByThread(threadId, xidVer)) { + throw new IgniteCheckedException("Cannot access key within transaction if lock is " + + "externally held [key=" + CU.value(cacheKey, cacheCtx, false) + + ", entry=" + entry + + ", xidVer=" + xidVer + + ", threadId=" + threadId + + ", locNodeId=" + cctx.localNodeId() + ']'); + } + + CacheObject old = null; + GridCacheVersion readVer = null; + + if (optimistic() && !implicit()) { + try { + if (needReadVer) { + EntryGetResult res = primaryLocal(entry) ? + entry.innerGetVersioned( + null, + this, + /*swap*/false, + /*unmarshal*/retval || needVal, + /*metrics*/retval, + /*events*/retval, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null, + keepBinary, + null) : null; + + if (res != null) { + old = res.value(); + readVer = res.version(); + } + } + else { + old = entry.innerGet( + null, + this, + /*swap*/false, + /*read-through*/false, + /*metrics*/retval, + /*events*/retval, + /*temporary*/false, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null, + keepBinary); + } + } + catch (ClusterTopologyCheckedException e) { + entry.context().evicts().touch(entry, topologyVersion()); + + throw e; + } + } + else + old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); + + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { + ret.set(cacheCtx, old, false, keepBinary); + + if (!readCommitted()) { + if (optimistic() && serializable()) { + txEntry = addEntry(op, + old, + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore, + keepBinary); + } + else { + txEntry = addEntry(READ, + old, + null, + null, + entry, + null, + CU.empty0(), + false, + -1L, + -1L, + null, + skipStore, + keepBinary); + } + + txEntry.markValid(); + + if (needReadVer) { + assert readVer != null; + + txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + } + } + + if (readCommitted()) + cacheCtx.evicts().touch(entry, topologyVersion()); + + break; // While. + } + + txEntry = addEntry(op, + cacheCtx.toCacheObject(val), + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore, + keepBinary); + + if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) + cacheCtx.evicts().touch(entry, topologyVersion()); + + if (enlisted != null) + enlisted.add(cacheKey); + + if (!pessimistic() && !implicit()) { + txEntry.markValid(); + + if (old == null) { + if (needVal) + loadMissed = true; + else { + assert !implicit() || !transform : this; + assert txEntry.op() != TRANSFORM : txEntry; + + if (retval) + ret.set(cacheCtx, null, true, keepBinary); + else + ret.success(true); + } + } + else { + if (needReadVer) { + assert readVer != null; + + txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + } + + if (retval && !transform) + ret.set(cacheCtx, old, true, keepBinary); + else { + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : txEntry; + + if (log.isDebugEnabled()) + log.debug("Failed to get entry version " + + "[err=" + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, old, ret, ver); + } + else + ret.success(true); + } + } + } + // Pessimistic. + else { + if (retval && !transform) + ret.set(cacheCtx, old, true, keepBinary); + else + ret.success(true); + } + + break; // While. + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction putAll0 method: " + entry); + } + } + } + else { + if (entryProcessor == null && txEntry.op() == TRANSFORM) + throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + + "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false)); + + GridCacheEntryEx entry = txEntry.cached(); + + CacheObject v = txEntry.value(); + + boolean del = txEntry.op() == DELETE && rmv; + + if (!del) { + if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { + ret.set(cacheCtx, v, false, keepBinary); + + return loadMissed; + } + + GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : + v != null ? UPDATE : CREATE; + + txEntry = addEntry(op, + cacheCtx.toCacheObject(val), + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore, + keepBinary); + + if (enlisted != null) + enlisted.add(cacheKey); + + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; + + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, txEntry.value(), ret, ver); + } + } + + if (!pessimistic()) { + txEntry.markValid(); + + if (retval && !transform) + ret.set(cacheCtx, v, true, keepBinary); + else + ret.success(true); + } + } + + return loadMissed; + } + + /** + * @param cacheCtx Cache context. + * @param keys Keys to remove. + * @param drMap DR map. + * @param retval Flag indicating whether a value should be returned. + * @param filter Filter. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @return Future for asynchronous remove. + */ + @SuppressWarnings("unchecked") + private IgniteInternalFuture removeAllAsync0( + final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + @Nullable final Collection keys, + @Nullable Map drMap, + final boolean retval, + @Nullable final CacheEntryPredicate filter, + boolean singleRmv) { + try { + checkUpdatesAllowed(cacheCtx); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + + cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE); + + if (retval) + needReturnValue(true); + + final Collection keys0; + + if (drMap != null) { + assert keys == null; + + keys0 = drMap.keySet(); + } + else + keys0 = keys; + + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + final Byte dataCenterId; + + if (opCtx != null && opCtx.hasDataCenterId()) { + assert drMap == null : drMap; + + dataCenterId = opCtx.dataCenterId(); + } + else + dataCenterId = null; + + assert keys0 != null; + + if (log.isDebugEnabled()) + log.debug(S.toString("Called removeAllAsync(...)", + "tx", this, false, + "keys", keys0, true, + "implicit", implicit, false, + "retval", retval, false)); + + try { + checkValid(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); + + if (F.isEmpty(keys0)) { + if (implicit()) { + try { + commit(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + return new GridFinishedFuture<>(ret.success(true)); + } + + init(); + + final Collection enlisted = new ArrayList<>(); + + ExpiryPolicy plc; + + final CacheEntryPredicate[] filters = CU.filterArray(filter); + + if (!F.isEmpty(filters)) + plc = opCtx != null ? opCtx.expiry() : null; + else + plc = null; + + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + + final IgniteInternalFuture loadFut = enlistWrite( + cacheCtx, + entryTopVer, + keys0, + plc, + /*lookup map*/null, + /*invoke map*/null, + /*invoke arguments*/null, + retval, + /*lock only*/false, + filters, + ret, + enlisted, + null, + drMap, + opCtx != null && opCtx.skipStore(), + singleRmv, + keepBinary, + dataCenterId + ); + + if (log.isDebugEnabled()) + log.debug("Remove keys: " + enlisted); + + // Acquire locks only after having added operation to the write set. + // Otherwise, during rollback we will not know whether locks need + // to be rolled back. + if (pessimistic()) { + assert loadFut == null || loadFut.isDone() : loadFut; + + if (loadFut != null) { + try { + loadFut.get(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for remove on keys: " + enlisted); + + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + + IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(enlisted, + timeout, + this, + false, + retval, + isolation, + isInvalidate(), + -1L, + -1L); + + PLC1 plc1 = new PLC1(ret) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for remove on keys: " + enlisted); + + postLockWrite(cacheCtx, + enlisted, + ret, + /*remove*/true, + retval, + /*read*/false, + -1L, + filters, + /*computeInvoke*/false); + + return ret; + } + }; + + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { + try { + return nonInterruptable(plc1.apply(false, e)); + } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); + } + } + } + else + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + else { + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); + + return nonInterruptable(commitNearTxLocalAsync().chain(new CX1, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture txFut) + throws IgniteCheckedException { + try { + txFut.get(); + + return new GridCacheReturn(cacheCtx, true, keepBinary, + implicitRes.value(), implicitRes.success()); + } + catch (IgniteCheckedException | RuntimeException e) { + rollbackNearTxLocalAsync(); + + throw e; + } + } + })); + } + else { + return nonInterruptable(loadFut.chain(new CX1, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture f) + throws IgniteCheckedException { + f.get(); + + return ret; + } + })); + } + } + } + + /** + * @param cacheCtx Cache context. + * @param keys Keys to get. + * @param deserializeBinary Deserialize binary flag. + * @param skipVals Skip values flag. + * @param keepCacheObjects Keep cache objects + * @param skipStore Skip store flag. + * @return Future for this get. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture> getAllAsync( + final GridCacheContext cacheCtx, + @Nullable final AffinityTopologyVersion entryTopVer, + Collection keys, + final boolean deserializeBinary, + final boolean skipVals, + final boolean keepCacheObjects, + final boolean skipStore, + final boolean needVer) { + if (F.isEmpty(keys)) + return new GridFinishedFuture<>(Collections.emptyMap()); + + init(); + + int keysCnt = keys.size(); + + boolean single = keysCnt == 1; + + try { + checkValid(); + + final Map retMap = new GridLeanMap<>(keysCnt); + + final Map missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0); + + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null; + + final Collection lockKeys = enlistRead(cacheCtx, + entryTopVer, + keys, + expiryPlc, + retMap, + missed, + keysCnt, + deserializeBinary, + skipVals, + keepCacheObjects, + skipStore, + needVer); + + if (single && missed.isEmpty()) + return new GridFinishedFuture<>(retMap); + + // Handle locks. + if (pessimistic() && !readCommitted() && !skipVals) { + if (expiryPlc == null) + expiryPlc = cacheCtx.expiry(); + + long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED; + long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED; + + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + + IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(lockKeys, + timeout, + this, + true, + true, + isolation, + isInvalidate(), + createTtl, + accessTtl); + + final ExpiryPolicy expiryPlc0 = expiryPlc; + + PLC2> plc2 = new PLC2>() { + @Override public IgniteInternalFuture> postLock() throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for read on keys: " + lockKeys); + + // Load keys only after the locks have been acquired. + for (KeyCacheObject cacheKey : lockKeys) { + K keyVal = (K) + (keepCacheObjects ? cacheKey : + cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary)); + + if (retMap.containsKey(keyVal)) + // We already have a return value. + continue; + + IgniteTxKey txKey = cacheCtx.txKey(cacheKey); + + IgniteTxEntry txEntry = entry(txKey); + + assert txEntry != null; + + // Check if there is cached value. + while (true) { + GridCacheEntryEx cached = txEntry.cached(); + + CacheObject val = null; + GridCacheVersion readVer = null; + EntryGetResult getRes = null; + + try { + Object transformClo = + (!F.isEmpty(txEntry.entryProcessors()) && + cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? + F.first(txEntry.entryProcessors()) : null; + + if (needVer) { + getRes = cached.innerGetVersioned( + null, + GridNearTxLocal.this, + /*swap*/cacheCtx.isSwapOrOffheapEnabled(), + /*unmarshal*/true, + /*update-metrics*/true, + /*event*/!skipVals, + CU.subjectId(GridNearTxLocal.this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary(), + null); + + if (getRes != null) { + val = getRes.value(); + readVer = getRes.version(); + } + } + else{ + val = cached.innerGet( + null, + GridNearTxLocal.this, + cacheCtx.isSwapOrOffheapEnabled(), + /*read-through*/false, + /*metrics*/true, + /*events*/!skipVals, + /*temporary*/false, + CU.subjectId(GridNearTxLocal.this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary()); + } + + // If value is in cache and passed the filter. + if (val != null) { + missed.remove(cacheKey); + + txEntry.setAndMarkValid(val); + + if (!F.isEmpty(txEntry.entryProcessors())) + val = txEntry.applyEntryProcessors(val); + + cacheCtx.addResult(retMap, + cacheKey, + val, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + getRes, + readVer, + 0, + 0, + needVer); + + if (readVer != null) + txEntry.entryReadVersion(readVer); + } + + // Even though we bring the value back from lock acquisition, + // we still need to recheck primary node for consistent values + // in case of concurrent transactional locks. + + break; // While. + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed exception in get postLock (will retry): " + + cached); + + txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion())); + } + } + } + + if (!missed.isEmpty() && cacheCtx.isLocal()) { + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + + return checkMissed(cacheCtx, + topVer != null ? topVer : topologyVersion(), + retMap, + missed, + deserializeBinary, + skipVals, + keepCacheObjects, + skipStore, + needVer, + expiryPlc0); + } + + return new GridFinishedFuture<>(Collections.emptyMap()); + } + }; + + FinishClosure> finClos = new FinishClosure>() { + @Override Map finish(Map loaded) { + retMap.putAll(loaded); + + return retMap; + } + }; + + if (fut.isDone()) { + try { + IgniteInternalFuture> fut1 = plc2.apply(fut.get(), null); + + return fut1.isDone() ? + new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) : + new GridEmbeddedFuture<>(finClos, fut1); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { + try { + return plc2.apply(false, e); + } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); + } + } + } + else { + return new GridEmbeddedFuture<>( + fut, + plc2, + finClos); + } + } + else { + assert optimistic() || readCommitted() || skipVals; + + if (!missed.isEmpty()) { + if (!readCommitted()) + for (Iterator it = missed.keySet().iterator(); it.hasNext(); ) { + KeyCacheObject cacheKey = it.next(); + + K keyVal = + (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false)); + + if (retMap.containsKey(keyVal)) + it.remove(); + } + + if (missed.isEmpty()) + return new GridFinishedFuture<>(retMap); + + AffinityTopologyVersion topVer = topologyVersionSnapshot(); + + if (topVer == null) + topVer = entryTopVer; + + return checkMissed(cacheCtx, + topVer != null ? topVer : topologyVersion(), + retMap, + missed, + deserializeBinary, + skipVals, + keepCacheObjects, + skipStore, + needVer, + expiryPlc); + } + + return new GridFinishedFuture<>(retMap); + } + } + catch (IgniteCheckedException e) { + setRollbackOnly(); + + return new GridFinishedFuture<>(e); + } + } + + /** + * @param cacheCtx Cache context. + * @param keys Key to enlist. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param map Return map. + * @param missed Map of missed keys. + * @param keysCnt Keys count (to avoid call to {@code Collection.size()}). + * @param deserializeBinary Deserialize binary flag. + * @param skipVals Skip values flag. + * @param keepCacheObjects Keep cache objects flag. + * @param skipStore Skip store flag. + * @throws IgniteCheckedException If failed. + * @return Enlisted keys. + */ + @SuppressWarnings({"RedundantTypeArguments"}) + private Collection enlistRead( + final GridCacheContext cacheCtx, + @Nullable AffinityTopologyVersion entryTopVer, + Collection keys, + @Nullable ExpiryPolicy expiryPlc, + Map map, + Map missed, + int keysCnt, + boolean deserializeBinary, + boolean skipVals, + boolean keepCacheObjects, + boolean skipStore, + final boolean needVer + ) throws IgniteCheckedException { + assert !F.isEmpty(keys); + assert keysCnt == keys.size(); + + cacheCtx.checkSecurity(SecurityPermission.CACHE_READ); + + boolean single = keysCnt == 1; + + Collection lockKeys = null; + + AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion(); + + boolean needReadVer = (serializable() && optimistic()) || needVer; + + // In this loop we cover only read-committed or optimistic transactions. + // Transactions that are pessimistic and not read-committed are covered + // outside of this loop. + for (KeyCacheObject key : keys) { + if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals) + addActiveCache(cacheCtx); + + IgniteTxKey txKey = cacheCtx.txKey(key); + + // Check write map (always check writes first). + IgniteTxEntry txEntry = entry(txKey); + + // Either non-read-committed or there was a previous write. + if (txEntry != null) { + CacheObject val = txEntry.value(); + + if (txEntry.hasValue()) { + if (!F.isEmpty(txEntry.entryProcessors())) + val = txEntry.applyEntryProcessors(val); + + if (val != null) { + GridCacheVersion ver = null; + + if (needVer) { + if (txEntry.op() != READ) + ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED; + else { + ver = txEntry.entryReadVersion(); + + if (ver == null && pessimistic()) { + while (true) { + try { + GridCacheEntryEx cached = txEntry.cached(); + + ver = cached.isNear() ? + ((GridNearCacheEntry)cached).dhtVersion() : cached.version(); + + break; + } + catch (GridCacheEntryRemovedException ignored) { + txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); + } + } + } + + if (ver == null) { + assert optimistic() && repeatableRead() : this; + + ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET; + } + } + + assert ver != null; + } + + cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, + ver, 0, 0); + } + } + else { + assert txEntry.op() == TRANSFORM; + + while (true) { + try { + GridCacheVersion readVer = null; + EntryGetResult getRes = null; + + Object transformClo = + (txEntry.op() == TRANSFORM && + cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? + F.first(txEntry.entryProcessors()) : null; + + if (needVer) { + getRes = txEntry.cached().innerGetVersioned( + null, + this, + /*swap*/true, + /*unmarshal*/true, + /*update-metrics*/true, + /*event*/!skipVals, + CU.subjectId(this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary(), + null); + + if (getRes != null) { + val = getRes.value(); + readVer = getRes.version(); + } + } + else { + val = txEntry.cached().innerGet( + null, + this, + /*swap*/true, + /*read-through*/false, + /*metrics*/true, + /*event*/!skipVals, + /*temporary*/false, + CU.subjectId(this, cctx), + transformClo, + resolveTaskName(), + null, + txEntry.keepBinary()); + } + + if (val != null) { + if (!readCommitted() && !skipVals) + txEntry.readValue(val); + + if (!F.isEmpty(txEntry.entryProcessors())) + val = txEntry.applyEntryProcessors(val); + + cacheCtx.addResult(map, + key, + val, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + getRes, + readVer, + 0, + 0, + needVer); + } + else + missed.put(key, txEntry.cached().version()); + + break; + } + catch (GridCacheEntryRemovedException ignored) { + txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer)); + } + } + } + } + // First time access within transaction. + else { + if (lockKeys == null && !skipVals) + lockKeys = single ? Collections.singleton(key) : new ArrayList(keysCnt); + + if (!single && !skipVals) + lockKeys.add(key); + + while (true) { + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer); + + try { + GridCacheVersion ver = entry.version(); + + CacheObject val = null; + GridCacheVersion readVer = null; + EntryGetResult getRes = null; + + if (!pessimistic() || readCommitted() && !skipVals) { + IgniteCacheExpiryPolicy accessPlc = + optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null; + + if (needReadVer) { + getRes = primaryLocal(entry) ? + entry.innerGetVersioned( + null, + this, + /*swap*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + accessPlc, + !deserializeBinary, + null) : null; + + if (getRes != null) { + val = getRes.value(); + readVer = getRes.version(); + } + } + else { + val = entry.innerGet( + null, + this, + /*swap*/true, + /*read-through*/false, + /*metrics*/true, + /*event*/true, + /*temporary*/false, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + accessPlc, + !deserializeBinary); + } + + if (val != null) { + cacheCtx.addResult(map, + key, + val, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + getRes, + readVer, + 0, + 0, + needVer); + } + else + missed.put(key, ver); + } + else + // We must wait for the lock in pessimistic mode. + missed.put(key, ver); + + if (!readCommitted() && !skipVals) { + txEntry = addEntry(READ, + val, + null, + null, + entry, + expiryPlc, + null, + true, + -1L, + -1L, + null, + skipStore, + !deserializeBinary); + + // As optimization, mark as checked immediately + // for non-pessimistic if value is not null. + if (val != null && !pessimistic()) { + txEntry.markValid(); + + if (needReadVer) { + assert readVer != null; + + txEntry.entryReadVersion(readVer); + } + } + } + + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); + } + finally { + if (entry != null && readCommitted()) { + if (cacheCtx.isNear()) { + if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) { + if (entry.markObsolete(xidVer)) + cacheCtx.cache().removeEntry(entry); + } + } + else + entry.context().evicts().touch(entry, topVer); + } + } + } + } + } + + return lockKeys != null ? lockKeys : Collections.emptyList(); + } + + /** + * @param cacheCtx Cache context. + * @param keys Keys to load. + * @param filter Filter. + * @param ret Return value. + * @param needReadVer Read version flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param readThrough Read through flag. + * @param retval Return value flag. + * @param expiryPlc Expiry policy. + * @return Load future. + */ + private IgniteInternalFuture loadMissing( + final GridCacheContext cacheCtx, + final AffinityTopologyVersion topVer, + final Set keys, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + final boolean needReadVer, + final boolean singleRmv, + final boolean hasFilters, + final boolean readThrough, + final boolean retval, + final boolean keepBinary, + final ExpiryPolicy expiryPlc) { + GridInClosure3 c = + new GridInClosure3() { + @Override public void apply(KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); + + IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); + + assert e != null; + + if (needReadVer) { + assert loadVer != null; + + e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + } + + if (singleRmv) { + assert !hasFilters && !retval; + assert val == null || Boolean.TRUE.equals(val) : val; + + ret.set(cacheCtx, null, val != null, keepBinary); + } + else { + CacheObject cacheVal = cacheCtx.toCacheObject(val); + + if (e.op() == TRANSFORM) { + GridCacheVersion ver; + + e.readValue(cacheVal); + + try { + ver = e.cached().version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; + + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(e, cacheVal, ret, ver); + } + else { + boolean success; + + if (hasFilters) { + success = isAll(e.context(), key, cacheVal, filter); + + if (!success) + e.value(cacheVal, false, false); + } + else + success = true; + + ret.set(cacheCtx, cacheVal, success, keepBinary); + } + } + } + }; + + return loadMissing( + cacheCtx, + topVer, + readThrough, + /*async*/true, + keys, + /*skipVals*/singleRmv, + needReadVer, + keepBinary, + expiryPlc, + c); + } + + /** + * @param cacheCtx Cache context. + * @param loadFut Missing keys load future. + * @param ret Future result. + * @param keepBinary Keep binary flag. + * @return Future. + */ + private IgniteInternalFuture optimisticPutFuture( + final GridCacheContext cacheCtx, + IgniteInternalFuture loadFut, + final GridCacheReturn ret, + final boolean keepBinary + ) { + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); + + try { + loadFut.get(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + + return nonInterruptable(commitNearTxLocalAsync().chain( + new CX1, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture txFut) + throws IgniteCheckedException { + try { + txFut.get(); + + Object res = implicitRes.value(); + + if (implicitRes.invokeResult()) { + assert res == null || res instanceof Map : implicitRes; - /** - * @return If backup check was requested. - */ - public boolean needCheckBackup() { - return needCheckBackup != null; - } + res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary); + } - /** - * @return {@code True} if transaction contains at least one near cache key mapped to the local node. - */ - public boolean nearLocallyMapped() { - return nearLocallyMapped; - } + return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success()); + } + catch (IgniteCheckedException | RuntimeException e) { + if (!(e instanceof NodeStoppingException)) + rollbackNearTxLocalAsync(); - /** - * @param nearLocallyMapped {@code True} if transaction contains near key mapped to the local