Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 55C271076B for ; Tue, 23 Dec 2014 00:07:00 +0000 (UTC) Received: (qmail 78087 invoked by uid 500); 23 Dec 2014 00:07:00 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 78054 invoked by uid 500); 23 Dec 2014 00:07:00 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 78045 invoked by uid 99); 23 Dec 2014 00:07:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Dec 2014 00:07:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 23 Dec 2014 00:06:36 +0000 Received: (qmail 65253 invoked by uid 99); 22 Dec 2014 23:59:36 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Dec 2014 23:59:36 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 358A4A33DFF; Mon, 22 Dec 2014 23:59:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 23 Dec 2014 00:00:09 -0000 Message-Id: <795cba90d5f7466098b959ae6969b95c@git.apache.org> In-Reply-To: <26feadb5eea944938f598249aa42f8a1@git.apache.org> References: <26feadb5eea944938f598249aa42f8a1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [35/50] [abbrv] incubator-ignite git commit: GG-9141 - Renaming. X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java new file mode 100644 index 0000000..4fc7140 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -0,0 +1,3179 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.transactions; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.security.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.gridgain.grid.kernal.processors.cache.dr.*; +import org.gridgain.grid.kernal.processors.dr.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.future.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.transactions.IgniteTxState.*; +import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; +import static org.gridgain.grid.kernal.processors.dr.GridDrType.*; + +/** + * Transaction adapter for cache transactions. + */ +public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter + implements IgniteTxLocalEx { + /** */ + private static final long serialVersionUID = 0L; + + /** Per-transaction read map. */ + @GridToStringExclude + protected Map, IgniteTxEntry> txMap; + + /** Read view on transaction map. */ + @GridToStringExclude + protected IgniteTxMap readView; + + /** Write view on transaction map. */ + @GridToStringExclude + protected IgniteTxMap writeView; + + /** Minimal version encountered (either explicit lock or XID of this transaction). */ + protected GridCacheVersion minVer; + + /** Flag indicating with TM commit happened. */ + protected AtomicBoolean doneFlag = new AtomicBoolean(false); + + /** Committed versions, relative to base. */ + private Collection committedVers = Collections.emptyList(); + + /** Rolled back versions, relative to base. */ + private Collection rolledbackVers = Collections.emptyList(); + + /** Base for completed versions. */ + private GridCacheVersion completedBase; + + /** Flag indicating partition lock in group lock transaction. */ + private boolean partLock; + + /** Flag indicating that transformed values should be sent to remote nodes. */ + private boolean sndTransformedVals; + + /** Commit error. */ + protected AtomicReference commitErr = new AtomicReference<>(); + + /** Active cache IDs. */ + protected Set activeCacheIds = new HashSet<>(); + + /** + * Empty constructor required for {@link Externalizable}. + */ + protected IgniteTxLocalAdapter() { + // No-op. + } + + /** + * @param cctx Cache registry. + * @param xidVer Transaction ID. + * @param implicit {@code True} if transaction was implicitly started by the system, + * {@code false} if it was started explicitly by user. + * @param implicitSingle {@code True} if transaction is implicit with only one key. + * @param sys System flag. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout. + * @param txSize Expected transaction size. + * @param grpLockKey Group lock key if this is a group-lock transaction. + * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition. + */ + protected IgniteTxLocalAdapter( + GridCacheSharedContext cctx, + GridCacheVersion xidVer, + boolean implicit, + boolean implicitSingle, + boolean sys, + IgniteTxConcurrency concurrency, + IgniteTxIsolation isolation, + long timeout, + boolean invalidate, + boolean storeEnabled, + int txSize, + @Nullable IgniteTxKey grpLockKey, + boolean partLock, + @Nullable UUID subjId, + int taskNameHash + ) { + super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate, + storeEnabled, txSize, grpLockKey, subjId, taskNameHash); + + assert !partLock || grpLockKey != null; + + this.partLock = partLock; + + minVer = xidVer; + } + + /** {@inheritDoc} */ + @Override public UUID eventNodeId() { + return cctx.localNodeId(); + } + + /** {@inheritDoc} */ + @Override public UUID originatingNodeId() { + return cctx.localNodeId(); + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return txMap.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public Collection masterNodeIds() { + return Collections.singleton(nodeId); + } + + /** {@inheritDoc} */ + @Override public boolean partitionLock() { + return partLock; + } + + /** {@inheritDoc} */ + @Override public Throwable commitError() { + return commitErr.get(); + } + + /** {@inheritDoc} */ + @Override public void commitError(Throwable e) { + commitErr.compareAndSet(null, e); + } + + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + assert false; + return false; + } + + /** + * Gets collection of active cache IDs for this transaction. + * + * @return Collection of active cache IDs. + */ + @Override public Collection activeCacheIds() { + return activeCacheIds; + } + + /** {@inheritDoc} */ + @Override public boolean isStarted() { + return txMap != null; + } + + /** {@inheritDoc} */ + @Override public boolean hasWriteKey(IgniteTxKey key) { + return writeView.containsKey(key); + } + + /** + * @return Transaction read set. + */ + @Override public Set> readSet() { + return txMap == null ? Collections.>emptySet() : readView.keySet(); + } + + /** + * @return Transaction write set. + */ + @Override public Set> writeSet() { + return txMap == null ? Collections.>emptySet() : writeView.keySet(); + } + + /** {@inheritDoc} */ + @Override public boolean removed(IgniteTxKey key) { + if (txMap == null) + return false; + + IgniteTxEntry e = txMap.get(key); + + return e != null && e.op() == DELETE; + } + + /** {@inheritDoc} */ + @Override public Map, IgniteTxEntry> readMap() { + return readView == null ? Collections., IgniteTxEntry>emptyMap() : readView; + } + + /** {@inheritDoc} */ + @Override public Map, IgniteTxEntry> writeMap() { + return writeView == null ? Collections., IgniteTxEntry>emptyMap() : writeView; + } + + /** {@inheritDoc} */ + @Override public Collection> allEntries() { + return txMap == null ? Collections.>emptySet() : txMap.values(); + } + + /** {@inheritDoc} */ + @Override public Collection> readEntries() { + return readView == null ? Collections.>emptyList() : readView.values(); + } + + /** {@inheritDoc} */ + @Override public Collection> writeEntries() { + return writeView == null ? Collections.>emptyList() : writeView.values(); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteTxEntry entry(IgniteTxKey key) { + return txMap == null ? null : txMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void seal() { + if (readView != null) + readView.seal(); + + if (writeView != null) + writeView.seal(); + } + + /** + * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent + * to remote nodes. + */ + public void sendTransformedValues(boolean snd) { + sndTransformedVals = snd; + } + + /** + * @return {@code True} if should be committed after lock is acquired. + */ + protected boolean commitAfterLock() { + return implicit() && (!dht() || colocated()); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"RedundantTypeArguments"}) + @Nullable @Override public GridTuple peek( + GridCacheContext cacheCtx, + boolean failFast, + K key, + IgnitePredicate>[] filter + ) throws GridCacheFilterFailedException { + IgniteTxEntry e = txMap == null ? null : txMap.get(cacheCtx.txKey(key)); + + if (e != null) { + // We should look at tx entry previous value. If this is a user peek then previous + // value is the same as value. If this is a filter evaluation peek then previous value holds + // value visible to filter while value contains value enlisted for write. + if (!F.isAll(e.cached().wrap(false), filter)) + return e.hasPreviousValue() ? F.t(CU.failed(failFast, e.previousValue())) : null; + + return e.hasPreviousValue() ? F.t(e.previousValue()) : null; + } + + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture loadMissing( + final GridCacheContext cacheCtx, + boolean async, + final Collection keys, + boolean deserializePortable, + final IgniteBiInClosure c + ) { + if (!async) { + try { + return new GridFinishedFuture<>(cctx.kernalContext(), + cacheCtx.store().loadAllFromStore(this, keys, c)); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(cctx.kernalContext(), e); + } + } + else + return cctx.kernalContext().closure().callLocalSafe( + new GPC() { + @Override public Boolean call() throws Exception { + return cacheCtx.store().loadAllFromStore(IgniteTxLocalAdapter.this, keys, c); + } + }, + true); + } + + /** + * Gets minimum version present in transaction. + * + * @return Minimum versions. + */ + @Override public GridCacheVersion minVersion() { + return minVer; + } + + /** + * @throws IgniteCheckedException If prepare step failed. + */ + @SuppressWarnings({"CatchGenericClass"}) + public void userPrepare() throws IgniteCheckedException { + if (state() != PREPARING) { + if (timedOut()) + throw new IgniteTxTimeoutException("Transaction timed out: " + this); + + IgniteTxState state = state(); + + setRollbackOnly(); + + throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']'); + } + + checkValid(); + + try { + cctx.tm().prepareTx(this); + } + catch (IgniteCheckedException e) { + throw e; + } + catch (Throwable e) { + setRollbackOnly(); + + throw new IgniteCheckedException("Transaction validation produced a runtime exception: " + this, e); + } + } + + /** {@inheritDoc} */ + @Override public void commit() throws IgniteCheckedException { + try { + commitAsync().get(); + } + finally { + cctx.tm().txContextReset(); + } + } + + /** {@inheritDoc} */ + @Override public void prepare() throws IgniteCheckedException { + prepareAsync().get(); + } + + /** + * Checks that locks are in proper state for commit. + * + * @param entry Cache entry to check. + */ + private void checkCommitLocks(GridCacheEntryEx entry) { + assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry + + ", tx=" + this + ']'; + } + + /** + * Uncommits transaction by invalidating all of its entries. + */ + @SuppressWarnings({"CatchGenericClass"}) + private void uncommit() { + for (IgniteTxEntry e : writeMap().values()) { + try { + GridCacheEntryEx cacheEntry = e.cached(); + + if (e.op() != NOOP) + cacheEntry.invalidate(null, xidVer); + } + catch (Throwable t) { + U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t); + + break; + } + } + + cctx.tm().uncommitTx(this); + } + + /** + * Gets cache entry for given key. + * + * @param key Key. + * @return Cache entry. + */ + protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) { + return cacheCtx.cache().entryEx(key.key()); + } + + /** + * Gets cache entry for given key and topology version. + * + * @param key Key. + * @param topVer Topology version. + * @return Cache entry. + */ + protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key, long topVer) { + return cacheCtx.cache().entryEx(key.key(), topVer); + } + + /** + * Performs batch database operations. This commit must be called + * before {@link #userCommit()}. This way if there is a DB failure, + * cache transaction can still be rolled back. + * + * @param writeEntries Transaction write set. + * @throws IgniteCheckedException If batch update failed. + */ + @SuppressWarnings({"CatchGenericClass"}) + protected void batchStoreCommit(Iterable> writeEntries) throws IgniteCheckedException { + GridCacheStoreManager store = store(); + + if (store != null && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) { + try { + if (writeEntries != null) { + Map> putMap = null; + List rmvCol = null; + + boolean skipNear = near() && store.writeToStoreFromDht(); + + for (IgniteTxEntry e : writeEntries) { + if (skipNear && e.cached().isNear()) + continue; + + boolean intercept = e.context().config().getInterceptor() != null; + + if (intercept || !F.isEmpty(e.transformClosures())) + e.cached().unswap(true, false); + + GridTuple3 res = applyTransformClosures(e, false); + + GridCacheContext cacheCtx = e.context(); + + GridCacheOperation op = res.get1(); + K key = e.key(); + V val = res.get2(); + GridCacheVersion ver = writeVersion(); + + if (op == CREATE || op == UPDATE) { + // Batch-process all removes if needed. + if (rmvCol != null && !rmvCol.isEmpty()) { + store.removeAllFromStore(this, rmvCol); + + // Reset. + rmvCol.clear(); + } + + if (intercept) { + V old = e.cached().rawGetOrUnmarshal(true); + + val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val); + + if (val == null) + continue; + + val = cacheCtx.unwrapTemporary(val); + } + + if (putMap == null) + putMap = new LinkedHashMap<>(writeMap().size(), 1.0f); + + putMap.put(key, F.t(val, ver)); + } + else if (op == DELETE) { + // Batch-process all puts if needed. + if (putMap != null && !putMap.isEmpty()) { + store.putAllToStore(this, putMap); + + // Reset. + putMap.clear(); + } + + if (intercept) { + V old = e.cached().rawGetOrUnmarshal(true); + + IgniteBiTuple t = cacheCtx.config().getInterceptor() + .onBeforeRemove(key, old); + + if (cacheCtx.cancelRemove(t)) + continue; + } + + if (rmvCol == null) + rmvCol = new LinkedList<>(); + + rmvCol.add(key); + } + else if (log.isDebugEnabled()) + log.debug("Ignoring NOOP entry for batch store commit: " + e); + } + + if (putMap != null && !putMap.isEmpty()) { + assert rmvCol == null || rmvCol.isEmpty(); + + // Batch put at the end of transaction. + store.putAllToStore(this, putMap); + } + + if (rmvCol != null && !rmvCol.isEmpty()) { + assert putMap == null || putMap.isEmpty(); + + // Batch remove at the end of transaction. + store.removeAllFromStore(this, rmvCol); + } + } + + // Commit while locks are held. + store.txEnd(this, true); + } + catch (IgniteCheckedException ex) { + commitError(ex); + + setRollbackOnly(); + + // Safe to remove transaction from committed tx list because nothing was committed yet. + cctx.tm().removeCommittedTx(this); + + throw ex; + } + catch (Throwable ex) { + commitError(ex); + + setRollbackOnly(); + + // Safe to remove transaction from committed tx list because nothing was committed yet. + cctx.tm().removeCommittedTx(this); + + throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex); + } + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CatchGenericClass"}) + @Override public void userCommit() throws IgniteCheckedException { + IgniteTxState state = state(); + + if (state != COMMITTING) { + if (timedOut()) + throw new IgniteTxTimeoutException("Transaction timed out: " + this); + + setRollbackOnly(); + + throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']'); + } + + checkValid(); + + boolean empty = F.isEmpty(near() ? txMap : writeMap()); + + // Register this transaction as completed prior to write-phase to + // ensure proper lock ordering for removed entries. + // We add colocated transaction to committed set even if it is empty to correctly order + // locks on backup nodes. + if (!empty || colocated()) + cctx.tm().addCommittedTx(this); + + if (groupLock()) + addGroupTxMapping(writeSet()); + + if (!empty) { + // We are holding transaction-level locks for entries here, so we can get next write version. + writeVersion(cctx.versions().next(topologyVersion())); + + batchStoreCommit(writeMap().values()); + + try { + cctx.tm().txContext(this); + + long topVer = topologyVersion(); + + /* + * Commit to cache. Note that for 'near' transaction we loop through all the entries. + */ + for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) { + GridCacheContext cacheCtx = txEntry.context(); + + GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE; + + UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId(); + + try { + while (true) { + try { + GridCacheEntryEx cached = txEntry.cached(); + + // Must try to evict near entries before committing from + // transaction manager to make sure locks are held. + if (!evictNearEntry(txEntry, false)) { + if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) { + cached.markObsolete(xidVer); + + break; + } + + if (cached.detached()) + break; + + GridCacheEntryEx nearCached = null; + + boolean metrics = true; + + if (updateNearCache(cacheCtx, txEntry.key(), topVer)) + nearCached = cacheCtx.dht().near().peekEx(txEntry.key()); + else if (cacheCtx.isNear() && txEntry.locallyMapped()) + metrics = false; + + boolean evt = !isNearLocallyMapped(txEntry, false); + + // For near local transactions we must record DHT version + // in order to keep near entries on backup nodes until + // backup remote transaction completes. + if (cacheCtx.isNear()) + ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion()); + + if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters())) + txEntry.cached().unswap(true, false); + + GridTuple3 res = applyTransformClosures(txEntry, + true); + + GridCacheOperation op = res.get1(); + V val = res.get2(); + byte[] valBytes = res.get3(); + + // Preserve TTL if needed. + if (txEntry.ttl() < 0) + txEntry.ttl(cached.ttl()); + + // Deal with DR conflicts. + GridCacheVersion explicitVer = txEntry.drVersion() != null ? + txEntry.drVersion() : writeVersion(); + + GridDrResolveResult drRes = cacheCtx.dr().resolveTx(cached, + txEntry, + explicitVer, + op, + val, + valBytes, + txEntry.ttl(), + txEntry.drExpireTime()); + + if (drRes != null) { + op = drRes.operation(); + val = drRes.value(); + valBytes = drRes.valueBytes(); + + if (drRes.isMerge()) + explicitVer = writeVersion(); + } + else + // Nullify explicit version so that innerSet/innerRemove will work as usual. + explicitVer = null; + + if (sndTransformedVals || (drRes != null)) { + assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null); + + txEntry.value(val, true, false); + txEntry.valueBytes(valBytes); + txEntry.op(op); + txEntry.transformClosures(null); + txEntry.drVersion(explicitVer); + } + + if (op == CREATE || op == UPDATE) { + GridCacheUpdateTxResult updRes = cached.innerSet( + this, + eventNodeId(), + txEntry.nodeId(), + val, + valBytes, + false, + false, + txEntry.ttl(), + evt, + metrics, + topVer, + txEntry.filters(), + cached.detached() ? DR_NONE : drType, + txEntry.drExpireTime(), + cached.isNear() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName()); + + if (nearCached != null && updRes.success()) + nearCached.innerSet( + null, + eventNodeId(), + nodeId, + val, + valBytes, + false, + false, + txEntry.ttl(), + false, + metrics, + topVer, + CU.empty(), + DR_NONE, + txEntry.drExpireTime(), + null, + CU.subjectId(this, cctx), + resolveTaskName()); + } + else if (op == DELETE) { + GridCacheUpdateTxResult updRes = cached.innerRemove( + this, + eventNodeId(), + txEntry.nodeId(), + false, + false, + evt, + metrics, + topVer, + txEntry.filters(), + cached.detached() ? DR_NONE : drType, + cached.isNear() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName()); + + if (nearCached != null && updRes.success()) + nearCached.innerRemove( + null, + eventNodeId(), + nodeId, + false, + false, + false, + metrics, + topVer, + CU.empty(), + DR_NONE, + null, + CU.subjectId(this, cctx), + resolveTaskName()); + } + else if (op == RELOAD) { + cached.innerReload(CU.empty()); + + if (nearCached != null) + nearCached.innerReload(CU.empty()); + } + else if (op == READ) { + if (log.isDebugEnabled()) + log.debug("Ignoring READ entry when committing: " + txEntry); + } + else { + assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()): + "Transaction does not own lock for group lock entry during commit [tx=" + + this + ", txEntry=" + txEntry + ']'; + + if (log.isDebugEnabled()) + log.debug("Ignoring NOOP entry when committing: " + txEntry); + } + } + + // Check commit locks after set, to make sure that + // we are not changing obsolete entries. + // (innerSet and innerRemove will throw an exception + // if an entry is obsolete). + if (txEntry.op() != READ && !txEntry.groupLockEntry()) + checkCommitLocks(cached); + + // Break out of while loop. + break; + } + // If entry cached within transaction got removed. + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during transaction commit (will retry): " + txEntry); + + txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), txEntry.keyBytes()); + } + } + } + catch (Throwable ex) { + // We are about to initiate transaction rollback when tx has started to committing. + // Need to remove version from committed list. + cctx.tm().removeCommittedTx(this); + + if (X.hasCause(ex, GridCacheIndexUpdateException.class) && cacheCtx.cache().isMongoDataCache()) { + if (log.isDebugEnabled()) + log.debug("Failed to update mongo document index (transaction entry will " + + "be ignored): " + txEntry); + + // Set operation to NOOP. + txEntry.op(NOOP); + + setRollbackOnly(); + + throw ex; + } + else { + IgniteCheckedException err = new IgniteTxHeuristicException("Failed to locally write to cache " + + "(all transaction entries will be invalidated, however there was a window when " + + "entries for this transaction were visible to others): " + this, ex); + + U.error(log, "Heuristic transaction failure.", err); + + commitErr.compareAndSet(null, err); + + state(UNKNOWN); + + try { + // Courtesy to minimize damage. + uncommit(); + } + catch (Throwable ex1) { + U.error(log, "Failed to uncommit transaction: " + this, ex1); + } + + throw err; + } + } + } + } + finally { + cctx.tm().txContextReset(); + } + } + else { + GridCacheStoreManager store = store(); + + if (store != null && (!internal() || groupLock())) { + try { + store.txEnd(this, true); + } + catch (IgniteCheckedException e) { + commitError(e); + + setRollbackOnly(); + + cctx.tm().removeCommittedTx(this); + + throw e; + } + } + } + + // Do not unlock transaction entries if one-phase commit. + if (!onePhaseCommit()) { + if (doneFlag.compareAndSet(false, true)) { + // Unlock all locks. + cctx.tm().commitTx(this); + + boolean needsCompletedVersions = needsCompletedVersions(); + + assert !needsCompletedVersions || completedBase != null; + assert !needsCompletedVersions || committedVers != null; + assert !needsCompletedVersions || rolledbackVers != null; + } + } + } + + /** + * Commits transaction to transaction manager. Used for one-phase commit transactions only. + */ + public void tmCommit() { + assert onePhaseCommit(); + + if (doneFlag.compareAndSet(false, true)) { + // Unlock all locks. + cctx.tm().commitTx(this); + + state(COMMITTED); + + boolean needsCompletedVersions = needsCompletedVersions(); + + assert !needsCompletedVersions || completedBase != null; + assert !needsCompletedVersions || committedVers != null; + assert !needsCompletedVersions || rolledbackVers != null; + } + } + + /** {@inheritDoc} */ + @Override public void completedVersions( + GridCacheVersion completedBase, + Collection committedVers, + Collection rolledbackVers) { + this.completedBase = completedBase; + this.committedVers = committedVers; + this.rolledbackVers = rolledbackVers; + } + + /** + * @return Completed base for ordering. + */ + public GridCacheVersion completedBase() { + return completedBase; + } + + /** + * @return Committed versions. + */ + public Collection committedVersions() { + return committedVers; + } + + /** + * @return Rolledback versions. + */ + public Collection rolledbackVersions() { + return rolledbackVers; + } + + /** {@inheritDoc} */ + @Override public void userRollback() throws IgniteCheckedException { + IgniteTxState state = state(); + + if (state != ROLLING_BACK && state != ROLLED_BACK) { + setRollbackOnly(); + + throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']', + commitErr.get()); + } + + if (doneFlag.compareAndSet(false, true)) { + try { + if (near()) + // Must evict near entries before rolling back from + // transaction manager, so they will be removed from cache. + for (IgniteTxEntry e : allEntries()) + evictNearEntry(e, false); + + cctx.tm().rollbackTx(this); + + GridCacheStoreManager store = store(); + + if (store != null && (near() || store.writeToStoreFromDht())) { + if (!internal() || groupLock()) + store.txEnd(this, false); + } + } + catch (Error | IgniteCheckedException | RuntimeException e) { + U.addLastCause(e, commitErr.get(), log); + + throw e; + } + } + } + + /** + * Checks if there is a cached or swapped value for + * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method. + * + * + * @param keys Key to enlist. + * @param cached Cached entry, if called from entry wrapper. + * @param map Return map. + * @param missed Map of missed keys. + * @param keysCnt Keys count (to avoid call to {@code Collection.size()}). + * @param deserializePortable Deserialize portable flag. + * @param filter Filter to test. + * @throws IgniteCheckedException If failed. + * @return Enlisted keys. + */ + @SuppressWarnings({"RedundantTypeArguments"}) + private Collection enlistRead( + final GridCacheContext cacheCtx, + Collection keys, + @Nullable GridCacheEntryEx cached, + Map map, + Map missed, + int keysCnt, + boolean deserializePortable, + IgnitePredicate>[] filter) throws IgniteCheckedException { + assert !F.isEmpty(keys); + assert keysCnt == keys.size(); + assert cached == null || F.first(keys).equals(cached.key()); + + cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ); + + groupLockSanityCheck(cacheCtx, keys); + + boolean single = keysCnt == 1; + + Collection lockKeys = null; + + long topVer = topologyVersion(); + + // In this loop we cover only read-committed or optimistic transactions. + // Transactions that are pessimistic and not read-committed are covered + // outside of this loop. + for (K key : keys) { + if (key == null) + continue; + + if (pessimistic() && !readCommitted()) + addActiveCache(cacheCtx); + + IgniteTxKey txKey = cacheCtx.txKey(key); + + // Check write map (always check writes first). + IgniteTxEntry txEntry = entry(txKey); + + // Either non-read-committed or there was a previous write. + if (txEntry != null) { + if (cacheCtx.isAll(txEntry.cached(), filter)) { + V val = txEntry.value(); + + // Read value from locked entry in group-lock transaction as well. + if (txEntry.hasValue()) { + if (!F.isEmpty(txEntry.transformClosures())) { + for (IgniteClosure clos : txEntry.transformClosures()) + val = clos.apply(val); + } + + if (val != null) { + V val0 = val; + + if (cacheCtx.portableEnabled()) + val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); + + map.put(key, val0); + } + } + else { + assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry()); + + while (true) { + try { + Object transformClo = + (txEntry.op() == TRANSFORM && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? + F.first(txEntry.transformClosures()) : null; + + val = txEntry.cached().innerGet(this, + /*swap*/true, + /*read-through*/false, + /*fail fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + /*temporary*/false, + CU.subjectId(this, cctx), + transformClo, + resolveTaskName(), + filter); + + if (val != null) { + if (!readCommitted()) + txEntry.readValue(val); + + if (!F.isEmpty(txEntry.transformClosures())) { + for (IgniteClosure clos : txEntry.transformClosures()) + val = clos.apply(val); + } + + V val0 = val; + + if (cacheCtx.portableEnabled()) + val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); + + map.put(key, val0); + } + else + missed.put(key, txEntry.cached().version()); + + break; + } + catch (GridCacheFilterFailedException e) { + if (log.isDebugEnabled()) + log.debug("Filter validation failed for entry: " + txEntry); + + if (!readCommitted()) + txEntry.readValue(e.value()); + } + catch (GridCacheEntryRemovedException ignored) { + txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), txEntry.keyBytes()); + } + } + } + } + } + // First time access within transaction. + else { + if (lockKeys == null) + lockKeys = single ? (Collection)keys : new ArrayList(keysCnt); + + if (!single) + lockKeys.add(key); + + while (true) { + GridCacheEntryEx entry; + + if (cached != null) { + entry = cached; + + cached = null; + } + else + entry = entryEx(cacheCtx, txKey, topVer); + + try { + GridCacheVersion ver = entry.version(); + + V val = null; + + if (!pessimistic() || readCommitted() || groupLock()) { + // This call will check for filter. + val = entry.innerGet(this, + /*swap*/true, + /*no read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*event*/true, + /*temporary*/false, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + filter); + + if (val != null) { + V val0 = val; + + if (cacheCtx.portableEnabled()) + val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); + + map.put(key, val0); + } + else + missed.put(key, ver); + } + else + // We must wait for the lock in pessimistic mode. + missed.put(key, ver); + + if (!readCommitted()) { + txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null); + + if (groupLock()) + txEntry.groupLockEntry(true); + + // As optimization, mark as checked immediately + // for non-pessimistic if value is not null. + if (val != null && !pessimistic()) + txEntry.markValid(); + } + + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key); + } + catch (GridCacheFilterFailedException e) { + if (log.isDebugEnabled()) + log.debug("Filter validation failed for entry: " + entry); + + if (!readCommitted()) { + // Value for which failure occurred. + V val = e.value(); + + txEntry = addEntry(READ, val, null, entry, -1, CU.empty(), false, -1L, -1L, null); + + // Mark as checked immediately for non-pessimistic. + if (val != null && !pessimistic()) + txEntry.markValid(); + } + + break; // While loop. + } + } + } + } + + return lockKeys != null ? lockKeys : Collections.emptyList(); + } + + /** + * Adds skipped key. + * + * @param skipped Skipped set (possibly {@code null}). + * @param key Key to add. + * @return Skipped set. + */ + private Set skip(Set skipped, K key) { + if (skipped == null) + skipped = new GridLeanSet<>(); + + skipped.add(key); + + if (log.isDebugEnabled()) + log.debug("Added key to skipped set: " + key); + + return skipped; + } + + /** + * Loads all missed keys for + * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method. + * + * @param map Return map. + * @param missedMap Missed keys. + * @param redos Keys to retry. + * @param deserializePortable Deserialize portable flag. + * @param filter Filter. + * @return Loaded key-value pairs. + */ + private IgniteFuture> checkMissed( + final GridCacheContext cacheCtx, + final Map map, + final Map missedMap, + @Nullable final Collection redos, + final boolean deserializePortable, + final IgnitePredicate>[] filter + ) { + assert redos != null || pessimistic(); + + if (log.isDebugEnabled()) + log.debug("Loading missed values for missed map: " + missedMap); + + final Collection loaded = new HashSet<>(); + + return new GridEmbeddedFuture<>(cctx.kernalContext(), + loadMissing( + cacheCtx, + false, missedMap.keySet(), deserializePortable, new CI2() { + /** */ + private GridCacheVersion nextVer; + + @Override public void apply(K key, V val) { + if (isRollbackOnly()) { + if (log.isDebugEnabled()) + log.debug("Ignoring loaded value for read because transaction was rolled back: " + + IgniteTxLocalAdapter.this); + + return; + } + + GridCacheVersion ver = missedMap.get(key); + + if (ver == null) { + if (log.isDebugEnabled()) + log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']'); + + return; + } + + V visibleVal = val; + + IgniteTxKey txKey = cacheCtx.txKey(key); + + IgniteTxEntry txEntry = entry(txKey); + + if (txEntry != null) { + if (!readCommitted()) + txEntry.readValue(val); + + if (!F.isEmpty(txEntry.transformClosures())) { + for (IgniteClosure clos : txEntry.transformClosures()) + visibleVal = clos.apply(visibleVal); + } + } + + // In pessimistic mode we hold the lock, so filter validation + // should always be valid. + if (pessimistic()) + ver = null; + + // Initialize next version. + if (nextVer == null) + nextVer = cctx.versions().next(topologyVersion()); + + while (true) { + assert txEntry != null || readCommitted() || groupLock(); + + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); + + try { + boolean pass = cacheCtx.isAll(e, filter); + + // Must initialize to true since even if filter didn't pass, + // we still record the transaction value. + boolean set = true; + + if (pass) { + try { + set = e.versionedValue(val, ver, nextVer); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction getAll method " + + "(will try again): " + e); + + if (pessimistic() && !readCommitted() && !isRollbackOnly() && + (!groupLock() || F.eq(e.key(), groupLockKey()))) { + U.error(log, "Inconsistent transaction state (entry got removed while " + + "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]"); + + setRollbackOnly(); + + return; + } + + if (txEntry != null) + txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); + + continue; // While loop. + } + } + + // In pessimistic mode, we should always be able to set. + assert set || !pessimistic(); + + if (readCommitted() || groupLock()) { + cacheCtx.evicts().touch(e, topologyVersion()); + + if (pass && visibleVal != null) + map.put(key, visibleVal); + } + else { + assert txEntry != null; + + if (set || F.isEmptyOrNulls(filter)) { + txEntry.setAndMarkValid(val); + + if (pass && visibleVal != null) + map.put(key, visibleVal); + } + else { + assert !pessimistic() : "Pessimistic transaction should not have to redo gets: " + + this; + + if (log.isDebugEnabled()) + log.debug("Failed to set versioned value for entry (will redo): " + e); + + redos.add(key); + } + } + + loaded.add(key); + + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry from transaction [set=" + set + + ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']'); + + break; // While loop. + } + catch (IgniteCheckedException ex) { + throw new IgniteException("Failed to put value for cache entry: " + e, ex); + } + } + } + }), + new C2>() { + @Override public Map apply(Boolean b, Exception e) { + if (e != null) { + setRollbackOnly(); + + throw new GridClosureException(e); + } + + if (!b && !readCommitted()) { + // There is no store - we must mark the entries. + for (K key : missedMap.keySet()) { + IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); + + if (txEntry != null) + txEntry.markValid(); + } + } + + if (readCommitted()) { + Collection notFound = new HashSet<>(missedMap.keySet()); + + notFound.removeAll(loaded); + + // In read-committed mode touch entries that have just been read. + for (K key : notFound) { + IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); + + GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) : + txEntry.cached(); + + if (entry != null) + cacheCtx.evicts().touch(entry, topologyVersion()); + } + } + + return map; + } + }); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture> getAllAsync( + final GridCacheContext cacheCtx, + Collection keys, + @Nullable GridCacheEntryEx cached, final boolean deserializePortable, + final IgnitePredicate>[] filter) { + if (F.isEmpty(keys)) + return new GridFinishedFuture<>(cctx.kernalContext(), Collections.emptyMap()); + + init(); + + int keysCnt = keys.size(); + + boolean single = keysCnt == 1; + + try { + checkValid(); + + final Map retMap = new GridLeanMap<>(keysCnt); + + final Map missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0); + + final Collection lockKeys = enlistRead(cacheCtx, keys, cached, retMap, missed, keysCnt, + deserializePortable, filter); + + if (single && missed.isEmpty()) + return new GridFinishedFuture<>(cctx.kernalContext(), retMap); + + // Handle locks. + if (pessimistic() && !readCommitted() && !groupLock()) { + IgniteFuture fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true, + isolation, isInvalidate(), CU.empty()); + + PLC2> plc2 = new PLC2>() { + @Override public IgniteFuture> postLock() throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for read on keys: " + lockKeys); + + // Load keys only after the locks have been acquired. + for (K key : lockKeys) { + if (retMap.containsKey(key)) + // We already have a return value. + continue; + + IgniteTxKey txKey = cacheCtx.txKey(key); + + IgniteTxEntry txEntry = entry(txKey); + + assert txEntry != null; + + // Check if there is cached value. + while (true) { + GridCacheEntryEx cached = txEntry.cached(); + + try { + Object transformClo = + (!F.isEmpty(txEntry.transformClosures()) && + cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ? + F.first(txEntry.transformClosures()) : null; + + V val = cached.innerGet(IgniteTxLocalAdapter.this, + cacheCtx.isSwapOrOffheapEnabled(), + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*events*/true, + /*temporary*/true, + CU.subjectId(IgniteTxLocalAdapter.this, cctx), + transformClo, + resolveTaskName(), + filter); + + // If value is in cache and passed the filter. + if (val != null) { + missed.remove(key); + + txEntry.setAndMarkValid(val); + + if (!F.isEmpty(txEntry.transformClosures())) { + for (IgniteClosure clos : txEntry.transformClosures()) + val = clos.apply(val); + } + + if (cacheCtx.portableEnabled()) + val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); + + retMap.put(key, val); + } + + // Even though we bring the value back from lock acquisition, + // we still need to recheck primary node for consistent values + // in case of concurrent transactional locks. + + break; // While. + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed exception in get postLock (will retry): " + + cached); + + txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes()); + } + catch (GridCacheFilterFailedException e) { + // Failed value for the filter. + V val = e.value(); + + if (val != null) { + // If filter fails after lock is acquired, we don't reload, + // regardless if value is null or not. + missed.remove(key); + + txEntry.setAndMarkValid(val); + } + + break; // While. + } + } + } + + if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal())) + return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, filter); + + return new GridFinishedFuture<>(cctx.kernalContext(), Collections.emptyMap()); + } + }; + + FinishClosure> finClos = new FinishClosure>() { + @Override Map finish(Map loaded) { + retMap.putAll(loaded); + + return retMap; + } + }; + + if (fut.isDone()) { + try { + IgniteFuture> fut1 = plc2.apply(fut.get(), null); + + return fut1.isDone() ? + new GridFinishedFutureEx<>(finClos.apply(fut1.get(), null)) : + new GridEmbeddedFuture<>(cctx.kernalContext(), fut1, finClos); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(cctx.kernalContext(), e.unwrap()); + } + catch (IgniteCheckedException e) { + try { + return plc2.apply(false, e); + } + catch (Exception e1) { + return new GridFinishedFuture<>(cctx.kernalContext(), e1); + } + } + } + else { + return new GridEmbeddedFuture<>( + cctx.kernalContext(), + fut, + plc2, + finClos); + } + } + else { + assert optimistic() || readCommitted() || groupLock(); + + final Collection redos = new LinkedList<>(); + + if (!missed.isEmpty()) { + if (!readCommitted()) + for (Iterator it = missed.keySet().iterator(); it.hasNext(); ) + if (retMap.containsKey(it.next())) + it.remove(); + + if (missed.isEmpty()) + return new GridFinishedFuture<>(cctx.kernalContext(), retMap); + + return new GridEmbeddedFuture<>( + cctx.kernalContext(), + // First future. + checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, filter), + // Closure that returns another future, based on result from first. + new PMC>() { + @Override public IgniteFuture> postMiss(Map map) { + if (redos.isEmpty()) + return new GridFinishedFuture<>(cctx.kernalContext(), + Collections.emptyMap()); + + if (log.isDebugEnabled()) + log.debug("Starting to future-recursively get values for keys: " + redos); + + // Future recursion. + return getAllAsync(cacheCtx, redos, null, deserializePortable, filter); + } + }, + // Finalize. + new FinishClosure>() { + @Override Map finish(Map loaded) { + for (Map.Entry entry : loaded.entrySet()) { + IgniteTxEntry txEntry = entry(cacheCtx.txKey(entry.getKey())); + + V val = entry.getValue(); + + if (!readCommitted()) + txEntry.readValue(val); + + if (!F.isEmpty(txEntry.transformClosures())) { + for (IgniteClosure clos : txEntry.transformClosures()) + val = clos.apply(val); + } + + retMap.put(entry.getKey(), val); + } + + return retMap; + } + } + ); + } + + return new GridFinishedFuture<>(cctx.kernalContext(), retMap); + } + } + catch (IgniteCheckedException e) { + setRollbackOnly(); + + return new GridFinishedFuture<>(cctx.kernalContext(), e); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture> putAllAsync( + GridCacheContext cacheCtx, + Map map, + boolean retval, + @Nullable GridCacheEntryEx cached, + long ttl, + IgnitePredicate>[] filter + ) { + return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture putAllDrAsync( + GridCacheContext cacheCtx, + Map> drMap + ) { + return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture> transformAllAsync( + GridCacheContext cacheCtx, + @Nullable Map> map, + boolean retval, + @Nullable GridCacheEntryEx cached, + long ttl + ) { + return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture removeAllDrAsync( + GridCacheContext cacheCtx, + Map drMap + ) { + return removeAllAsync0(cacheCtx, null, drMap, null, false, null); + } + + /** + * Checks filter for non-pessimistic transactions. + * + * @param cached Cached entry. + * @param filter Filter to check. + * @return {@code True} if passed or pessimistic. + * @throws IgniteCheckedException If failed. + */ + private boolean filter(GridCacheEntryEx cached, + IgnitePredicate>[] filter) throws IgniteCheckedException { + return pessimistic() || cached.context().isAll(cached, filter); + } + + /** + * Internal routine for putAll(..) + * + * @param keys Keys to enlist. + * @param cached Cached entry. + * @param ttl Time to live for entry. If negative, leave unchanged. + * @param implicit Implicit flag. + * @param lookup Value lookup map ({@code null} for remove). + * @param transformMap Map with transform closures if this is a transform operation. + * @param retval Flag indicating whether a value should be returned. + * @param lockOnly If {@code true}, then entry will be enlisted as noop. + * @param filter User filters. + * @param ret Return value. + * @param enlisted Collection of keys enlisted into this transaction. + * @param drPutMap DR put map (optional). + * @param drRmvMap DR remove map (optional). + * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). + */ + protected IgniteFuture> enlistWrite( + GridCacheContext cacheCtx, + Collection keys, + @Nullable GridCacheEntryEx cached, + long ttl, + boolean implicit, + @Nullable Map lookup, + @Nullable Map> transformMap, + boolean retval, + boolean lockOnly, + IgnitePredicate>[] filter, + final GridCacheReturn ret, + Collection enlisted, + @Nullable Map> drPutMap, + @Nullable Map drRmvMap + ) { + assert cached == null || keys.size() == 1; + assert cached == null || F.first(keys).equals(cached.key()); + + try { + addActiveCache(cacheCtx); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(cctx.kernalContext(), e); + } + + Set skipped = null; + + boolean rmv = lookup == null && transformMap == null; + + try { + // Set transform flag for transaction. + if (transformMap != null) + transform = true; + + groupLockSanityCheck(cacheCtx, keys); + + for (K key : keys) { + V val = rmv || lookup == null ? null : lookup.get(key); + IgniteClosure transformClo = transformMap == null ? null : transformMap.get(key); + + GridCacheVersion drVer; + long drTtl; + long drExpireTime; + + if (drPutMap != null) { + GridCacheDrInfo info = drPutMap.get(key); + + assert info != null; + + drVer = info.version(); + drTtl = info.ttl(); + drExpireTime = info.expireTime(); + } + else if (drRmvMap != null) { + assert drRmvMap.get(key) != null; + + drVer = drRmvMap.get(key); + drTtl = -1L; + drExpireTime = -1L; + } + else { + drVer = null; + drTtl = -1L; + drExpireTime = -1L; + } + + if (key == null) + continue; + + if (!rmv && val == null && transformClo == null) { + skipped = skip(skipped, key); + + continue; + } + + if (cacheCtx.portableEnabled()) + key = (K)cacheCtx.marshalToPortable(key); + + IgniteTxKey txKey = cacheCtx.txKey(key); + + IgniteTxEntry txEntry = entry(txKey); + + // First time access. + if (txEntry == null) { + while (true) { + GridCacheEntryEx entry; + + if (cached != null) { + entry = cached; + + cached = null; + } + else { + entry = entryEx(cacheCtx, txKey, topologyVersion()); + + entry.unswap(true, false); + } + + try { + // Check if lock is being explicitly acquired by the same thread. + if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && + entry.lockedByThread(threadId, xidVer)) + throw new IgniteCheckedException("Cannot access key within transaction if lock is " + + "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer + + ", threadId=" + threadId + + ", locNodeId=" + cctx.localNodeId() + ']'); + + V old = null; + + boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + + if (optimistic()) { + try { + //Should read through if filter is specified. + old = entry.innerGet(this, + /*swap*/false, + /*read-through*/readThrough, + /*fail-fast*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + /*temporary*/false, + CU.subjectId(this, cctx), + transformClo, + resolveTaskName(), + CU.empty()); + } + catch (GridCacheFilterFailedException e) { + e.printStackTrace(); + + assert false : "Empty filter failed: " + e; + } + } + else + old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); + + if (!filter(entry, filter)) { + skipped = skip(skipped, key); + + ret.set(old, false); + + if (!readCommitted() && old != null) { + // Enlist failed filters as reads for non-read-committed mode, + // so future ops will get the same values. + txEntry = addEntry(READ, old, null, entry, -1, CU.empty(), false, -1L, -1L, + null); + + txEntry.markValid(); + } + + if (readCommitted() || old == null) + cacheCtx.evicts().touch(entry, topologyVersion()); + + break; // While. + } + + txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM : + old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl, + drExpireTime, drVer); + + if (!implicit() && readCommitted()) + cacheCtx.evicts().touch(entry, topologyVersion()); + + if (groupLock() && !lockOnly) + txEntry.groupLockEntry(true); + + enlisted.add(key); + + if (!pessimistic() || (groupLock() && !lockOnly)) { + txEntry.markValid(); + + if (old == null) { + if (retval && !readThrough) { + // If return value is required, then we know for sure that there is only + // one key in the keys collection. + assert keys.size() == 1; + + IgniteFuture fut = loadMissing( + cacheCtx, + true, + F.asList(key), + deserializePortables(cacheCtx), + new CI2() { + @Override public void apply(K k, V v) { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + k + ", val=" + + v + ']'); + + ret.set(v, true); + } + }); + + return new GridEmbeddedFuture<>( + cctx.kernalContext(), + fut, + new C2>() { + @Override public Set apply(Boolean b, Exception e) { + if (e != null) + throw new GridClosureException(e); + + return Collections.emptySet(); + } + } + ); + } + else + ret.set(null, true); + } + else + ret.set(old, true); + } + // Pessimistic. + else + ret.set(old, true); + + break; // While. + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction putAll0 method: " + entry); + } + } + } + else { + if (transformClo == null && txEntry.op() == TRANSFORM) + throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + + "transaction after transform closure is applied): " + key); + + GridCacheEntryEx entry = txEntry.cached(); + + V v = txEntry.value(); + + boolean del = txEntry.op() == DELETE && rmv; + + if (!del) { + if (!filter(entry, filter)) { + skipped = skip(skipped, key); + + ret.set(v, false); + + continue; + } + + txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM : + v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl, + drExpireTime, drVer); + + enlisted.add(key); + } + + if (!pessimistic()) { + txEntry.markValid(); + + // Set tx entry and return values. + ret.set(v, true); + } + } + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(cctx.kernalContext(), e); + } + + return new GridFinishedFuture<>(cctx.kernalContext(), skipped); + } + + /** + * Post lock processing for put or remove. + * + * @param keys Keys. + * @param failed Collection of potentially failed keys (need to populate in this method). + * @param transformed Output map where transformed values will be placed. + * @param transformMap Transform map. + * @param ret Return value. + * @param rmv {@code True} if remove. + * @param retval Flag to return value or not. + * @param filter Filter to check entries. + * @return Failed keys. + * @throws IgniteCheckedException If error. + */ + protected Set postLockWrite( + GridCacheContext cacheCtx, + Iterable keys, + Set failed, + @Nullable Map transformed, + @Nullable Map> transformMap, + GridCacheReturn ret, + boolean rmv, + boolean retval, + IgnitePredicate>[] filter + ) throws IgniteCheckedException { + for (K k : keys) { + IgniteTxEntry txEntry = entry(cacheCtx.txKey(k)); + + if (txEntry == null) + throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " + + "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']'); + + while (true) { + GridCacheEntryEx cached = txEntry.cached(); + + try { + assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() : + "Transaction lock is not acquired [entry=" + cached + ", tx=" + this + + ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']'; + + if (log.isDebugEnabled()) + log.debug("Post lock write entry: " + cached); + + V v = txEntry.previousValue(); + boolean hasPrevVal = txEntry.hasPreviousValue(); + + if (onePhaseCommit()) + filter = txEntry.filters(); + + // If we have user-passed filter, we must read value into entry for peek(). + if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter)) + retval = true; + + if (retval) { + if (!cacheCtx.isNear()) { + try { + if (!hasPrevVal) + v = cached.innerGet(this, + /*swap*/retval, + /*read-through*/retval, + /*failFast*/false, + /*unmarshal*/retval, + /*metrics*/true, + /*event*/!dht(), + /*temporary*/false, + CU.subjectId(this, cctx), + null, + resolveTaskName(), + CU.empty()); + } + catch (GridCacheFilterFailedException e) { + e.printStackTrace(); + + assert false : "Empty filter failed: " + e; + } + } + else { + if (!hasPrevVal) + v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet(); + } + + ret.value(v); + } + + boolean pass = cacheCtx.isAll(cached, filter); + + // For remove operation we return true only if we are removing s/t, + // i.e. cached value is not null. + ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null)); + + if (onePhaseCommit()) + txEntry.filtersPassed(pass); + + if (pass) { + txEntry.markValid(); + + if (log.isDebugEnabled()) + log.debug("Filter passed in post lock for key: " + k); + } + else { + failed = skip(failed, k); + + // Revert operation to previous. (if no - NOOP, so entry will be unlocked). + txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value()); + txEntry.filters(CU.empty()); + txEntry.filtersSet(false); + } + + break; // While. + } + // If entry cached within transaction got removed before lock. + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in putAllAsync method (will retry): " + cached); + + txEntry.cached(entryEx(cached.context(), txEntry.txKey()), txEntry.keyBytes()); + } + } + } + + if (log.isDebugEnabled()) + log.debug("Entries that failed after lock filter check: " + failed); + + return failed; + } + + /** + * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} + * maps must be non-null. + * + * @param map Key-value map to store. + * @param transformMap Transform map. + * @param drMap DR map. + * @param retval Key-transform value map to store. + * @param cached Cached entry, if any. + * @param ttl Time to live. + * @param filter Filter. + * @return Operation future. + */ + private IgniteFuture> putAllAsync0( + final GridCacheContext cacheCtx, + @Nullable Map ma