Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 566E3C5BA for ; Sun, 21 Dec 2014 23:04:22 +0000 (UTC) Received: (qmail 83622 invoked by uid 500); 21 Dec 2014 23:04:22 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 83594 invoked by uid 500); 21 Dec 2014 23:04:22 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 83581 invoked by uid 99); 21 Dec 2014 23:04:22 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Dec 2014 23:04:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Sun, 21 Dec 2014 23:04:16 +0000 Received: (qmail 82557 invoked by uid 99); 21 Dec 2014 23:03:56 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Dec 2014 23:03:56 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5FC479CC574; Sun, 21 Dec 2014 23:03:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Sun, 21 Dec 2014 23:04:33 -0000 Message-Id: <2c4ef8addc784b92b6dd45c7c4ad1557@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [40/46] incubator-ignite git commit: GG-9141 - Renaming. X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalEx.java deleted file mode 100644 index 524243c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalEx.java +++ /dev/null @@ -1,166 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.processors.cache.dr.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Local transaction API. - */ -public interface GridCacheTxLocalEx extends GridCacheTxEx { - /** - * @return Minimum version involved in transaction. - */ - public GridCacheVersion minVersion(); - - /** - * @return Future for this transaction. - */ - public IgniteFuture> future(); - - /** - * @return Commit error. - */ - @Nullable public Throwable commitError(); - - /** - * @param e Commit error. - */ - public void commitError(Throwable e); - - /** - * @throws IgniteCheckedException If commit failed. - */ - public void userCommit() throws IgniteCheckedException; - - /** - * @throws IgniteCheckedException If rollback failed. - */ - public void userRollback() throws IgniteCheckedException; - - /** - * @return Group lock entry if this is a group-lock transaction. - */ - @Nullable public GridCacheTxEntry groupLockEntry(); - - /** - * @param keys Keys to get. - * @param cached Cached entry if this method is called from entry wrapper. - * Cached entry is passed if and only if there is only one key in collection of keys. - * @param deserializePortable Deserialize portable flag. - * @param filter Entry filter. - * @return Future for this get. - */ - public IgniteFuture> getAllAsync( - GridCacheContext cacheCtx, - Collection keys, - @Nullable GridCacheEntryEx cached, - boolean deserializePortable, - IgnitePredicate>[] filter); - - /** - * @param map Map to put. - * @param retval Flag indicating whether a value should be returned. - * @param cached Cached entry, if any. Will be provided only if map has size 1. - * @param filter Filter. - * @param ttl Time to live for entry. If negative, leave unchanged. - * @return Future for put operation. - */ - public IgniteFuture> putAllAsync( - GridCacheContext cacheCtx, - Map map, - boolean retval, - @Nullable GridCacheEntryEx cached, - long ttl, - IgnitePredicate>[] filter); - - /** - * @param map Map to put. - * @return Transform operation future. - */ - public IgniteFuture> transformAllAsync( - GridCacheContext cacheCtx, - @Nullable Map> map, - boolean retval, - @Nullable GridCacheEntryEx cached, - long ttl); - - /** - * @param keys Keys to remove. - * @param retval Flag indicating whether a value should be returned. - * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1. - * @param filter Filter. - * @return Future for asynchronous remove. - */ - public IgniteFuture> removeAllAsync( - GridCacheContext cacheCtx, - Collection keys, - @Nullable GridCacheEntryEx cached, - boolean retval, - IgnitePredicate>[] filter); - - /** - * @param drMap DR map to put. - * @return Future for DR put operation. - */ - public IgniteFuture putAllDrAsync( - GridCacheContext cacheCtx, - Map> drMap); - - /** - * @param drMap DR map. - * @return Future for asynchronous remove. - */ - public IgniteFuture removeAllDrAsync( - GridCacheContext cacheCtx, - Map drMap); - - /** - * Performs keys locking for affinity-based group lock transactions. - * - * @param keys Keys to lock. - * @return Lock future. - */ - public IgniteFuture groupLockAsync(GridCacheContext cacheCtx, Collection keys); - - /** - * @return {@code True} if keys from the same partition are allowed to be enlisted in group-lock transaction. - */ - public boolean partitionLock(); - - /** - * Finishes transaction (either commit or rollback). - * - * @param commit {@code True} if commit, {@code false} if rollback. - * @return {@code True} if state has been changed. - * @throws IgniteCheckedException If finish failed. - */ - public boolean finish(boolean commit) throws IgniteCheckedException; - - /** - * @param async if {@code True}, then loading will happen in a separate thread. - * @param keys Keys. - * @param c Closure. - * @param deserializePortable Deserialize portable flag. - * @return Future with {@code True} value if loading took place. - */ - public IgniteFuture loadMissing( - GridCacheContext cacheCtx, - boolean async, - Collection keys, - boolean deserializePortable, - IgniteBiInClosure c); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java deleted file mode 100644 index 7645f75..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java +++ /dev/null @@ -1,2212 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; -import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; -import org.gridgain.grid.kernal.processors.cache.distributed.near.*; -import org.gridgain.grid.kernal.processors.timeout.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.future.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; -import static org.gridgain.grid.util.GridConcurrentFactory.*; - -/** - * Cache transaction manager. - */ -public class GridCacheTxManager extends GridCacheSharedManagerAdapter { - /** Default maximum number of transactions that have completed. */ - private static final int DFLT_MAX_COMPLETED_TX_CNT = 262144; // 2^18 - - /** Slow tx warn timeout (initialized to 0). */ - private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(GG_SLOW_TX_WARN_TIMEOUT, 0); - - /** Tx salvage timeout (default 3s). */ - private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(GG_TX_SALVAGE_TIMEOUT, 100); - - /** Committing transactions. */ - private final ThreadLocal threadCtx = new GridThreadLocalEx<>(); - - /** Per-thread transaction map. */ - private final ConcurrentMap> threadMap = newMap(); - - /** Per-ID map. */ - private final ConcurrentMap> idMap = newMap(); - - /** Per-ID map for near transactions. */ - private final ConcurrentMap> nearIdMap = newMap(); - - /** TX handler. */ - private GridCacheTxHandler txHandler; - - /** All transactions. */ - private final Queue> committedQ = new ConcurrentLinkedDeque8<>(); - - /** Preparing transactions. */ - private final Queue> prepareQ = new ConcurrentLinkedDeque8<>(); - - /** Minimum start version. */ - private final ConcurrentNavigableMap startVerCnts = - new ConcurrentSkipListMap<>(); - - /** Committed local transactions. */ - private final GridBoundedConcurrentOrderedSet committedVers = - new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); - - /** Rolled back local transactions. */ - private final NavigableSet rolledbackVers = - new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); - - /** Pessimistic commit buffer. */ - private GridCacheTxCommitBuffer pessimisticRecoveryBuf; - - /** Transaction synchronizations. */ - private final Collection syncs = - new GridConcurrentHashSet<>(); - - /** Transaction finish synchronizer. */ - private GridCacheTxFinishSync txFinishSync; - - /** For test purposes only. */ - private boolean finishSyncDisabled; - - /** Slow tx warn timeout. */ - private int slowTxWarnTimeout = SLOW_TX_WARN_TIMEOUT; - - /** - * Near version to DHT version map. Note that we initialize to 5K size from get go, - * to avoid future map resizings. - */ - private final ConcurrentMap mappedVers = - new ConcurrentHashMap8<>(5120); - - /** {@inheritDoc} */ - @Override protected void onKernalStart0() { - cctx.gridEvents().addLocalEventListener( - new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - assert evt instanceof IgniteDiscoveryEvent; - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - - IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; - - cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(discoEvt.eventNode().id())); - - if (txFinishSync != null) - txFinishSync.onNodeLeft(discoEvt.eventNode().id()); - } - }, - EVT_NODE_FAILED, EVT_NODE_LEFT); - - for (GridCacheTxEx tx : idMap.values()) { - if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) { - if (log.isDebugEnabled()) - log.debug("Remaining transaction from left node: " + tx); - - salvageTx(tx, true, USER_FINISH); - } - } - } - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - pessimisticRecoveryBuf = new GridCachePerThreadTxCommitBuffer<>(cctx); - - txFinishSync = new GridCacheTxFinishSync<>(cctx); - - txHandler = new GridCacheTxHandler<>(cctx); - } - - /** - * @return TX handler. - */ - public GridCacheTxHandler txHandler() { - return txHandler; - } - - /** - * Invalidates transaction. - * - * @param tx Transaction. - * @return {@code True} if transaction was salvaged by this call. - */ - public boolean salvageTx(GridCacheTxEx tx) { - return salvageTx(tx, false, USER_FINISH); - } - - /** - * Invalidates transaction. - * - * @param tx Transaction. - * @param warn {@code True} if warning should be logged. - * @param status Finalization status. - * @return {@code True} if transaction was salvaged by this call. - */ - private boolean salvageTx(GridCacheTxEx tx, boolean warn, GridCacheTxEx.FinalizationStatus status) { - assert tx != null; - - IgniteTxState state = tx.state(); - - if (state == ACTIVE || state == PREPARING || state == PREPARED) { - try { - if (!tx.markFinalizing(status)) { - if (log.isDebugEnabled()) - log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx); - - return false; - } - - tx.systemInvalidate(true); - - tx.prepare(); - - if (tx.state() == PREPARING) { - if (log.isDebugEnabled()) - log.debug("Ignoring transaction in PREPARING state as it is currently handled " + - "by another thread: " + tx); - - return false; - } - - if (tx instanceof GridCacheTxRemoteEx) { - GridCacheTxRemoteEx rmtTx = (GridCacheTxRemoteEx)tx; - - rmtTx.doneRemote(tx.xidVersion(), Collections.emptyList(), - Collections.emptyList(), Collections.emptyList()); - } - - tx.commit(); - - if (warn) { - // This print out cannot print any peer-deployed entity either - // directly or indirectly. - U.warn(log, "Invalidated transaction because originating node either " + - "crashed or left grid: " + CU.txString(tx)); - } - } - catch (IgniteTxOptimisticException ignore) { - if (log.isDebugEnabled()) - log.debug("Optimistic failure while invalidating transaction (will rollback): " + - tx.xidVersion()); - - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to invalidate transaction: " + tx, e); - } - } - else if (state == MARKED_ROLLBACK) { - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); - } - } - - return true; - } - - /** - * Prints out memory stats to standard out. - *

- * USE ONLY FOR MEMORY PROFILING DURING TESTS. - */ - @Override public void printMemoryStats() { - GridCacheTxEx firstTx = committedQ.peek(); - - int committedSize = committedQ.size(); - - Map.Entry startVerEntry = startVerCnts.firstEntry(); - - GridCacheVersion minStartVer = null; - long dur = 0; - - if (committedSize > 3000) { - minStartVer = new GridCacheVersion(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, 0); - - GridCacheTxEx stuck = null; - - for (GridCacheTxEx tx : txs()) - if (tx.startVersion().isLess(minStartVer)) { - minStartVer = tx.startVersion(); - dur = U.currentTimeMillis() - tx.startTime(); - - stuck = tx; - } - - X.println("Stuck transaction: " + stuck); - } - - X.println(">>> "); - X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']'); - X.println(">>> threadMapSize: " + threadMap.size()); - X.println(">>> idMap [size=" + idMap.size() + ", minStartVer=" + minStartVer + ", dur=" + dur + "ms]"); - X.println(">>> committedQueue [size=" + committedSize + - ", firstStartVersion=" + (firstTx == null ? "null" : firstTx.startVersion()) + - ", firstEndVersion=" + (firstTx == null ? "null" : firstTx.endVersion()) + ']'); - X.println(">>> prepareQueueSize: " + prepareQ.size()); - X.println(">>> startVerCntsSize [size=" + startVerCnts.size() + - ", firstVer=" + startVerEntry + ']'); - X.println(">>> committedVersSize: " + committedVers.size()); - X.println(">>> rolledbackVersSize: " + rolledbackVers.size()); - - if (pessimisticRecoveryBuf != null) - X.println(">>> pessimsticCommitBufSize: " + pessimisticRecoveryBuf.size()); - } - - /** - * @return Thread map size. - */ - public int threadMapSize() { - return threadMap.size(); - } - - /** - * @return ID map size. - */ - public int idMapSize() { - return idMap.size(); - } - - /** - * @return Committed queue size. - */ - public int commitQueueSize() { - return committedQ.size(); - } - - /** - * @return Prepare queue size. - */ - public int prepareQueueSize() { - return prepareQ.size(); - } - - /** - * @return Start version counts. - */ - public int startVersionCountsSize() { - return startVerCnts.size(); - } - - /** - * @return Committed versions size. - */ - public int committedVersionsSize() { - return committedVers.size(); - } - - /** - * @return Rolled back versions size. - */ - public int rolledbackVersionsSize() { - return rolledbackVers.size(); - } - - /** - * - * @param tx Transaction to check. - * @return {@code True} if transaction has been committed or rolled back, - * {@code false} otherwise. - */ - public boolean isCompleted(GridCacheTxEx tx) { - return committedVers.contains(tx.xidVersion()) || rolledbackVers.contains(tx.xidVersion()); - } - - /** - * @param implicit {@code True} if transaction is implicit. - * @param implicitSingle Implicit-with-single-key flag. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout transaction timeout. - * @param txSize Expected transaction size. - * @param grpLockKey Group lock key if this is a group-lock transaction. - * @param partLock {@code True} if partition is locked. - * @return New transaction. - */ - public GridCacheTxLocalAdapter newTx( - boolean implicit, - boolean implicitSingle, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - boolean invalidate, - boolean storeEnabled, - int txSize, - @Nullable GridCacheTxKey grpLockKey, - boolean partLock) { - UUID subjId = null; // TODO GG-9141 how to get subj ID? - - int taskNameHash = cctx.kernalContext().job().currentTaskNameHash(); - - GridNearTxLocal tx = new GridNearTxLocal<>( - cctx, - implicit, - implicitSingle, - sys, - concurrency, - isolation, - timeout, - invalidate, - storeEnabled, - txSize, - grpLockKey, - partLock, - subjId, - taskNameHash); - - return onCreated(tx); - } - - /** - * @param tx Created transaction. - * @return Started transaction. - */ - @Nullable public > T onCreated(T tx) { - ConcurrentMap> txIdMap = transactionMap(tx); - - // Start clean. - txContextReset(); - - if (isCompleted(tx)) { - if (log.isDebugEnabled()) - log.debug("Attempt to create a completed transaction (will ignore): " + tx); - - return null; - } - - GridCacheTxEx t; - - if ((t = txIdMap.putIfAbsent(tx.xidVersion(), tx)) == null) { - // Add both, explicit and implicit transactions. - // Do not add remote and dht local transactions as remote node may have the same thread ID - // and overwrite local transaction. - if (tx.local() && !tx.dht()) - threadMap.put(tx.threadId(), tx); - - // Handle mapped versions. - if (tx instanceof GridCacheMappedVersion) { - GridCacheMappedVersion mapped = (GridCacheMappedVersion)tx; - - GridCacheVersion from = mapped.mappedVersion(); - - if (from != null) - mappedVers.put(from, tx.xidVersion()); - - if (log.isDebugEnabled()) - log.debug("Added transaction version mapping [from=" + from + ", to=" + tx.xidVersion() + - ", tx=" + tx + ']'); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Attempt to create an existing transaction (will ignore) [newTx=" + tx + ", existingTx=" + - t + ']'); - - return null; - } - - if (cctx.txConfig().isTxSerializableEnabled()) { - AtomicInt next = new AtomicInt(1); - - boolean loop = true; - - while (loop) { - AtomicInt prev = startVerCnts.putIfAbsent(tx.startVersion(), next); - - if (prev == null) - break; // Put succeeded - exit. - - // Previous value was 0, which means that it will be deleted - // by another thread in "decrementStartVersionCount(..)" method. - // In that case, we delete here too, so we can safely try again. - for (;;) { - int p = prev.get(); - - assert p >= 0 : p; - - if (p == 0) { - if (startVerCnts.remove(tx.startVersion(), prev)) - if (log.isDebugEnabled()) - log.debug("Removed count from onCreated callback: " + tx); - - break; // Retry outer loop. - } - - if (prev.compareAndSet(p, p + 1)) { - loop = false; // Increment succeeded - exit outer loop. - - break; - } - } - } - } - - if (tx.timeout() > 0) { - cctx.time().addTimeoutObject(tx); - - if (log.isDebugEnabled()) - log.debug("Registered transaction with timeout processor: " + tx); - } - - if (log.isDebugEnabled()) - log.debug("Transaction created: " + tx); - - return tx; - } - - /** - * Creates a future that will wait for all ongoing transactions that maybe affected by topology update - * to be finished. This set of transactions include - *

    - *
  • All {@link IgniteTxConcurrency#PESSIMISTIC} transactions with topology version - * less or equal to {@code topVer}. - *
  • {@link IgniteTxConcurrency#OPTIMISTIC} transactions in PREPARING state with topology - * version less or equal to {@code topVer} and having transaction key with entry that belongs to - * one of partitions in {@code parts}. - *
- * - * @param topVer Topology version. - * @return Future that will be completed when all ongoing transactions are finished. - */ - public IgniteFuture finishTxs(long topVer) { - GridCompoundFuture res = - new GridCompoundFuture<>(context().kernalContext(), - new IgniteReducer() { - @Override public boolean collect(IgniteTx e) { - return true; - } - - @Override public Boolean reduce() { - return true; - } - }); - - for (GridCacheTxEx tx : txs()) { - // Must wait for all transactions, even for DHT local and DHT remote since preloading may acquire - // values pending to be overwritten by prepared transaction. - - if (tx.concurrency() == PESSIMISTIC) { - if (tx.topologyVersion() > 0 && tx.topologyVersion() < topVer) - // For PESSIMISTIC mode we must wait for all uncompleted txs - // as we do not know in advance which keys will participate in tx. - res.add(tx.finishFuture()); - } - else if (tx.concurrency() == OPTIMISTIC) { - // For OPTIMISTIC mode we wait only for txs in PREPARING state that - // have keys for given partitions. - IgniteTxState state = tx.state(); - long txTopVer = tx.topologyVersion(); - - if ((state == PREPARING || state == PREPARED || state == COMMITTING) - && txTopVer > 0 && txTopVer < topVer) { - res.add(tx.finishFuture()); - } - } - } - - res.markInitialized(); - - return res; - } - - /** - * Transaction start callback (has to do with when any operation was - * performed on this transaction). - * - * @param tx Started transaction. - * @return {@code True} if transaction is not in completed set. - */ - public boolean onStarted(GridCacheTxEx tx) { - assert tx.state() == ACTIVE || tx.isRollbackOnly() : "Invalid transaction state [locId=" + cctx.localNodeId() + - ", tx=" + tx + ']'; - - if (isCompleted(tx)) { - if (log.isDebugEnabled()) - log.debug("Attempt to start a completed transaction (will ignore): " + tx); - - return false; - } - - onTxStateChange(null, ACTIVE, tx); - - if (log.isDebugEnabled()) - log.debug("Transaction started: " + tx); - - return true; - } - - /** - * Reverse mapped version look up. - * - * @param dhtVer Dht version. - * @return Near version. - */ - @Nullable public GridCacheVersion nearVersion(GridCacheVersion dhtVer) { - GridCacheTxEx tx = idMap.get(dhtVer); - - if (tx != null) - return tx.nearXidVersion(); - - return null; - } - - /** - * @param from Near version. - * @return DHT version for a near version. - */ - public GridCacheVersion mappedVersion(GridCacheVersion from) { - GridCacheVersion to = mappedVers.get(from); - - if (log.isDebugEnabled()) - log.debug("Found mapped version [from=" + from + ", to=" + to); - - return to; - } - - /** - * - * @param ver Alternate version. - * @param tx Transaction. - */ - public void addAlternateVersion(GridCacheVersion ver, GridCacheTxEx tx) { - if (idMap.putIfAbsent(ver, tx) == null) - if (log.isDebugEnabled()) - log.debug("Registered alternate transaction version [ver=" + ver + ", tx=" + tx + ']'); - } - - /** - * @return Local transaction. - */ - @SuppressWarnings({"unchecked"}) - @Nullable public T localTx() { - GridCacheTxEx tx = tx(); - - return tx != null && tx.local() ? (T)tx : null; - } - - /** - * @return Transaction for current thread. - */ - @SuppressWarnings({"unchecked"}) - public T threadLocalTx() { - GridCacheTxEx tx = tx(Thread.currentThread().getId()); - - return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit() ? (T)tx : null; - } - - /** - * @return Transaction for current thread. - */ - @SuppressWarnings({"unchecked", "RedundantCast"}) - public T tx() { - GridCacheTxEx tx = txContext(); - - return tx != null ? (T)tx : (T)tx(Thread.currentThread().getId()); - } - - /** - * @return Local transaction. - */ - @Nullable public GridCacheTxEx localTxx() { - GridCacheTxEx tx = txx(); - - return tx != null && tx.local() ? tx : null; - } - - /** - * @return Transaction for current thread. - */ - @SuppressWarnings({"unchecked"}) - public GridCacheTxEx txx() { - return tx(); - } - - /** - * @return User transaction for current thread. - */ - @Nullable public IgniteTx userTx() { - GridCacheTxEx tx = txContext(); - - if (tx != null && tx.user() && tx.state() == ACTIVE) - return tx; - - tx = tx(Thread.currentThread().getId()); - - return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null; - } - - /** - * @return User transaction. - */ - @SuppressWarnings({"unchecked"}) - @Nullable public > T userTxx() { - return (T)userTx(); - } - - /** - * @param threadId Id of thread for transaction. - * @return Transaction for thread with given ID. - */ - @SuppressWarnings({"unchecked"}) - public T tx(long threadId) { - return (T)threadMap.get(threadId); - } - - /** - * @return {@code True} if current thread is currently within transaction. - */ - public boolean inUserTx() { - return userTx() != null; - } - - /** - * @param txId Transaction ID. - * @return Transaction with given ID. - */ - @SuppressWarnings({"unchecked"}) - @Nullable public > T tx(GridCacheVersion txId) { - return (T)idMap.get(txId); - } - - /** - * @param txId Transaction ID. - * @return Transaction with given ID. - */ - @SuppressWarnings({"unchecked"}) - @Nullable public > T nearTx(GridCacheVersion txId) { - return (T)nearIdMap.get(txId); - } - - /** - * @param txId Transaction ID. - * @return Transaction with given ID. - */ - @Nullable public GridCacheTxEx txx(GridCacheVersion txId) { - return idMap.get(txId); - } - - /** - * Handles prepare stage of 2PC. - * - * @param tx Transaction to prepare. - * @throws IgniteCheckedException If preparation failed. - */ - public void prepareTx(GridCacheTxEx tx) throws IgniteCheckedException { - if (tx.state() == MARKED_ROLLBACK) { - if (tx.timedOut()) - throw new IgniteTxTimeoutException("Transaction timed out: " + this); - - throw new IgniteCheckedException("Transaction is marked for rollback: " + tx); - } - - if (tx.remainingTime() == 0) { - tx.setRollbackOnly(); - - throw new IgniteTxTimeoutException("Transaction timed out: " + this); - } - - boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled(); - - // Clean up committed transactions queue. - if (tx.pessimistic()) { - if (tx.enforceSerializable() && txSerializableEnabled) { - for (Iterator> it = committedQ.iterator(); it.hasNext();) { - GridCacheTxEx committedTx = it.next(); - - assert committedTx != tx; - - // Clean up. - if (isSafeToForget(committedTx)) - it.remove(); - } - } - - // Nothing else to do in pessimistic mode. - return; - } - - if (txSerializableEnabled && tx.optimistic() && tx.enforceSerializable()) { - Set> readSet = tx.readSet(); - Set> writeSet = tx.writeSet(); - - GridCacheVersion startTn = tx.startVersion(); - - GridCacheVersion finishTn = cctx.versions().last(); - - // Add future to prepare queue only on first prepare call. - if (tx.markPreparing()) - prepareQ.offer(tx); - - // Check that our read set does not intersect with write set - // of all transactions that completed their write phase - // while our transaction was in read phase. - for (Iterator> it = committedQ.iterator(); it.hasNext();) { - GridCacheTxEx committedTx = it.next(); - - assert committedTx != tx; - - // Clean up. - if (isSafeToForget(committedTx)) { - it.remove(); - - continue; - } - - GridCacheVersion tn = committedTx.endVersion(); - - // We only care about transactions - // with tn > startTn and tn <= finishTn - if (tn.compareTo(startTn) <= 0 || tn.compareTo(finishTn) > 0) - continue; - - if (tx.serializable()) { - if (GridFunc.intersects(committedTx.writeSet(), readSet)) { - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticException("Failed to prepare transaction " + - "(committed vs. read-set conflict): " + tx); - } - } - } - - // Check that our read and write sets do not intersect with write - // sets of all active transactions. - for (Iterator> iter = prepareQ.iterator(); iter.hasNext();) { - GridCacheTxEx prepareTx = iter.next(); - - if (prepareTx == tx) - // Skip yourself. - continue; - - // Optimistically remove completed transactions. - if (prepareTx.done()) { - iter.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed finished transaction from active queue: " + prepareTx); - - continue; - } - - // Check if originating node left. - if (cctx.discovery().node(prepareTx.nodeId()) == null) { - iter.remove(); - - rollbackTx(prepareTx); - - if (log.isDebugEnabled()) - log.debug("Removed and rolled back transaction because sender node left grid: " + - CU.txString(prepareTx)); - - continue; - } - - if (tx.serializable() && !prepareTx.isRollbackOnly()) { - Set> prepareWriteSet = prepareTx.writeSet(); - - if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) { - // Remove from active set. - iter.remove(); - - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticException( - "Failed to prepare transaction (read-set/write-set conflict): " + tx); - } - } - } - } - - // Optimistic. - assert tx.optimistic(); - - if (!lockMultiple(tx, tx.optimisticLockEntries())) { - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticException("Failed to prepare transaction (lock conflict): " + tx); - } - } - - /** - * @param tx Transaction to check. - * @return {@code True} if transaction can be discarded. - */ - private boolean isSafeToForget(GridCacheTxEx tx) { - Map.Entry e = startVerCnts.firstEntry(); - - if (e == null) - return true; - - assert e.getValue().get() >= 0; - - return tx.endVersion().compareTo(e.getKey()) <= 0; - } - - /** - * Decrement start version count. - * - * @param tx Cache transaction. - */ - private void decrementStartVersionCount(GridCacheTxEx tx) { - AtomicInt cnt = startVerCnts.get(tx.startVersion()); - - assert cnt != null : "Failed to find start version count for transaction [startVerCnts=" + startVerCnts + - ", tx=" + tx + ']'; - - assert cnt.get() > 0; - - if (cnt.decrementAndGet() == 0) - if (startVerCnts.remove(tx.startVersion(), cnt)) - if (log.isDebugEnabled()) - log.debug("Removed start version for transaction: " + tx); - } - - /** - * @param tx Transaction. - */ - private void removeObsolete(GridCacheTxEx tx) { - Collection> entries = (tx.local() && !tx.dht()) ? tx.allEntries() : tx.writeEntries(); - - for (GridCacheTxEntry entry : entries) { - GridCacheEntryEx cached = entry.cached(); - - GridCacheContext cacheCtx = entry.context(); - - if (cached == null) - cached = cacheCtx.cache().peekEx(entry.key()); - - if (cached.detached()) - continue; - - try { - if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion())) - cacheCtx.cache().removeEntry(cached); - - if (!tx.near() && isNearEnabled(cacheCtx)) { - GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near(); - - GridNearCacheEntry e = near.peekExx(entry.key()); - - if (e != null && e.markObsoleteIfEmpty(tx.xidVersion())) - near.removeEntry(e); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to remove obsolete entry from cache: " + cached, e); - } - } - } - - /** - * @param c Collection to copy. - * @return Copy of the collection. - */ - private Collection copyOf(Iterable c) { - Collection l = new LinkedList<>(); - - for (GridCacheVersion v : c) - l.add(v); - - return l; - } - - /** - * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? - * - * @param min Start (or minimum) version. - * @return Committed transactions starting from the given version (non-inclusive). - */ - public Collection committedVersions(GridCacheVersion min) { - Set set = committedVers.tailSet(min, true); - - return set == null || set.isEmpty() ? Collections.emptyList() : copyOf(set); - } - - /** - * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? - * - * @param min Start (or minimum) version. - * @return Committed transactions starting from the given version (non-inclusive). - */ - public Collection rolledbackVersions(GridCacheVersion min) { - Set set = rolledbackVers.tailSet(min, true); - - return set == null || set.isEmpty() ? Collections.emptyList() : copyOf(set); - } - - /** - * @param tx Tx to remove. - */ - public void removeCommittedTx(GridCacheTxEx tx) { - committedVers.remove(tx.xidVersion()); - } - - /** - * @param tx Committed transaction. - * @return If transaction was not already present in committed set. - */ - public boolean addCommittedTx(GridCacheTxEx tx) { - return addCommittedTx(tx.xidVersion(), tx.nearXidVersion()); - } - - /** - * @param tx Committed transaction. - * @return If transaction was not already present in committed set. - */ - public boolean addRolledbackTx(GridCacheTxEx tx) { - return addRolledbackTx(tx.xidVersion()); - } - - /** - * @param xidVer Completed transaction version. - * @param nearXidVer Optional near transaction ID. - * @return If transaction was not already present in completed set. - */ - public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) { - assert !rolledbackVers.contains(xidVer) : "Version was rolled back: " + xidVer; - - if (nearXidVer != null) - xidVer = new CommittedVersion(xidVer, nearXidVer); - - if (committedVers.add(xidVer)) { - if (log.isDebugEnabled()) - log.debug("Added transaction to committed version set: " + xidVer); - - return true; - } - else { - if (log.isDebugEnabled()) - log.debug("Transaction is already present in committed version set: " + xidVer); - - return false; - } - } - - /** - * @param xidVer Completed transaction version. - * @return If transaction was not already present in completed set. - */ - public boolean addRolledbackTx(GridCacheVersion xidVer) { - assert !committedVers.contains(xidVer); - - if (rolledbackVers.add(xidVer)) { - if (log.isDebugEnabled()) - log.debug("Added transaction to rolled back version set: " + xidVer); - - return true; - } - else { - if (log.isDebugEnabled()) - log.debug("Transaction is already present in rolled back version set: " + xidVer); - - return false; - } - } - - /** - * @param tx Transaction. - */ - private void processCompletedEntries(GridCacheTxEx tx) { - if (tx.needsCompletedVersions()) { - GridCacheVersion min = minVersion(tx.readEntries(), tx.xidVersion(), tx); - - min = minVersion(tx.writeEntries(), min, tx); - - assert min != null; - - tx.completedVersions(min, committedVersions(min), rolledbackVersions(min)); - } - } - - /** - * Collects versions for all pending locks for all entries within transaction - * - * @param dhtTxLoc Transaction being committed. - */ - private void collectPendingVersions(GridDhtTxLocal dhtTxLoc) { - if (dhtTxLoc.needsCompletedVersions()) { - if (log.isDebugEnabled()) - log.debug("Checking for pending locks with version less then tx version: " + dhtTxLoc); - - Set vers = new LinkedHashSet<>(); - - collectPendingVersions(dhtTxLoc.readEntries(), dhtTxLoc.xidVersion(), vers); - collectPendingVersions(dhtTxLoc.writeEntries(), dhtTxLoc.xidVersion(), vers); - - if (!vers.isEmpty()) - dhtTxLoc.pendingVersions(vers); - } - } - - /** - * Gets versions of all not acquired locks for collection of tx entries that are less then base version. - * - * @param entries Tx entries to process. - * @param baseVer Base version to compare with. - * @param vers Collection of versions that will be populated. - */ - @SuppressWarnings("TypeMayBeWeakened") - private void collectPendingVersions(Iterable> entries, - GridCacheVersion baseVer, Set vers) { - - // The locks are not released yet, so we can safely list pending candidates versions. - for (GridCacheTxEntry txEntry : entries) { - GridCacheEntryEx cached = txEntry.cached(); - - try { - // If check should be faster then exception handling. - if (!cached.obsolete()) { - for (GridCacheMvccCandidate cand : cached.localCandidates()) { - if (!cand.owner() && cand.version().compareTo(baseVer) < 0) { - if (log.isDebugEnabled()) - log.debug("Adding candidate version to pending set: " + cand); - - vers.add(cand.version()); - } - } - } - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("There are no pending locks for entry (entry was deleted in transaction): " + txEntry); - } - } - } - - /** - * Go through all candidates for entries involved in transaction and find their min - * version. We know that these candidates will commit after this transaction, and - * therefore we can grab the min version so we can send all committed and rolled - * back versions from min to current to remote nodes for re-ordering. - * - * @param entries Entries. - * @param min Min version so far. - * @param tx Transaction. - * @return Minimal available version. - */ - private GridCacheVersion minVersion(Iterable> entries, GridCacheVersion min, - GridCacheTxEx tx) { - for (GridCacheTxEntry txEntry : entries) { - GridCacheEntryEx cached = txEntry.cached(); - - // We are assuming that this method is only called on commit. In that - // case, if lock is held, entry can never be removed. - assert txEntry.isRead() || !cached.obsolete(tx.xidVersion()) : - "Invalid obsolete version for transaction [entry=" + cached + ", tx=" + tx + ']'; - - for (GridCacheMvccCandidate cand : cached.remoteMvccSnapshot()) - if (min == null || cand.version().isLess(min)) - min = cand.version(); - } - - return min; - } - - /** - * Commits a transaction. - * - * @param tx Transaction to commit. - */ - public void commitTx(GridCacheTxEx tx) { - assert tx != null; - assert tx.state() == COMMITTING : "Invalid transaction state for commit from tm [state=" + tx.state() + - ", expected=COMMITTING, tx=" + tx + ']'; - - if (log.isDebugEnabled()) - log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); - - if (tx.timeout() > 0) { - cctx.time().removeTimeoutObject(tx); - - if (log.isDebugEnabled()) - log.debug("Unregistered transaction with timeout processor: " + tx); - } - - /* - * Note that write phase is handled by transaction adapter itself, - * so we don't do it here. - */ - - // 1. Make sure that committed version has been recorded. - if (!(committedVers.contains(tx.xidVersion()) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { - uncommitTx(tx); - - throw new IgniteException("Missing commit version (consider increasing " + - GG_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + - committedVers.firstx() + ", lastVer=" + committedVers.lastx() + ", tx=" + tx.xid() + ']'); - } - - ConcurrentMap> txIdMap = transactionMap(tx); - - if (txIdMap.remove(tx.xidVersion(), tx)) { - // 2. Must process completed entries before unlocking! - processCompletedEntries(tx); - - if (tx instanceof GridDhtTxLocal) { - GridDhtTxLocal dhtTxLoc = (GridDhtTxLocal)tx; - - collectPendingVersions(dhtTxLoc); - } - - // 3.1 Call dataStructures manager. - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.dataStructures().onTxCommitted(tx); - - // 3.2 Add to pessimistic commit buffer if needed. - addPessimisticRecovery(tx); - - // 4. Unlock write resources. - if (tx.groupLock()) - unlockGroupLocks(tx); - else - unlockMultiple(tx, tx.writeEntries()); - - // 5. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted() && !tx.groupLock()) - unlockMultiple(tx, tx.readEntries()); - - // 6. Notify evictions. - notifyEvitions(tx); - - // 7. Remove obsolete entries from cache. - removeObsolete(tx); - - // 8. Assign transaction number at the end of transaction. - tx.endVersion(cctx.versions().next(tx.topologyVersion())); - - // 9. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 10. Add to committed queue only if it is possible - // that this transaction can affect other ones. - if (cctx.txConfig().isTxSerializableEnabled() && tx.enforceSerializable() && !isSafeToForget(tx)) - committedQ.add(tx); - - // 11. Remove from per-thread storage. - if (tx.local() && !tx.dht()) - threadMap.remove(tx.threadId(), tx); - - // 12. Unregister explicit locks. - if (!tx.alternateVersions().isEmpty()) { - for (GridCacheVersion ver : tx.alternateVersions()) - idMap.remove(ver); - } - - // 13. Remove Near-2-DHT mappings. - if (tx instanceof GridCacheMappedVersion) { - GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion(); - - if (mapped != null) - mappedVers.remove(mapped); - } - - // 14. Clear context. - txContextReset(); - - // 15. Update metrics. - if (!tx.dht() && tx.local()) { - cctx.txMetrics().onTxCommit(); - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - cacheCtx.cache().metrics0().onTxCommit(); - } - } - - if (slowTxWarnTimeout > 0 && tx.local() && - U.currentTimeMillis() - tx.startTime() > slowTxWarnTimeout) - U.warn(log, "Slow transaction detected [tx=" + tx + - ", slowTxWarnTimeout=" + slowTxWarnTimeout + ']') ; - - if (log.isDebugEnabled()) - log.debug("Committed from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); - } - else if (log.isDebugEnabled()) - log.debug("Did not commit from TM (was already committed): " + tx); - } - - /** - * Rolls back a transaction. - * - * @param tx Transaction to rollback. - */ - public void rollbackTx(GridCacheTxEx tx) { - assert tx != null; - - if (log.isDebugEnabled()) - log.debug("Rolling back from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); - - // 1. Record transaction version to avoid duplicates. - addRolledbackTx(tx); - - ConcurrentMap> txIdMap = transactionMap(tx); - - if (txIdMap.remove(tx.xidVersion(), tx)) { - // 2. Unlock write resources. - unlockMultiple(tx, tx.writeEntries()); - - // 3. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) - unlockMultiple(tx, tx.readEntries()); - - // 4. Notify evictions. - notifyEvitions(tx); - - // 5. Remove obsolete entries. - removeObsolete(tx); - - // 6. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 7. Remove from per-thread storage. - if (tx.local() && !tx.dht()) - threadMap.remove(tx.threadId(), tx); - - // 8. Unregister explicit locks. - if (!tx.alternateVersions().isEmpty()) - for (GridCacheVersion ver : tx.alternateVersions()) - idMap.remove(ver); - - // 9. Remove Near-2-DHT mappings. - if (tx instanceof GridCacheMappedVersion) - mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - - // 10. Clear context. - txContextReset(); - - // 11. Update metrics. - if (!tx.dht() && tx.local()) { - cctx.txMetrics().onTxRollback(); - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - cacheCtx.cache().metrics0().onTxRollback(); - } - } - - if (log.isDebugEnabled()) - log.debug("Rolled back from TM: " + tx); - } - else if (log.isDebugEnabled()) - log.debug("Did not rollback from TM (was already rolled back): " + tx); - } - - /** - * Tries to minimize damage from partially-committed transaction. - * - * @param tx Tx to uncommit. - */ - public void uncommitTx(GridCacheTxEx tx) { - assert tx != null; - - if (log.isDebugEnabled()) - log.debug("Uncommiting from TM: " + tx); - - ConcurrentMap> txIdMap = transactionMap(tx); - - if (txIdMap.remove(tx.xidVersion(), tx)) { - // 1. Unlock write resources. - unlockMultiple(tx, tx.writeEntries()); - - // 2. For pessimistic transaction, unlock read resources if required. - if (tx.pessimistic() && !tx.readCommitted()) - unlockMultiple(tx, tx.readEntries()); - - // 3. Notify evictions. - notifyEvitions(tx); - - // 4. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 5. Remove from per-thread storage. - if (tx.local() && !tx.dht()) - threadMap.remove(tx.threadId(), tx); - - // 6. Unregister explicit locks. - if (!tx.alternateVersions().isEmpty()) - for (GridCacheVersion ver : tx.alternateVersions()) - idMap.remove(ver); - - // 7. Remove Near-2-DHT mappings. - if (tx instanceof GridCacheMappedVersion) - mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - - // 8. Clear context. - txContextReset(); - - if (log.isDebugEnabled()) - log.debug("Uncommitted from TM: " + tx); - } - else if (log.isDebugEnabled()) - log.debug("Did not uncommit from TM (was already committed or rolled back): " + tx); - } - - /** - * Gets transaction ID map depending on transaction type. - * - * @param tx Transaction. - * @return Transaction map. - */ - private ConcurrentMap> transactionMap(GridCacheTxEx tx) { - return (tx.near() && !tx.local()) ? nearIdMap : idMap; - } - - /** - * @param tx Transaction to notify evictions for. - */ - private void notifyEvitions(GridCacheTxEx tx) { - if (tx.internal() && !tx.groupLock()) - return; - - for (GridCacheTxEntry txEntry : tx.allEntries()) - txEntry.cached().context().evicts().touch(txEntry, tx.local()); - } - - /** - * Callback invoked whenever a member of a transaction acquires - * lock ownership. - * - * @param entry Cache entry. - * @param owner Candidate that won ownership. - * @return {@code True} if transaction was notified, {@code false} otherwise. - */ - public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { - // We only care about acquired locks. - if (owner != null) { - GridCacheTxAdapter tx = tx(owner.version()); - - if (tx == null) - tx = nearTx(owner.version()); - - if (tx != null) { - if (!tx.local()) { - if (log.isDebugEnabled()) - log.debug("Found transaction for owner changed event [owner=" + owner + ", entry=" + entry + - ", tx=" + tx + ']'); - - tx.onOwnerChanged(entry, owner); - - return true; - } - else if (log.isDebugEnabled()) - log.debug("Ignoring local transaction for owner change event: " + tx); - } - else if (log.isDebugEnabled()) - log.debug("Transaction not found for owner changed event [owner=" + owner + ", entry=" + entry + ']'); - } - - return false; - } - - /** - * Callback called by near finish future before sending near finish request to remote node. Will increment - * per-thread counter so that further awaitAck call will wait for finish response. - * - * @param rmtNodeId Remote node ID for which finish request is being sent. - * @param threadId Near tx thread ID. - */ - public void beforeFinishRemote(UUID rmtNodeId, long threadId) { - if (finishSyncDisabled) - return; - - assert txFinishSync != null; - - txFinishSync.onFinishSend(rmtNodeId, threadId); - } - - /** - * Callback invoked when near finish response is received from remote node. - * - * @param rmtNodeId Remote node ID from which response is received. - * @param threadId Near tx thread ID. - */ - public void onFinishedRemote(UUID rmtNodeId, long threadId) { - if (finishSyncDisabled) - return; - - assert txFinishSync != null; - - txFinishSync.onAckReceived(rmtNodeId, threadId); - } - - /** - * Asynchronously waits for last finish request ack. - * - * @param rmtNodeId Remote node ID. - * @param threadId Near tx thread ID. - * @return {@code null} if ack was received or future that will be completed when ack is received. - */ - @Nullable public IgniteFuture awaitFinishAckAsync(UUID rmtNodeId, long threadId) { - if (finishSyncDisabled) - return null; - - assert txFinishSync != null; - - return txFinishSync.awaitAckAsync(rmtNodeId, threadId); - } - - /** - * For test purposes only. - * - * @param finishSyncDisabled {@code True} if finish sync should be disabled. - */ - public void finishSyncDisabled(boolean finishSyncDisabled) { - this.finishSyncDisabled = finishSyncDisabled; - } - - /** - * @param tx Transaction. - * @param entries Entries to lock. - * @return {@code True} if all keys were locked. - * @throws IgniteCheckedException If lock has been cancelled. - */ - private boolean lockMultiple(GridCacheTxEx tx, Iterable> entries) - throws IgniteCheckedException { - assert tx.optimistic(); - - long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout()); - - // For serializable transactions, failure to acquire lock means - // that there is a serializable conflict. For all other isolation levels, - // we wait for the lock. - long timeout = tx.timeout() == 0 ? 0 : remainingTime; - - for (GridCacheTxEntry txEntry1 : entries) { - // Check if this entry was prepared before. - if (!txEntry1.markPrepared()) - continue; - - GridCacheContext cacheCtx = txEntry1.context(); - - while (true) { - try { - GridCacheEntryEx entry1 = txEntry1.cached(); - - assert !entry1.detached() : "Expected non-detached entry for near transaction " + - "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']'; - - if (!entry1.tmLock(tx, timeout)) { - // Unlock locks locked so far. - for (GridCacheTxEntry txEntry2 : entries) { - if (txEntry2 == txEntry1) - break; - - txEntry2.cached().txUnlock(tx); - } - - return false; - } - - entry1.unswap(); - - break; - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in TM lockMultiple(..) method (will retry): " + txEntry1); - - try { - // Renew cache entry. - txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key()), txEntry1.keyBytes()); - } - catch (GridDhtInvalidPartitionException e) { - assert tx.dht() : "Received invalid partition for non DHT transaction [tx=" + - tx + ", invalidPart=" + e.partition() + ']'; - - // If partition is invalid, we ignore this entry. - tx.addInvalidPartition(cacheCtx, e.partition()); - - break; - } - } - catch (GridDistributedLockCancelledException ignore) { - tx.setRollbackOnly(); - - throw new IgniteCheckedException("Entry lock has been cancelled for transaction: " + tx); - } - } - } - - return true; - } - - /** - * Unlocks entries locked by group transaction. - * - * @param txx Transaction. - */ - @SuppressWarnings("unchecked") - private void unlockGroupLocks(GridCacheTxEx txx) { - GridCacheTxKey grpLockKey = txx.groupLockKey(); - - assert grpLockKey != null; - - if (grpLockKey == null) - return; - - GridCacheTxEntry txEntry = txx.entry(grpLockKey); - - assert txEntry != null || (txx.near() && !txx.local()); - - if (txEntry != null) { - GridCacheContext cacheCtx = txEntry.context(); - - // Group-locked entries must be locked. - while (true) { - try { - GridCacheEntryEx entry = txEntry.cached(); - - assert entry != null; - - entry.txUnlock(txx); - - break; - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in TM unlockGroupLocks(..) method (will retry): " + txEntry); - - GridCacheAdapter cache = cacheCtx.cache(); - - // Renew cache entry. - txEntry.cached(cache.entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - } - - /** - * @param tx Owning transaction. - * @param entries Entries to unlock. - */ - private void unlockMultiple(GridCacheTxEx tx, Iterable> entries) { - for (GridCacheTxEntry txEntry : entries) { - GridCacheContext cacheCtx = txEntry.context(); - - while (true) { - try { - GridCacheEntryEx entry = txEntry.cached(); - - if (entry.detached()) - break; - - assert entry != null; - - entry.txUnlock(tx); - - break; - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in TM unlockMultiple(..) method (will retry): " + txEntry); - - // Renew cache entry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), txEntry.keyBytes()); - } - } - } - } - - /** - * @param sync Transaction synchronizations to add. - */ - public void addSynchronizations(IgniteTxSynchronization... sync) { - if (F.isEmpty(sync)) - return; - - F.copy(syncs, sync); - } - - /** - * @param sync Transaction synchronizations to remove. - */ - public void removeSynchronizations(IgniteTxSynchronization... sync) { - if (F.isEmpty(sync)) - return; - - F.lose(syncs, false, Arrays.asList(sync)); - } - - /** - * @return Registered transaction synchronizations - */ - public Collection synchronizations() { - return Collections.unmodifiableList(new LinkedList<>(syncs)); - } - - /** - * @param prevState Previous state. - * @param newState New state. - * @param tx Cache transaction. - */ - public void onTxStateChange(@Nullable IgniteTxState prevState, IgniteTxState newState, IgniteTx tx) { - // Notify synchronizations. - for (IgniteTxSynchronization s : syncs) - s.onStateChanged(prevState, newState, tx); - } - - /** - * @param tx Committing transaction. - */ - public void txContext(GridCacheTxEx tx) { - threadCtx.set(tx); - } - - /** - * @return Currently committing transaction. - */ - @SuppressWarnings({"unchecked"}) - private GridCacheTxEx txContext() { - return threadCtx.get(); - } - - /** - * Gets version of transaction in tx context or {@code null} - * if tx context is empty. - *

- * This is a convenience method provided mostly for debugging. - * - * @return Transaction version from transaction context. - */ - @Nullable public GridCacheVersion txContextVersion() { - GridCacheTxEx tx = txContext(); - - return tx == null ? null : tx.xidVersion(); - } - - /** - * Commit ended. - */ - public void txContextReset() { - threadCtx.set(null); - } - - /** - * @return All transactions. - */ - public Collection> txs() { - return F.concat(false, idMap.values(), nearIdMap.values()); - } - - /** - * @return Slow tx warn timeout. - */ - public int slowTxWarnTimeout() { - return slowTxWarnTimeout; - } - - /** - * @param slowTxWarnTimeout Slow tx warn timeout. - */ - public void slowTxWarnTimeout(int slowTxWarnTimeout) { - this.slowTxWarnTimeout = slowTxWarnTimeout; - } - - /** - * Checks if transactions with given near version ID was prepared or committed. - * - * @param nearVer Near version ID. - * @param txNum Number of transactions. - * @return {@code True} if transactions were prepared or committed. - */ - public boolean txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) { - Collection processedVers = null; - - for (GridCacheTxEx tx : txs()) { - if (nearVer.equals(tx.nearXidVersion())) { - IgniteTxState state = tx.state(); - - if (state == PREPARED || state == COMMITTING || state == COMMITTED) { - if (--txNum == 0) - return true; - } - else { - if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) { - tx.rollbackAsync(); - - if (log.isDebugEnabled()) - log.debug("Transaction was not prepared (rolled back): " + tx); - - return false; - } - else { - if (tx.state() == COMMITTED) { - if (--txNum == 0) - return true; - } - else { - if (log.isDebugEnabled()) - log.debug("Transaction is not prepared: " + tx); - - return false; - } - } - } - - if (processedVers == null) - processedVers = new HashSet<>(txNum, 1.0f); - - processedVers.add(tx.xidVersion()); - } - } - - // Not all transactions were found. Need to scan committed versions to check - // if transaction was already committed. - for (GridCacheVersion ver : committedVers) { - if (processedVers != null && processedVers.contains(ver)) - continue; - - if (ver instanceof CommittedVersion) { - CommittedVersion commitVer = (CommittedVersion)ver; - - if (commitVer.nearVer.equals(nearVer)) { - if (--txNum == 0) - return true; - } - } - } - - return false; - } - - /** - * Adds transaction to pessimistic recovery buffer if needed. - * - * @param tx Committed transaction to add. - */ - private void addPessimisticRecovery(GridCacheTxEx tx) { - if (pessimisticRecoveryBuf == null) - return; - - // Do not store recovery information for optimistic or replicated local transactions. - if (tx.optimistic() || (tx.local() && tx.replicated())) - return; - - pessimisticRecoveryBuf.addCommittedTx(tx); - } - - /** - * Checks whether transaction with given near version was committed on this node and returns commit info. - * - * @param nearTxVer Near tx version. - * @param originatingNodeId Originating node ID. - * @param originatingThreadId Originating thread ID. - * @return Commit info, if present. - */ - @Nullable public GridCacheCommittedTxInfo txCommitted(GridCacheVersion nearTxVer, - UUID originatingNodeId, long originatingThreadId) { - assert pessimisticRecoveryBuf != null : "Should not be called for LOCAL cache."; - - return pessimisticRecoveryBuf.committedTx(nearTxVer, originatingNodeId, originatingThreadId); - } - - /** - * Gets local transaction for pessimistic tx recovery. - * - * @param nearXidVer Near tx ID. - * @return Near local or colocated local transaction. - */ - @Nullable public GridCacheTxEx localTxForRecovery(GridCacheVersion nearXidVer, boolean markFinalizing) { - // First check if we have near transaction with this ID. - GridCacheTxEx tx = idMap.get(nearXidVer); - - if (tx == null) { - // Check all local transactions and mark them as waiting for recovery to prevent finish race. - for (GridCacheTxEx txEx : idMap.values()) { - if (nearXidVer.equals(txEx.nearXidVersion())) { - if (!markFinalizing || !txEx.markFinalizing(RECOVERY_WAIT)) - tx = txEx; - } - } - } - - // Either we found near transaction or one of transactions is being committed by user. - // Wait for it and send reply. - if (tx != null && tx.local()) - return tx; - - return null; - } - - /** - * Commits or rolls back prepared transaction. - * - * @param tx Transaction. - * @param commit Whether transaction should be committed or rolled back. - */ - public void finishOptimisticTxOnRecovery(final GridCacheTxEx tx, boolean commit) { - if (log.isDebugEnabled()) - log.debug("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']'); - - if (!tx.markFinalizing(RECOVERY_FINISH)) { - if (log.isDebugEnabled()) - log.debug("Will not try to commit prepared transaction (could not mark finalized): " + tx); - - return; - } - - if (tx instanceof GridDistributedTxRemoteAdapter) { - GridCacheTxRemoteEx rmtTx = (GridCacheTxRemoteEx)tx; - - rmtTx.doneRemote(tx.xidVersion(), Collections.emptyList(), Collections.emptyList(), - Collections.emptyList()); - } - - if (commit) - tx.commitAsync().listenAsync(new CommitListener(tx)); - else - tx.rollbackAsync(); - } - - /** - * Commits or rolls back pessimistic transaction. - * - * @param tx Transaction to finish. - * @param commitInfo Commit information. - */ - public void finishPessimisticTxOnRecovery(final GridCacheTxEx tx, GridCacheCommittedTxInfo commitInfo) { - if (!tx.markFinalizing(RECOVERY_FINISH)) { - if (log.isDebugEnabled()) - log.debug("Will not try to finish pessimistic transaction (could not mark as finalizing): " + tx); - - return; - } - - if (tx instanceof GridDistributedTxRemoteAdapter) { - GridCacheTxRemoteEx rmtTx = (GridCacheTxRemoteEx)tx; - - rmtTx.doneRemote(tx.xidVersion(), Collections.emptyList(), Collections.emptyList(), - Collections.emptyList()); - } - - try { - tx.prepare(); - - if (commitInfo != null) { - for (GridCacheTxEntry entry : commitInfo.recoveryWrites()) { - GridCacheTxEntry write = tx.writeMap().get(entry.txKey()); - - if (write != null) { - GridCacheEntryEx cached = write.cached(); - - GridCacheTxEntry recovered = entry.cleanCopy(write.context()); - - if (cached == null || cached.detached()) - cached = write.context().cache().entryEx(entry.key(), tx.topologyVersion()); - - recovered.cached(cached, cached.keyBytes()); - - tx.writeMap().put(entry.txKey(), recovered); - - continue; - } - - ((GridCacheTxAdapter)tx).recoveryWrites(commitInfo.recoveryWrites()); - - // If write was not found, check read. - GridCacheTxEntry read = tx.readMap().remove(entry.txKey()); - - if (read != null) - tx.writeMap().put(entry.txKey(), entry); - } - - tx.commitAsync().listenAsync(new CommitListener(tx)); - } - else - tx.rollbackAsync(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to prepare pessimistic transaction (will invalidate): " + tx, e); - - salvageTx(tx); - } - } - - /** - * @param req Check committed request. - * @return Check committed future. - */ - public IgniteFuture> checkPessimisticTxCommitted(GridCachePessimisticCheckCommittedTxRequest req) { - // First check if we have near transaction with this ID. - GridCacheTxEx tx = localTxForRecovery(req.nearXidVersion(), !req.nearOnlyCheck()); - - // Either we found near transaction or one of transactions is being committed by user. - // Wait for it and send reply. - if (tx != null) { - assert tx.local(); - - if (log.isDebugEnabled()) - log.debug("Found active near transaction, will wait for completion [req=" + req + ", tx=" + tx + ']'); - - final GridCacheTxEx tx0 = tx; - - return tx.finishFuture().chain(new C1, GridCacheCommittedTxInfo>() { - @Override public GridCacheCommittedTxInfo apply(IgniteFuture txFut) { - GridCacheCommittedTxInfo info = null; - - if (tx0.state() == COMMITTED) - info = new GridCacheCommittedTxInfo<>(tx0); - - return info; - } - }); - } - - GridCacheCommittedTxInfo info = txCommitted(req.nearXidVersion(), req.originatingNodeId(), - req.originatingThreadId()); - - if (info == null) - info = txCommitted(req.nearXidVersion(), req.originatingNodeId(), req.originatingThreadId()); - - return new GridFinishedFutureEx<>(info); - } - - /** - * Timeout object for node failure handler. - */ - private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { - /** Left or failed node. */ - private final UUID evtNodeId; - - /** - * @param evtNodeId Event node ID. - */ - private NodeFailureTimeoutObject(UUID evtNodeId) { - super(IgniteUuid.fromUuid(cctx.localNodeId()), TX_SALVAGE_TIMEOUT); - - this.evtNodeId = evtNodeId; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - try { - cctx.kernalContext().gateway().readLock(); - } - catch (IllegalStateException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to acquire kernal gateway (grid is stopping)."); - - return; - } - - try { - if (log.isDebugEnabled()) - log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() + - ", failedNodeId=" + evtNodeId + ']'); - - for (GridCacheTxEx tx : txs()) { - if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) { - // Invalidate transactions. - salvageTx(tx, false, RECOVERY_FINISH); - } - else if (tx.optimistic()) { - // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. - if (tx.originatingNodeId().equals(evtNodeId)) { - if (tx.state() == PREPARED) - commitIfPrepared(tx); - else { - if (tx.setRollbackOnly()) - tx.rollbackAsync(); - // If we could not mark tx as rollback, it means that transaction is being committed. - } - } - } - else { - // Pessimistic. - if (tx.originatingNodeId().equals(evtNodeId)) { - if (tx.state() != COMMITTING && tx.state() != COMMITTED) - commitIfRemotelyCommitted(tx); - else { - if (log.isDebugEnabled()) - log.debug("Skipping pessimistic transaction check (transaction is being committed) " + - "[tx=" + tx + ", locNodeId=" + cctx.localNodeId() + ']'); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Skipping pessimistic transaction check [tx=" + tx + - ", evtNodeId=" + evtNodeId + ", locNodeId=" + cctx.localNodeId() + ']'); - } - } - } - } - finally { - cctx.kernalContext().gateway().readUnlock(); - } - } - - /** - * Commits optimistic transaction in case when node started transaction failed, but all related - * transactions were prepared (invalidates transaction if it is not fully prepared). - * - * @param tx Transaction. - */ - private void commitIfPrepared(GridCacheTxEx tx) { - assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx; - assert !F.isEmpty(tx.transactionNodes()); - assert tx.nearXidVersion() != null; - - - GridCacheOptimisticCheckPreparedTxFuture fut = new GridCacheOptimisticCheckPreparedTxFuture<>( - cctx, tx, evtNodeId, tx.transactionNodes()); - - cctx.mvcc().addFuture(fut); - - if (log.isDebugEnabled()) - log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']'); - - fut.prepare(); - } - - /** - * Commits pessimistic transaction if at least one of remote nodes has committed this transaction. - * - * @param tx Transaction. - */ - private void commitIfRemotelyCommitted(GridCacheTxEx tx) { - assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx; - - GridCachePessimisticCheckCommittedTxFuture fut = new GridCachePessimisticCheckCommittedTxFuture<>( - cctx, tx, evtNodeId); - - cctx.mvcc().addFuture(fut); - - if (log.isDebugEnabled()) - log.debug("Checking pessimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']'); - - fut.prepare(); - } - } - - /** - * - */ - private static class CommittedVersion extends GridCacheVersion { - /** */ - private static final long serialVersionUID = 0L; - - /** Corresponding near version. Transient. */ - private GridCacheVersion nearVer; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public CommittedVersion() { - // No-op. - } - - /** - * @param ver Committed version. - * @param nearVer Near transaction version. - */ - private CommittedVersion(GridCacheVersion ver, GridCacheVersion nearVer) { - super(ver.topologyVersion(), ver.globalTime(), ver.order(), ver.nodeOrder(), ver.dataCenterId()); - - assert nearVer != null; - - this.nearVer = nearVer; - } - } - - /** - * Atomic integer that compares only using references, not values. - */ - private static final class AtomicInt extends AtomicInteger { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param initVal Initial value. - */ - private AtomicInt(int initVal) { - super(initVal); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - // Reference only. - return obj == this; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return super.hashCode(); - } - } - - /** - * Commit listener. Checks if commit succeeded and rollbacks if case of error. - */ - private class CommitListener implements CI1> { - /** */ - private static final long serialVersionUID = 0L; - - /** Transaction. */ - private final GridCacheTxEx tx; - - /** - * @param tx Transaction. - */ - private CommitListener(GridCacheTxEx tx) { - this.tx = tx; - } - - /** {@inheritDoc} */ - @Override public void apply(IgniteFuture t) { - try { - t.get(); - } - catch (IgniteTxOptimisticException ignore) { - if (log.isDebugEnabled()) - log.debug("Optimistic failure while committing prepared transaction (will rollback): " + - tx); - - tx.rollbackAsync(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to commit transaction during failover: " + tx, e); - } - } - } -}