Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B336310780 for ; Tue, 23 Dec 2014 00:07:01 +0000 (UTC) Received: (qmail 78644 invoked by uid 500); 23 Dec 2014 00:07:01 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 78556 invoked by uid 500); 23 Dec 2014 00:07:01 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 78527 invoked by uid 99); 23 Dec 2014 00:07:01 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Dec 2014 00:07:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 23 Dec 2014 00:06:36 +0000 Received: (qmail 65271 invoked by uid 99); 22 Dec 2014 23:59:36 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Dec 2014 23:59:36 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7664DA33E11; Mon, 22 Dec 2014 23:59:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 23 Dec 2014 00:00:18 -0000 Message-Id: <8ad59c850b544019ad3b21b7ed91bd51@git.apache.org> In-Reply-To: <26feadb5eea944938f598249aa42f8a1@git.apache.org> References: <26feadb5eea944938f598249aa42f8a1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [44/50] [abbrv] incubator-ignite git commit: GG-9141 - Renaming. X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java deleted file mode 100644 index f81cdec..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java +++ /dev/null @@ -1,1523 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.kernal.processors.cache.distributed.near.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.gridgain.grid.util.future.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; -import static org.apache.ignite.transactions.IgniteTxState.*; -import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; - -/** - * Managed transaction adapter. - */ -public abstract class GridCacheTxAdapter extends GridMetadataAwareAdapter - implements GridCacheTxEx, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Static logger to avoid re-creation. */ - private static final AtomicReference logRef = new AtomicReference<>(); - - /** Logger. */ - protected static IgniteLogger log; - - /** Transaction ID. */ - @GridToStringInclude - protected GridCacheVersion xidVer; - - /** Entries write version. */ - @GridToStringInclude - protected GridCacheVersion writeVer; - - /** Implicit flag. */ - @GridToStringInclude - protected boolean implicit; - - /** Implicit with one key flag. */ - @GridToStringInclude - protected boolean implicitSingle; - - /** Local flag. */ - @GridToStringInclude - protected boolean loc; - - /** Thread ID. */ - @GridToStringInclude - protected long threadId; - - /** Transaction start time. */ - @GridToStringInclude - protected long startTime = U.currentTimeMillis(); - - /** Node ID. */ - @GridToStringInclude - protected UUID nodeId; - - /** Transaction counter value at the start of transaction. */ - @GridToStringInclude - protected GridCacheVersion startVer; - - /** Cache registry. */ - @GridToStringExclude - protected GridCacheSharedContext cctx; - - /** - * End version (a.k.a. 'tnc' or 'transaction number counter') - * assigned to this transaction at the end of write phase. - */ - @GridToStringInclude - protected GridCacheVersion endVer; - - /** Isolation. */ - @GridToStringInclude - protected IgniteTxIsolation isolation = READ_COMMITTED; - - /** Concurrency. */ - @GridToStringInclude - protected IgniteTxConcurrency concurrency = PESSIMISTIC; - - /** Transaction timeout. */ - @GridToStringInclude - protected long timeout; - - /** Invalidate flag. */ - protected volatile boolean invalidate; - - /** Invalidation flag for system invalidations (not user-based ones). */ - private boolean sysInvalidate; - - /** Internal flag. */ - protected boolean internal; - - /** System transaction flag. */ - private boolean sys; - - /** */ - protected boolean onePhaseCommit; - - /** */ - protected boolean syncCommit; - - /** */ - protected boolean syncRollback; - - /** If this transaction contains transform entries. */ - protected boolean transform; - - /** Commit version. */ - private AtomicReference commitVer = new AtomicReference<>(null); - - /** Done marker. */ - protected final AtomicBoolean isDone = new AtomicBoolean(false); - - /** */ - private AtomicReference finalizing = new AtomicReference<>(NONE); - - /** Preparing flag. */ - private AtomicBoolean preparing = new AtomicBoolean(); - - /** */ - private Set invalidParts = new GridLeanSet<>(); - - /** Recover writes. */ - private Collection> recoveryWrites; - - /** - * Transaction state. Note that state is not protected, as we want to - * always use {@link #state()} and {@link #state(IgniteTxState)} - * methods. - */ - @GridToStringInclude - private volatile IgniteTxState state = ACTIVE; - - /** Timed out flag. */ - private volatile boolean timedOut; - - /** */ - protected int txSize; - - /** Group lock key, if any. */ - protected GridCacheTxKey grpLockKey; - - /** */ - @GridToStringExclude - private AtomicReference> finFut = new AtomicReference<>(); - - /** Topology version. */ - private AtomicLong topVer = new AtomicLong(-1); - - /** Mutex. */ - private final Lock lock = new ReentrantLock(); - - /** Lock condition. */ - private final Condition cond = lock.newCondition(); - - /** Subject ID initiated this transaction. */ - protected UUID subjId; - - /** Task name hash code. */ - protected int taskNameHash; - - /** Task name. */ - protected String taskName; - - /** Store used flag. */ - protected boolean storeEnabled = true; - - /** - * Empty constructor required for {@link Externalizable}. - */ - protected GridCacheTxAdapter() { - // No-op. - } - - /** - * @param cctx Cache registry. - * @param xidVer Transaction ID. - * @param implicit Implicit flag. - * @param implicitSingle Implicit with one key flag. - * @param loc Local flag. - * @param sys System transaction flag. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param txSize Transaction size. - * @param grpLockKey Group lock key if this is group-lock transaction. - */ - protected GridCacheTxAdapter( - GridCacheSharedContext cctx, - GridCacheVersion xidVer, - boolean implicit, - boolean implicitSingle, - boolean loc, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - boolean invalidate, - boolean storeEnabled, - int txSize, - @Nullable GridCacheTxKey grpLockKey, - @Nullable UUID subjId, - int taskNameHash - ) { - assert xidVer != null; - assert cctx != null; - - this.cctx = cctx; - this.xidVer = xidVer; - this.implicit = implicit; - this.implicitSingle = implicitSingle; - this.loc = loc; - this.sys = sys; - this.concurrency = concurrency; - this.isolation = isolation; - this.timeout = timeout; - this.invalidate = invalidate; - this.storeEnabled = storeEnabled; - this.txSize = txSize; - this.grpLockKey = grpLockKey; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - - startVer = cctx.versions().last(); - - nodeId = cctx.discovery().localNode().id(); - - threadId = Thread.currentThread().getId(); - - log = U.logger(cctx.kernalContext(), logRef, this); - } - - /** - * @param cctx Cache registry. - * @param nodeId Node ID. - * @param xidVer Transaction ID. - * @param startVer Start version mark. - * @param threadId Thread ID. - * @param sys System transaction flag. - * @param concurrency Concurrency. - * @param isolation Isolation. - * @param timeout Timeout. - * @param txSize Transaction size. - * @param grpLockKey Group lock key if this is group-lock transaction. - */ - protected GridCacheTxAdapter( - GridCacheSharedContext cctx, - UUID nodeId, - GridCacheVersion xidVer, - GridCacheVersion startVer, - long threadId, - boolean sys, - IgniteTxConcurrency concurrency, - IgniteTxIsolation isolation, - long timeout, - int txSize, - @Nullable GridCacheTxKey grpLockKey, - @Nullable UUID subjId, - int taskNameHash - ) { - this.cctx = cctx; - this.nodeId = nodeId; - this.threadId = threadId; - this.xidVer = xidVer; - this.startVer = startVer; - this.sys = sys; - this.concurrency = concurrency; - this.isolation = isolation; - this.timeout = timeout; - this.txSize = txSize; - this.grpLockKey = grpLockKey; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - - implicit = false; - implicitSingle = false; - loc = false; - - log = U.logger(cctx.kernalContext(), logRef, this); - } - - /** - * Acquires lock. - */ - @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) - protected final void lock() { - lock.lock(); - } - - /** - * Releases lock. - */ - protected final void unlock() { - lock.unlock(); - } - - /** - * Signals all waiters. - */ - protected final void signalAll() { - cond.signalAll(); - } - - /** - * Waits for signal. - * - * @throws InterruptedException If interrupted. - */ - protected final void awaitSignal() throws InterruptedException { - cond.await(); - } - - /** - * Checks whether near cache should be updated. - * - * @return Flag indicating whether near cache should be updated. - */ - protected boolean updateNearCache(GridCacheContext cacheCtx, K key, long topVer) { - return false; - } - - /** {@inheritDoc} */ - @Override public Collection> optimisticLockEntries() { - assert optimistic(); - - if (!groupLock()) - return writeEntries(); - else { - if (!F.isEmpty(invalidParts)) { - assert invalidParts.size() == 1 : "Only one partition expected for group lock transaction " + - "[tx=" + this + ", invalidParts=" + invalidParts + ']'; - assert groupLockEntry() == null : "Group lock key should be rejected " + - "[tx=" + this + ", groupLockEntry=" + groupLockEntry() + ']'; - assert F.isEmpty(writeMap()) : "All entries should be rejected for group lock transaction " + - "[tx=" + this + ", writes=" + writeMap() + ']'; - - return Collections.emptyList(); - } - - GridCacheTxEntry grpLockEntry = groupLockEntry(); - - assert grpLockEntry != null || (near() && !local()): - "Group lock entry was not enlisted into transaction [tx=" + this + - ", grpLockKey=" + groupLockKey() + ']'; - - return grpLockEntry == null ? - Collections.>emptyList() : - Collections.singletonList(grpLockEntry); - } - } - - /** - * @param recoveryWrites Recover write entries. - */ - public void recoveryWrites(Collection> recoveryWrites) { - this.recoveryWrites = recoveryWrites; - } - - /** - * @return Recover write entries. - */ - @Override public Collection> recoveryWrites() { - return recoveryWrites; - } - - /** {@inheritDoc} */ - @Override public boolean storeEnabled() { - return storeEnabled; - } - - /** - * @param storeEnabled Store enabled flag. - */ - public void storeEnabled(boolean storeEnabled) { - this.storeEnabled = storeEnabled; - } - - /** {@inheritDoc} */ - @Override public boolean system() { - return sys; - } - - /** {@inheritDoc} */ - @Override public boolean storeUsed() { - return storeEnabled() && store() != null; - } - - /** - * Store manager for current transaction. - * - * @return Store manager. - */ - protected GridCacheStoreManager store() { - if (!activeCacheIds().isEmpty()) { - int cacheId = F.first(activeCacheIds()); - - GridCacheStoreManager store = cctx.cacheContext(cacheId).store(); - - return store.configured() ? store : null; - } - - return null; - } - - /** - * This method uses unchecked assignment to cast group lock key entry to transaction generic signature. - * - * @return Group lock tx entry. - */ - @SuppressWarnings("unchecked") - public GridCacheTxEntry groupLockEntry() { - return ((GridCacheTxAdapter)this).entry(groupLockKey()); - } - - /** {@inheritDoc} */ - @Override public UUID otherNodeId() { - return null; - } - - /** {@inheritDoc} */ - @Override public UUID subjectId() { - if (subjId != null) - return subjId; - - return originatingNodeId(); - } - - /** {@inheritDoc} */ - @Override public int taskNameHash() { - return taskNameHash; - } - - /** {@inheritDoc} */ - @Override public long topologyVersion() { - long res = topVer.get(); - - if (res == -1) - return cctx.exchange().topologyVersion(); - - return res; - } - - /** {@inheritDoc} */ - @Override public long topologyVersion(long topVer) { - this.topVer.compareAndSet(-1, topVer); - - return this.topVer.get(); - } - - /** {@inheritDoc} */ - @Override public boolean hasTransforms() { - return transform; - } - - /** {@inheritDoc} */ - @Override public boolean markPreparing() { - return preparing.compareAndSet(false, true); - } - - /** - * @return {@code True} if marked. - */ - @Override public boolean markFinalizing(FinalizationStatus status) { - boolean res; - - switch (status) { - case USER_FINISH: - res = finalizing.compareAndSet(NONE, USER_FINISH); - - break; - - case RECOVERY_WAIT: - finalizing.compareAndSet(NONE, RECOVERY_WAIT); - - FinalizationStatus cur = finalizing.get(); - - res = cur == RECOVERY_WAIT || cur == RECOVERY_FINISH; - - break; - - case RECOVERY_FINISH: - FinalizationStatus old = finalizing.get(); - - res = old != USER_FINISH && finalizing.compareAndSet(old, status); - - break; - - default: - throw new IllegalArgumentException("Cannot set finalization status: " + status); - - } - - if (res) { - if (log.isDebugEnabled()) - log.debug("Marked transaction as finalized: " + this); - } - else { - if (log.isDebugEnabled()) - log.debug("Transaction was not marked finalized: " + this); - } - - return res; - } - - /** - * @return Finalization status. - */ - protected FinalizationStatus finalizationStatus() { - return finalizing.get(); - } - - /** - * @return {@code True} if transaction has at least one key enlisted. - */ - public abstract boolean isStarted(); - - /** {@inheritDoc} */ - @Override public boolean groupLock() { - return grpLockKey != null; - } - - /** {@inheritDoc} */ - @Override public GridCacheTxKey groupLockKey() { - return grpLockKey; - } - - /** {@inheritDoc} */ - @Override public int size() { - return txSize; - } - - /** - * @return Logger. - */ - protected IgniteLogger log() { - return log; - } - - /** {@inheritDoc} */ - @Override public boolean near() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean implicit() { - return implicit; - } - - /** {@inheritDoc} */ - @Override public boolean implicitSingle() { - return implicitSingle; - } - - /** {@inheritDoc} */ - @Override public boolean local() { - return loc; - } - - /** {@inheritDoc} */ - @Override public final boolean user() { - return !implicit() && local() && !dht() && !internal(); - } - - /** {@inheritDoc} */ - @Override public boolean dht() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean colocated() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean replicated() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean enforceSerializable() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean syncCommit() { - return syncCommit; - } - - /** {@inheritDoc} */ - @Override public boolean syncRollback() { - return syncRollback; - } - - /** - * @param syncCommit Synchronous commit flag. - */ - public void syncCommit(boolean syncCommit) { - this.syncCommit = syncCommit; - } - - /** - * @param syncRollback Synchronous rollback flag. - */ - public void syncRollback(boolean syncRollback) { - this.syncRollback = syncRollback; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid xid() { - return xidVer.asGridUuid(); - } - - /** {@inheritDoc} */ - @Override public Set invalidPartitions() { - return invalidParts; - } - - /** {@inheritDoc} */ - @Override public void addInvalidPartition(GridCacheContext cacheCtx, int part) { - invalidParts.add(part); - - if (log.isDebugEnabled()) - log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']'); - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion ownedVersion(GridCacheTxKey key) { - return null; - } - - /** {@inheritDoc} */ - @Override public long startTime() { - return startTime; - } - - /** - * Gets remaining allowed transaction time. - * - * @return Remaining transaction time. - */ - @Override public long remainingTime() { - if (timeout() <= 0) - return -1; - - long timeLeft = timeout() - (U.currentTimeMillis() - startTime()); - - if (timeLeft < 0) - return 0; - - return timeLeft; - } - - /** - * @return Lock timeout. - */ - protected long lockTimeout() { - long timeout = remainingTime(); - - return timeout < 0 ? 0 : timeout == 0 ? -1 : timeout; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion xidVersion() { - return xidVer; - } - - /** {@inheritDoc} */ - @Override public long threadId() { - return threadId; - } - - /** {@inheritDoc} */ - @Override public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public IgniteTxIsolation isolation() { - return isolation; - } - - /** {@inheritDoc} */ - @Override public IgniteTxConcurrency concurrency() { - return concurrency; - } - - /** {@inheritDoc} */ - @Override public long timeout() { - return timeout; - } - - /** {@inheritDoc} */ - @Override public long timeout(long timeout) { - if (isStarted()) - throw new IllegalStateException("Cannot change timeout after transaction has started: " + this); - - long old = this.timeout; - - this.timeout = timeout; - - return old; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException { - GridCacheContext cacheCtx = entry.context(); - - GridCacheTxEntry txEntry = entry(entry.txKey()); - - GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion(); - - assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " + - "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']'; - - return local() && !cacheCtx.isDht() ? - entry.lockedByThread(threadId()) || (explicit != null && entry.lockedBy(explicit)) : - // If candidate is not there, then lock was explicit. - // Otherwise, check if entry is owned by version. - !entry.hasLockCandidate(xidVersion()) || entry.lockedBy(xidVersion()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) { - GridCacheContext cacheCtx = entry.context(); - - GridCacheTxEntry txEntry = entry(entry.txKey()); - - GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion(); - - assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " + - "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']'; - - return local() && !cacheCtx.isDht() ? - entry.lockedByThreadUnsafe(threadId()) || (explicit != null && entry.lockedByUnsafe(explicit)) : - // If candidate is not there, then lock was explicit. - // Otherwise, check if entry is owned by version. - !entry.hasLockCandidateUnsafe(xidVersion()) || entry.lockedByUnsafe(xidVersion()); - } - - /** {@inheritDoc} */ - @Override public IgniteTxState state() { - return state; - } - - /** {@inheritDoc} */ - @Override public boolean setRollbackOnly() { - return state(MARKED_ROLLBACK); - } - - /** - * @return {@code True} if rollback only flag is set. - */ - @Override public boolean isRollbackOnly() { - return state == MARKED_ROLLBACK || state == ROLLING_BACK || state == ROLLED_BACK; - } - - /** {@inheritDoc} */ - @Override public boolean done() { - return isDone.get(); - } - - /** - * @return Commit version. - */ - @Override public GridCacheVersion commitVersion() { - initCommitVersion(); - - return commitVer.get(); - } - - /** - * @param commitVer Commit version. - * @return {@code True} if set to not null value. - */ - @Override public boolean commitVersion(GridCacheVersion commitVer) { - return commitVer != null && this.commitVer.compareAndSet(null, commitVer); - } - - /** - * - */ - public void initCommitVersion() { - if (commitVer.get() == null) - commitVer.compareAndSet(null, xidVer); - } - - /** - * - */ - @Override public void close() throws IgniteCheckedException { - IgniteTxState state = state(); - - if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) - rollback(); - - awaitCompletion(); - } - - /** {@inheritDoc} */ - @Override public boolean needsCompletedVersions() { - return false; - } - - /** {@inheritDoc} */ - @Override public void completedVersions(GridCacheVersion base, Collection committed, - Collection txs) { - /* No-op. */ - } - - /** - * Awaits transaction completion. - * - * @throws IgniteCheckedException If waiting failed. - */ - protected void awaitCompletion() throws IgniteCheckedException { - lock(); - - try { - while (!done()) - awaitSignal(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - if (!done()) - throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + this, e); - } - finally { - unlock(); - } - } - - /** {@inheritDoc} */ - @Override public boolean internal() { - return internal; - } - - /** - * @param key Key. - * @return {@code True} if key is internal. - */ - protected boolean checkInternal(GridCacheTxKey key) { - if (key.key() instanceof GridCacheInternal) { - internal = true; - - return true; - } - - return false; - } - - /** - * @param onePhaseCommit {@code True} if transaction commit should be performed in short-path way. - */ - public void onePhaseCommit(boolean onePhaseCommit) { - this.onePhaseCommit = onePhaseCommit; - } - - /** - * @return Fast commit flag. - */ - @Override public boolean onePhaseCommit() { - return onePhaseCommit; - } - - /** {@inheritDoc} */ - @Override public boolean optimistic() { - return concurrency == OPTIMISTIC; - } - - /** {@inheritDoc} */ - @Override public boolean pessimistic() { - return concurrency == PESSIMISTIC; - } - - /** {@inheritDoc} */ - @Override public boolean serializable() { - return isolation == SERIALIZABLE; - } - - /** {@inheritDoc} */ - @Override public boolean repeatableRead() { - return isolation == REPEATABLE_READ; - } - - /** {@inheritDoc} */ - @Override public boolean readCommitted() { - return isolation == READ_COMMITTED; - } - - /** {@inheritDoc} */ - @Override public boolean state(IgniteTxState state) { - return state(state, false); - } - - /** {@inheritDoc} */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - @Override public IgniteFuture finishFuture() { - GridFutureAdapter fut = finFut.get(); - - if (fut == null) { - fut = new GridFutureAdapter(cctx.kernalContext()) { - @Override public String toString() { - return S.toString(GridFutureAdapter.class, this, "tx", GridCacheTxAdapter.this); - } - }; - - if (!finFut.compareAndSet(null, fut)) - fut = finFut.get(); - } - - assert fut != null; - - if (isDone.get()) - fut.onDone(this); - - return fut; - } - - /** - * - * @param state State to set. - * @param timedOut Timeout flag. - * @return {@code True} if state changed. - */ - @SuppressWarnings({"TooBroadScope"}) - private boolean state(IgniteTxState state, boolean timedOut) { - boolean valid = false; - - IgniteTxState prev; - - boolean notify = false; - - lock(); - - try { - prev = this.state; - - switch (state) { - case ACTIVE: { - valid = false; - - break; - } // Active is initial state and cannot be transitioned to. - case PREPARING: { - valid = prev == ACTIVE; - - break; - } - case PREPARED: { - valid = prev == PREPARING; - - break; - } - case COMMITTING: { - valid = prev == PREPARED; - - break; - } - - case UNKNOWN: { - if (isDone.compareAndSet(false, true)) - notify = true; - - valid = prev == ROLLING_BACK || prev == COMMITTING; - - break; - } - - case COMMITTED: { - if (isDone.compareAndSet(false, true)) - notify = true; - - valid = prev == COMMITTING; - - break; - } - - case ROLLED_BACK: { - if (isDone.compareAndSet(false, true)) - notify = true; - - valid = prev == ROLLING_BACK; - - break; - } - - case MARKED_ROLLBACK: { - valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING; - - break; - } - - case ROLLING_BACK: { - valid = - prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING || - prev == PREPARED || (prev == COMMITTING && local() && !dht()); - - break; - } - } - - if (valid) { - this.state = state; - this.timedOut = timedOut; - - if (log.isDebugEnabled()) - log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); - - // Notify of state change. - signalAll(); - } - else { - if (log.isDebugEnabled()) - log.debug("Invalid transaction state transition [invalid=" + state + ", cur=" + this.state + - ", tx=" + this + ']'); - } - } - finally { - unlock(); - } - - if (notify) { - GridFutureAdapter fut = finFut.get(); - - if (fut != null) - fut.onDone(this); - } - - if (valid) { - // Seal transactions maps. - if (state != ACTIVE) - seal(); - - cctx.tm().onTxStateChange(prev, state, this); - } - - return valid; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion startVersion() { - return startVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion endVersion() { - return endVer; - } - - /** {@inheritDoc} */ - @Override public void endVersion(GridCacheVersion endVer) { - this.endVer = endVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion writeVersion() { - return writeVer == null ? commitVersion() : writeVer; - } - - /** {@inheritDoc} */ - @Override public void writeVersion(GridCacheVersion writeVer) { - this.writeVer = writeVer; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return xidVer.asGridUuid(); - } - - /** {@inheritDoc} */ - @Override public long endTime() { - long endTime = timeout == 0 ? Long.MAX_VALUE : startTime + timeout; - - return endTime > 0 ? endTime : endTime < 0 ? Long.MAX_VALUE : endTime; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - state(MARKED_ROLLBACK, true); - } - - /** {@inheritDoc} */ - @Override public boolean timedOut() { - return timedOut; - } - - /** {@inheritDoc} */ - @Override public void invalidate(boolean invalidate) { - if (isStarted() && !dht()) - throw new IllegalStateException("Cannot change invalidation flag after transaction has started: " + this); - - this.invalidate = invalidate; - } - - /** {@inheritDoc} */ - @Override public boolean isInvalidate() { - return invalidate; - } - - /** {@inheritDoc} */ - @Override public boolean isSystemInvalidate() { - return sysInvalidate; - } - - /** {@inheritDoc} */ - @Override public void systemInvalidate(boolean sysInvalidate) { - this.sysInvalidate = sysInvalidate; - } - - /** {@inheritDoc} */ - @Nullable @Override public Map> transactionNodes() { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public GridCacheVersion nearXidVersion() { - return null; - } - - /** - * @param txEntry Entry to process. - * @param metrics {@code True} if metrics should be updated. - * @return Tuple containing transformation results. - * @throws IgniteCheckedException If failed to get previous value for transform. - * @throws GridCacheEntryRemovedException If entry was concurrently deleted. - */ - protected GridTuple3 applyTransformClosures(GridCacheTxEntry txEntry, - boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException { - GridCacheContext cacheCtx = txEntry.context(); - - assert cacheCtx != null; - - if (isSystemInvalidate()) - return F.t(cacheCtx.isStoreEnabled() ? RELOAD : DELETE, null, null); - if (F.isEmpty(txEntry.transformClosures())) - return F.t(txEntry.op(), txEntry.value(), txEntry.valueBytes()); - else { - try { - boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); - - V val = txEntry.hasValue() ? txEntry.value() : - txEntry.cached().innerGet(this, - /*swap*/false, - /*read through*/false, - /*fail fast*/true, - /*unmarshal*/true, - /*metrics*/metrics, - /*event*/recordEvt, - /*temporary*/true, - /*subjId*/subjId, - /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null, - resolveTaskName(), - CU.empty()); - - try { - for (IgniteClosure clos : txEntry.transformClosures()) - val = clos.apply(val); - } - catch (Throwable e) { - throw new IgniteException("Transform closure must not throw any exceptions " + - "(transaction will be invalidated)", e); - } - - GridCacheOperation op = val == null ? DELETE : UPDATE; - - return F.t(op, (V)cacheCtx.unwrapTemporary(val), null); - } - catch (GridCacheFilterFailedException e) { - assert false : "Empty filter failed for innerGet: " + e; - - return null; - } - } - } - - /** - * @return Resolves task name. - */ - public String resolveTaskName() { - if (taskName != null) - return taskName; - - return (taskName = cctx.kernalContext().task().resolveTaskName(taskNameHash)); - } - - /** - * @param e Transaction entry. - * @param primaryOnly Flag to include backups into check or not. - * @return {@code True} if entry is locally mapped as a primary or back up node. - */ - protected boolean isNearLocallyMapped(GridCacheTxEntry e, boolean primaryOnly) { - GridCacheContext cacheCtx = e.context(); - - if (!cacheCtx.isNear()) - return false; - - // Try to take either entry-recorded primary node ID, - // or transaction node ID from near-local transactions. - UUID nodeId = e.nodeId() == null ? local() ? this.nodeId : null : e.nodeId(); - - if (nodeId != null && nodeId.equals(cctx.localNodeId())) - return true; - - GridCacheEntryEx cached = e.cached(); - - int part = cached != null ? cached.partition() : cacheCtx.affinity().partition(e.key()); - - List affNodes = cacheCtx.affinity().nodes(part, topologyVersion()); - - e.locallyMapped(F.contains(affNodes, cctx.localNode())); - - if (primaryOnly) { - ClusterNode primary = F.first(affNodes); - - if (primary == null && !isAffinityNode(cacheCtx.config())) - return false; - - assert primary != null : "Primary node is null for affinity nodes: " + affNodes; - - return primary.isLocal(); - } - else - return e.locallyMapped(); - } - - /** - * @param e Entry to evict if it qualifies for eviction. - * @param primaryOnly Flag to try to evict only on primary node. - * @return {@code True} if attempt was made to evict the entry. - * @throws IgniteCheckedException If failed. - */ - protected boolean evictNearEntry(GridCacheTxEntry e, boolean primaryOnly) throws IgniteCheckedException { - assert e != null; - - if (isNearLocallyMapped(e, primaryOnly)) { - GridCacheEntryEx cached = e.cached(); - - assert cached instanceof GridNearCacheEntry : "Invalid cache entry: " + e; - - if (log.isDebugEnabled()) - log.debug("Evicting dht-local entry from near cache [entry=" + cached + ", tx=" + this + ']'); - - if (cached != null && cached.markObsolete(xidVer)) - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - writeExternalMeta(out); - - out.writeObject(xidVer); - out.writeBoolean(invalidate); - out.writeLong(timeout); - out.writeLong(threadId); - out.writeLong(startTime); - - U.writeUuid(out, nodeId); - - out.write(isolation.ordinal()); - out.write(concurrency.ordinal()); - out.write(state().ordinal()); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - readExternalMeta(in); - - xidVer = (GridCacheVersion)in.readObject(); - invalidate = in.readBoolean(); - timeout = in.readLong(); - threadId = in.readLong(); - startTime = in.readLong(); - - nodeId = U.readUuid(in); - - isolation = IgniteTxIsolation.fromOrdinal(in.read()); - concurrency = IgniteTxConcurrency.fromOrdinal(in.read()); - - state = IgniteTxState.fromOrdinal(in.read()); - } - - /** - * Reconstructs object on unmarshalling. - * - * @return Reconstructed object. - * @throws ObjectStreamException Thrown in case of unmarshalling error. - */ - protected Object readResolve() throws ObjectStreamException { - return new TxShadow( - xidVer.asGridUuid(), - nodeId, - threadId, - startTime, - isolation, - concurrency, - invalidate, - implicit, - timeout, - state(), - isRollbackOnly() - ); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - return o == this || (o instanceof GridCacheTxAdapter && xidVer.equals(((GridCacheTxAdapter)o).xidVer)); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return xidVer.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridCacheTxAdapter.class, this, - "duration", (U.currentTimeMillis() - startTime) + "ms", "grpLock", groupLock(), - "onePhaseCommit", onePhaseCommit); - } - - /** - * Transaction shadow class to be used for deserialization. - */ - private static class TxShadow extends GridMetadataAwareAdapter implements IgniteTx { - /** */ - private static final long serialVersionUID = 0L; - - /** Xid. */ - private final IgniteUuid xid; - - /** Node ID. */ - private final UUID nodeId; - - /** Thread ID. */ - private final long threadId; - - /** Start time. */ - private final long startTime; - - /** Transaction isolation. */ - private final IgniteTxIsolation isolation; - - /** Concurrency. */ - private final IgniteTxConcurrency concurrency; - - /** Invalidate flag. */ - private final boolean invalidate; - - /** Timeout. */ - private final long timeout; - - /** State. */ - private final IgniteTxState state; - - /** Rollback only flag. */ - private final boolean rollbackOnly; - - /** Implicit flag. */ - private final boolean implicit; - - /** - * @param xid Xid. - * @param nodeId Node ID. - * @param threadId Thread ID. - * @param startTime Start time. - * @param isolation Isolation. - * @param concurrency Concurrency. - * @param invalidate Invalidate flag. - * @param implicit Implicit flag. - * @param timeout Transaction timeout. - * @param state Transaction state. - * @param rollbackOnly Rollback-only flag. - */ - TxShadow(IgniteUuid xid, UUID nodeId, long threadId, long startTime, IgniteTxIsolation isolation, - IgniteTxConcurrency concurrency, boolean invalidate, boolean implicit, long timeout, - IgniteTxState state, boolean rollbackOnly) { - this.xid = xid; - this.nodeId = nodeId; - this.threadId = threadId; - this.startTime = startTime; - this.isolation = isolation; - this.concurrency = concurrency; - this.invalidate = invalidate; - this.implicit = implicit; - this.timeout = timeout; - this.state = state; - this.rollbackOnly = rollbackOnly; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid xid() { - return xid; - } - - /** {@inheritDoc} */ - @Override public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public long threadId() { - return threadId; - } - - /** {@inheritDoc} */ - @Override public long startTime() { - return startTime; - } - - /** {@inheritDoc} */ - @Override public IgniteTxIsolation isolation() { - return isolation; - } - - /** {@inheritDoc} */ - @Override public IgniteTxConcurrency concurrency() { - return concurrency; - } - - /** {@inheritDoc} */ - @Override public boolean isInvalidate() { - return invalidate; - } - - /** {@inheritDoc} */ - @Override public boolean implicit() { - return implicit; - } - - /** {@inheritDoc} */ - @Override public long timeout() { - return timeout; - } - - /** {@inheritDoc} */ - @Override public IgniteTxState state() { - return state; - } - - /** {@inheritDoc} */ - @Override public boolean isRollbackOnly() { - return rollbackOnly; - } - - /** {@inheritDoc} */ - @Override public long timeout(long timeout) { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public boolean setRollbackOnly() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public void commit() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture commitAsync() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public void rollback() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - return this == o || o instanceof IgniteTx && xid.equals(((IgniteTx)o).xid()); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return xid.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TxShadow.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java deleted file mode 100644 index 91b9cc0..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java +++ /dev/null @@ -1,1059 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; - -/** - * Transaction entry. Note that it is essential that this class does not override - * {@link #equals(Object)} method, as transaction entries should use referential - * equality. - */ -public class GridCacheTxEntry implements GridPeerDeployAware, Externalizable, IgniteOptimizedMarshallable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) - private static Object GG_CLASS_ID; - - /** Owning transaction. */ - @GridToStringExclude - private GridCacheTxEx tx; - - /** Cache key. */ - @GridToStringInclude - private K key; - - /** Key bytes. */ - private byte[] keyBytes; - - /** Cache ID. */ - private int cacheId; - - /** Transient tx key. */ - private GridCacheTxKey txKey; - - /** Cache value. */ - @GridToStringInclude - private TxEntryValueHolder val = new TxEntryValueHolder<>(); - - /** Visible value for peek. */ - @GridToStringInclude - private TxEntryValueHolder prevVal = new TxEntryValueHolder<>(); - - /** Filter bytes. */ - private byte[] filterBytes; - - /** Transform. */ - @GridToStringInclude - private Collection> transformClosCol; - - /** Transform closure bytes. */ - @GridToStringExclude - private byte[] transformClosBytes; - - /** Time to live. */ - private long ttl; - - /** DR expire time (explicit) */ - private long drExpireTime = -1L; - - /** Explicit lock version if there is one. */ - @GridToStringInclude - private GridCacheVersion explicitVer; - - /** DHT version. */ - private transient volatile GridCacheVersion dhtVer; - - /** Put filters. */ - @GridToStringInclude - private IgnitePredicate>[] filters; - - /** Flag indicating whether filters passed. Used for fast-commit transactions. */ - private boolean filtersPassed; - - /** Flag indicating that filter is set and can not be replaced. */ - private transient boolean filtersSet; - - /** Underlying cache entry. */ - private transient volatile GridCacheEntryEx entry; - - /** Cache registry. */ - private transient GridCacheContext ctx; - - /** Prepared flag to prevent multiple candidate add. */ - @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient AtomicBoolean prepared = new AtomicBoolean(); - - /** Lock flag for colocated cache. */ - private transient boolean locked; - - /** Assigned node ID (required only for partitioned cache). */ - private transient UUID nodeId; - - /** Flag if this node is a back up node. */ - private boolean locMapped; - - /** Group lock entry flag. */ - private boolean grpLock; - - /** Flag indicating if this entry should be transferred to remote node. */ - private boolean transferRequired; - - /** Deployment enabled flag. */ - private boolean depEnabled; - - /** Data center replication version. */ - private GridCacheVersion drVer; - - /** - * Required by {@link Externalizable} - */ - public GridCacheTxEntry() { - /* No-op. */ - } - - /** - * This constructor is meant for remote transactions. - * - * @param ctx Cache registry. - * @param tx Owning transaction. - * @param op Operation. - * @param val Value. - * @param ttl Time to live. - * @param drExpireTime DR expire time. - * @param entry Cache entry. - * @param drVer Data center replication version. - */ - public GridCacheTxEntry(GridCacheContext ctx, GridCacheTxEx tx, GridCacheOperation op, V val, - long ttl, long drExpireTime, GridCacheEntryEx entry, @Nullable GridCacheVersion drVer) { - assert ctx != null; - assert tx != null; - assert op != null; - assert entry != null; - - this.ctx = ctx; - this.tx = tx; - this.val.value(op, val, false, false); - this.entry = entry; - this.ttl = ttl; - this.drExpireTime = drExpireTime; - this.drVer = drVer; - - key = entry.key(); - keyBytes = entry.keyBytes(); - - cacheId = entry.context().cacheId(); - - depEnabled = ctx.gridDeploy().enabled(); - } - - /** - * This constructor is meant for local transactions. - * - * @param ctx Cache registry. - * @param tx Owning transaction. - * @param op Operation. - * @param val Value. - * @param transformClos Transform closure. - * @param ttl Time to live. - * @param entry Cache entry. - * @param filters Put filters. - * @param drVer Data center replication version. - */ - public GridCacheTxEntry(GridCacheContext ctx, GridCacheTxEx tx, GridCacheOperation op, - V val, IgniteClosure transformClos, long ttl, GridCacheEntryEx entry, - IgnitePredicate>[] filters, GridCacheVersion drVer) { - assert ctx != null; - assert tx != null; - assert op != null; - assert entry != null; - - this.ctx = ctx; - this.tx = tx; - this.val.value(op, val, false, false); - this.entry = entry; - this.ttl = ttl; - this.filters = filters; - this.drVer = drVer; - - if (transformClos != null) - addTransformClosure(transformClos); - - key = entry.key(); - keyBytes = entry.keyBytes(); - - cacheId = entry.context().cacheId(); - - depEnabled = ctx.gridDeploy().enabled(); - } - - /** - * @return Cache context for this tx entry. - */ - public GridCacheContext context() { - return ctx; - } - - /** - * @return Flag indicating if this entry is affinity mapped to the same node. - */ - public boolean locallyMapped() { - return locMapped; - } - - /** - * @param locMapped Flag indicating if this entry is affinity mapped to the same node. - */ - public void locallyMapped(boolean locMapped) { - this.locMapped = locMapped; - } - - /** - * @return {@code True} if this entry was added in group lock transaction and - * this is not a group lock entry. - */ - public boolean groupLockEntry() { - return grpLock; - } - - /** - * @param grpLock {@code True} if this entry was added in group lock transaction and - * this is not a group lock entry. - */ - public void groupLockEntry(boolean grpLock) { - this.grpLock = grpLock; - } - - /** - * @param transferRequired Sets flag indicating that transfer is required to remote node. - */ - public void transferRequired(boolean transferRequired) { - this.transferRequired = transferRequired; - } - - /** - * @return Flag indicating whether transfer is required to remote nodes. - */ - public boolean transferRequired() { - return transferRequired; - } - - /** - * @param ctx Context. - * @return Clean copy of this entry. - */ - public GridCacheTxEntry cleanCopy(GridCacheContext ctx) { - GridCacheTxEntry cp = new GridCacheTxEntry<>(); - - cp.key = key; - cp.cacheId = cacheId; - cp.ctx = ctx; - - cp.val = new TxEntryValueHolder<>(); - - cp.keyBytes = keyBytes; - cp.filters = filters; - cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue()); - cp.val.valueBytes(val.valueBytes()); - cp.transformClosCol = transformClosCol; - cp.ttl = ttl; - cp.drExpireTime = drExpireTime; - cp.explicitVer = explicitVer; - cp.grpLock = grpLock; - cp.depEnabled = depEnabled; - cp.drVer = drVer; - - return cp; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @param nodeId Node ID. - */ - public void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } - - /** - * @return DHT version. - */ - public GridCacheVersion dhtVersion() { - return dhtVer; - } - - /** - * @param dhtVer DHT version. - */ - public void dhtVersion(GridCacheVersion dhtVer) { - this.dhtVer = dhtVer; - } - - /** - * @return {@code True} if tx entry was marked as locked. - */ - public boolean locked() { - return locked; - } - - /** - * Marks tx entry as locked. - */ - public void markLocked() { - locked = true; - } - - /** - * @param val Value to set. - */ - void setAndMarkValid(V val) { - setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue()); - } - - /** - * @param op Operation. - * @param val Value to set. - */ - void setAndMarkValid(GridCacheOperation op, V val) { - setAndMarkValid(op, val, this.val.hasWriteValue(), this.val.hasReadValue()); - } - - /** - * @param op Operation. - * @param val Value to set. - * @param hasReadVal Has read value flag. - * @param hasWriteVal Has write value flag. - */ - void setAndMarkValid(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) { - this.val.value(op, val, hasWriteVal, hasReadVal); - - markValid(); - } - - /** - * Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible - * to further peek operations. - */ - void markValid() { - prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue()); - } - - /** - * Marks entry as prepared. - * - * @return True if entry was marked prepared by this call. - */ - boolean markPrepared() { - return prepared.compareAndSet(false, true); - } - - /** - * @return Entry key. - */ - public K key() { - return key; - } - - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - - /** - * @return Tx key. - */ - public GridCacheTxKey txKey() { - if (txKey == null) - txKey = new GridCacheTxKey<>(key, cacheId); - - return txKey; - } - - /** - * - * @return Key bytes. - */ - @Nullable public byte[] keyBytes() { - byte[] bytes = keyBytes; - - if (bytes == null && entry != null) { - bytes = entry.keyBytes(); - - keyBytes = bytes; - } - - return bytes; - } - - /** - * @param keyBytes Key bytes. - */ - public void keyBytes(byte[] keyBytes) { - initKeyBytes(keyBytes); - } - - /** - * @return Underlying cache entry. - */ - public GridCacheEntryEx cached() { - return entry; - } - - /** - * @param entry Cache entry. - * @param keyBytes Key bytes, possibly {@code null}. - */ - public void cached(GridCacheEntryEx entry, @Nullable byte[] keyBytes) { - assert entry != null; - - assert entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this + - ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + ctx.isDht() + ']'; - - this.entry = entry; - - initKeyBytes(keyBytes); - } - - /** - * Initialized key bytes locally and on the underlying entry. - * - * @param bytes Key bytes to initialize. - */ - private void initKeyBytes(@Nullable byte[] bytes) { - if (bytes != null) { - keyBytes = bytes; - - while (true) { - try { - if (entry != null) - entry.keyBytes(bytes); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - entry = ctx.cache().entryEx(key); - } - } - } - else if (entry != null) { - bytes = entry.keyBytes(); - - if (bytes != null) - keyBytes = bytes; - } - } - - /** - * @return Entry value. - */ - @Nullable public V value() { - return val.value(); - } - - /** - * @return {@code True} if has value explicitly set. - */ - public boolean hasValue() { - return val.hasValue(); - } - - /** - * @return {@code True} if has write value set. - */ - public boolean hasWriteValue() { - return val.hasWriteValue(); - } - - /** - * @return {@code True} if has read value set. - */ - public boolean hasReadValue() { - return val.hasReadValue(); - } - - /** - * @return Value visible for peek. - */ - @Nullable public V previousValue() { - return prevVal.value(); - } - - /** - * @return {@code True} if has previous value explicitly set. - */ - boolean hasPreviousValue() { - return prevVal.hasValue(); - } - - /** - * @return Previous operation to revert entry in case of filter failure. - */ - @Nullable public GridCacheOperation previousOperation() { - return prevVal.op(); - } - - /** - * @return Value bytes. - */ - @Nullable public byte[] valueBytes() { - return val.valueBytes(); - } - - /** - * @param valBytes Value bytes. - */ - public void valueBytes(@Nullable byte[] valBytes) { - val.valueBytes(valBytes); - } - - /** - * @return Time to live. - */ - public long ttl() { - return ttl; - } - - /** - * @param ttl Time to live. - */ - public void ttl(long ttl) { - this.ttl = ttl; - } - - /** - * @return DR expire time. - */ - public long drExpireTime() { - return drExpireTime; - } - - /** - * @param drExpireTime DR expire time. - */ - public void drExpireTime(long drExpireTime) { - this.drExpireTime = drExpireTime; - } - - /** - * @param val Entry value. - * @param writeVal Write value flag. - * @param readVal Read value flag. - */ - public void value(@Nullable V val, boolean writeVal, boolean readVal) { - this.val.value(this.val.op(), val, writeVal, readVal); - } - - /** - * Sets read value if this tx entrty does not have write value yet. - * - * @param val Read value to set. - */ - public void readValue(@Nullable V val) { - this.val.value(this.val.op(), val, false, true); - } - - /** - * @param transformClos Transform closure. - */ - public void addTransformClosure(IgniteClosure transformClos) { - if (transformClosCol == null) - transformClosCol = new LinkedList<>(); - - transformClosCol.add(transformClos); - - // Must clear transform closure bytes since collection has changed. - transformClosBytes = null; - - val.op(TRANSFORM); - } - - /** - * @return Collection of transform closures. - */ - public Collection> transformClosures() { - return transformClosCol; - } - - /** - * @param transformClosCol Collection of transform closures. - */ - public void transformClosures(@Nullable Collection> transformClosCol) { - this.transformClosCol = transformClosCol; - - // Must clear transform closure bytes since collection has changed. - transformClosBytes = null; - } - - /** - * @return Cache operation. - */ - public GridCacheOperation op() { - return val.op(); - } - - /** - * @param op Cache operation. - */ - public void op(GridCacheOperation op) { - val.op(op); - } - - /** - * @return {@code True} if read entry. - */ - public boolean isRead() { - return op() == READ; - } - - /** - * @param explicitVer Explicit version. - */ - public void explicitVersion(GridCacheVersion explicitVer) { - this.explicitVer = explicitVer; - } - - /** - * @return Explicit version. - */ - public GridCacheVersion explicitVersion() { - return explicitVer; - } - - /** - * @return DR version. - */ - @Nullable public GridCacheVersion drVersion() { - return drVer; - } - - /** - * @param drVer DR version. - */ - public void drVersion(@Nullable GridCacheVersion drVer) { - this.drVer = drVer; - } - - /** - * @return Put filters. - */ - public IgnitePredicate>[] filters() { - return filters; - } - - /** - * @param filters Put filters. - */ - public void filters(IgnitePredicate>[] filters) { - filterBytes = null; - - this.filters = filters; - } - - /** - * @return {@code True} if filters passed for fast-commit transactions. - */ - public boolean filtersPassed() { - return filtersPassed; - } - - /** - * @param filtersPassed {@code True} if filters passed for fast-commit transactions. - */ - public void filtersPassed(boolean filtersPassed) { - this.filtersPassed = filtersPassed; - } - - /** - * @return {@code True} if filters are set. - */ - public boolean filtersSet() { - return filtersSet; - } - - /** - * @param filtersSet {@code True} if filters are set and should not be replaced. - */ - public void filtersSet(boolean filtersSet) { - this.filtersSet = filtersSet; - } - - /** - * @param ctx Context. - * @throws IgniteCheckedException If failed. - */ - public void marshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - // Do not serialize filters if they are null. - if (depEnabled) { - if (keyBytes == null) - keyBytes = entry.getOrMarshalKeyBytes(); - - if (transformClosBytes == null && transformClosCol != null) - transformClosBytes = CU.marshal(ctx, transformClosCol); - - if (F.isEmptyOrNulls(filters)) - filterBytes = null; - else if (filterBytes == null) - filterBytes = CU.marshal(ctx, filters); - } - - val.marshal(ctx, context(), depEnabled); - } - - /** - * Unmarshalls entry. - * - * @param ctx Cache context. - * @param clsLdr Class loader. - * @throws IgniteCheckedException If un-marshalling failed. - */ - public void unmarshal(GridCacheSharedContext ctx, boolean near, ClassLoader clsLdr) throws IgniteCheckedException { - if (this.ctx == null) { - GridCacheContext cacheCtx = ctx.cacheContext(cacheId); - - if (cacheCtx.isNear() && !near) - cacheCtx = cacheCtx.near().dht().context(); - else if (!cacheCtx.isNear() && near) - cacheCtx = cacheCtx.dht().near().context(); - - this.ctx = cacheCtx; - } - - if (depEnabled) { - // Don't unmarshal more than once by checking key for null. - if (key == null) - key = ctx.marshaller().unmarshal(keyBytes, clsLdr); - - // Unmarshal transform closure anyway if it exists. - if (transformClosBytes != null && transformClosCol == null) - transformClosCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr); - - if (filters == null && filterBytes != null) { - filters = ctx.marshaller().unmarshal(filterBytes, clsLdr); - - if (filters == null) - filters = CU.empty(); - } - } - - val.unmarshal(this.ctx, clsLdr, depEnabled); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(depEnabled); - - if (depEnabled) { - U.writeByteArray(out, keyBytes); - U.writeByteArray(out, transformClosBytes); - U.writeByteArray(out, filterBytes); - } - else { - out.writeObject(key); - U.writeCollection(out, transformClosCol); - U.writeArray(out, filters); - } - - out.writeInt(cacheId); - - val.writeTo(out); - - out.writeLong(ttl); - out.writeLong(drExpireTime); - - CU.writeVersion(out, explicitVer); - out.writeBoolean(grpLock); - CU.writeVersion(out, drVer); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - depEnabled = in.readBoolean(); - - if (depEnabled) { - keyBytes = U.readByteArray(in); - transformClosBytes = U.readByteArray(in); - filterBytes = U.readByteArray(in); - } - else { - key = (K)in.readObject(); - transformClosCol = U.readCollection(in); - filters = U.readEntryFilterArray(in); - } - - cacheId = in.readInt(); - - val.readFrom(in); - - ttl = in.readLong(); - drExpireTime = in.readLong(); - - explicitVer = CU.readVersion(in); - grpLock = in.readBoolean(); - drVer = CU.readVersion(in); - } - - /** {@inheritDoc} */ - @Override public Object ggClassId() { - return GG_CLASS_ID; - } - - /** {@inheritDoc} */ - @Override public Class deployClass() { - ClassLoader clsLdr = getClass().getClassLoader(); - - V val = value(); - - // First of all check classes that may be loaded by class loader other than application one. - return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ? - key.getClass() : val != null ? val.getClass() : getClass(); - } - - /** {@inheritDoc} */ - @Override public ClassLoader classLoader() { - return deployClass().getClassLoader(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(GridCacheTxEntry.class, this, - "keyBytesSize", keyBytes == null ? "null" : Integer.toString(keyBytes.length), - "xidVer", tx == null ? "null" : tx.xidVersion()); - } - - /** - * Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes. - */ - private static class TxEntryValueHolder { - /** */ - @GridToStringInclude - private V val; - - /** */ - @GridToStringExclude - private byte[] valBytes; - - /** */ - @GridToStringInclude - private GridCacheOperation op = NOOP; - - /** Flag indicating that value has been set for write. */ - private boolean hasWriteVal; - - /** Flag indicating that value has been set for read. */ - private boolean hasReadVal; - - /** Flag indicating that bytes were sent. */ - private boolean valBytesSent; - - /** - * @param op Cache operation. - * @param val Value. - * @param hasWriteVal Write value presence flag. - * @param hasReadVal Read value presence flag. - */ - public void value(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) { - if (hasReadVal && this.hasWriteVal) - return; - - boolean clean = this.val != null; - - this.op = op; - this.val = val; - - if (clean) - valBytes = null; - - this.hasWriteVal = hasWriteVal || op == CREATE || op == UPDATE || op == DELETE; - this.hasReadVal = hasReadVal || op == READ; - } - - /** - * @return {@code True} if has read or write value. - */ - public boolean hasValue() { - return hasWriteVal || hasReadVal; - } - - /** - * Gets stored value. - * - * @return Value. - */ - public V value() { - return val; - } - - /** - * @param val Stored value. - */ - public void value(@Nullable V val) { - boolean clean = this.val != null; - - this.val = val; - - if (clean) - valBytes = null; - } - - /** - * Gets cache operation. - * - * @return Cache operation. - */ - public GridCacheOperation op() { - return op; - } - - /** - * Sets cache operation. - * - * @param op Cache operation. - */ - public void op(GridCacheOperation op) { - this.op = op; - } - - /** - * @return {@code True} if write value was set. - */ - public boolean hasWriteValue() { - return hasWriteVal; - } - - /** - * @return {@code True} if read value was set. - */ - public boolean hasReadValue() { - return hasReadVal; - } - - /** - * Sets value bytes. - * - * @param valBytes Value bytes to set. - */ - public void valueBytes(@Nullable byte[] valBytes) { - this.valBytes = valBytes; - } - - /** - * Gets value bytes. - * - * @return Value bytes. - */ - public byte[] valueBytes() { - return valBytes; - } - - /** - * @param ctx Cache context. - * @param depEnabled Deployment enabled flag. - * @throws IgniteCheckedException If marshaling failed. - */ - public void marshal(GridCacheSharedContext sharedCtx, GridCacheContext ctx, boolean depEnabled) - throws IgniteCheckedException { - boolean valIsByteArr = val != null && val instanceof byte[]; - - // Do not send write values to remote nodes. - if (hasWriteVal && val != null && !valIsByteArr && valBytes == null && - (depEnabled || !ctx.isUnmarshalValues())) - valBytes = CU.marshal(sharedCtx, val); - - valBytesSent = hasWriteVal && !valIsByteArr && valBytes != null && (depEnabled || !ctx.isUnmarshalValues()); - } - - /** - * @param ctx Cache context. - * @param ldr Class loader. - * @param depEnabled Deployment enabled flag. - * @throws IgniteCheckedException If unmarshalling failed. - */ - public void unmarshal(GridCacheContext ctx, ClassLoader ldr, boolean depEnabled) throws IgniteCheckedException { - if (valBytes != null && val == null && (ctx.isUnmarshalValues() || op == TRANSFORM || depEnabled)) - val = ctx.marshaller().unmarshal(valBytes, ldr); - } - - /** - * @param out Data output. - * @throws IOException If failed. - */ - public void writeTo(ObjectOutput out) throws IOException { - out.writeBoolean(hasWriteVal); - out.writeBoolean(valBytesSent); - - if (hasWriteVal) { - if (valBytesSent) - U.writeByteArray(out, valBytes); - else { - if (val != null && val instanceof byte[]) { - out.writeBoolean(true); - - U.writeByteArray(out, (byte[])val); - } - else { - out.writeBoolean(false); - - out.writeObject(val); - } - } - } - - out.writeInt(op.ordinal()); - } - - /** - * @param in Data input. - * @throws IOException If failed. - * @throws ClassNotFoundException If failed. - */ - public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException { - hasWriteVal = in.readBoolean(); - valBytesSent = in.readBoolean(); - - if (hasWriteVal) { - if (valBytesSent) - valBytes = U.readByteArray(in); - else - val = in.readBoolean() ? (V)U.readByteArray(in) : (V)in.readObject(); - } - - op = fromOrdinal(in.readInt()); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "[op=" + op +", val=" + val + ", valBytesLen=" + (valBytes == null ? 0 : valBytes.length) + ']'; - } - } -}