Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F356E10760 for ; Tue, 23 Dec 2014 00:06:55 +0000 (UTC) Received: (qmail 77768 invoked by uid 500); 23 Dec 2014 00:06:55 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 77700 invoked by uid 500); 23 Dec 2014 00:06:55 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 77684 invoked by uid 99); 23 Dec 2014 00:06:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Dec 2014 00:06:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 23 Dec 2014 00:06:38 +0000 Received: (qmail 65257 invoked by uid 99); 22 Dec 2014 23:59:36 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Dec 2014 23:59:36 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 43979A33E03; Mon, 22 Dec 2014 23:59:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 23 Dec 2014 00:00:11 -0000 Message-Id: In-Reply-To: <26feadb5eea944938f598249aa42f8a1@git.apache.org> References: <26feadb5eea944938f598249aa42f8a1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [37/50] [abbrv] incubator-ignite git commit: GG-9141 - Renaming. X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java new file mode 100644 index 0000000..1cf0c33 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java @@ -0,0 +1,1060 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.transactions; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*; + +/** + * Transaction entry. Note that it is essential that this class does not override + * {@link #equals(Object)} method, as transaction entries should use referential + * equality. + */ +public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, IgniteOptimizedMarshallable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"}) + private static Object GG_CLASS_ID; + + /** Owning transaction. */ + @GridToStringExclude + private IgniteTxEx tx; + + /** Cache key. */ + @GridToStringInclude + private K key; + + /** Key bytes. */ + private byte[] keyBytes; + + /** Cache ID. */ + private int cacheId; + + /** Transient tx key. */ + private IgniteTxKey txKey; + + /** Cache value. */ + @GridToStringInclude + private TxEntryValueHolder val = new TxEntryValueHolder<>(); + + /** Visible value for peek. */ + @GridToStringInclude + private TxEntryValueHolder prevVal = new TxEntryValueHolder<>(); + + /** Filter bytes. */ + private byte[] filterBytes; + + /** Transform. */ + @GridToStringInclude + private Collection> transformClosCol; + + /** Transform closure bytes. */ + @GridToStringExclude + private byte[] transformClosBytes; + + /** Time to live. */ + private long ttl; + + /** DR expire time (explicit) */ + private long drExpireTime = -1L; + + /** Explicit lock version if there is one. */ + @GridToStringInclude + private GridCacheVersion explicitVer; + + /** DHT version. */ + private transient volatile GridCacheVersion dhtVer; + + /** Put filters. */ + @GridToStringInclude + private IgnitePredicate>[] filters; + + /** Flag indicating whether filters passed. Used for fast-commit transactions. */ + private boolean filtersPassed; + + /** Flag indicating that filter is set and can not be replaced. */ + private transient boolean filtersSet; + + /** Underlying cache entry. */ + private transient volatile GridCacheEntryEx entry; + + /** Cache registry. */ + private transient GridCacheContext ctx; + + /** Prepared flag to prevent multiple candidate add. */ + @SuppressWarnings({"TransientFieldNotInitialized"}) + private transient AtomicBoolean prepared = new AtomicBoolean(); + + /** Lock flag for colocated cache. */ + private transient boolean locked; + + /** Assigned node ID (required only for partitioned cache). */ + private transient UUID nodeId; + + /** Flag if this node is a back up node. */ + private boolean locMapped; + + /** Group lock entry flag. */ + private boolean grpLock; + + /** Flag indicating if this entry should be transferred to remote node. */ + private boolean transferRequired; + + /** Deployment enabled flag. */ + private boolean depEnabled; + + /** Data center replication version. */ + private GridCacheVersion drVer; + + /** + * Required by {@link Externalizable} + */ + public IgniteTxEntry() { + /* No-op. */ + } + + /** + * This constructor is meant for remote transactions. + * + * @param ctx Cache registry. + * @param tx Owning transaction. + * @param op Operation. + * @param val Value. + * @param ttl Time to live. + * @param drExpireTime DR expire time. + * @param entry Cache entry. + * @param drVer Data center replication version. + */ + public IgniteTxEntry(GridCacheContext ctx, IgniteTxEx tx, GridCacheOperation op, V val, + long ttl, long drExpireTime, GridCacheEntryEx entry, @Nullable GridCacheVersion drVer) { + assert ctx != null; + assert tx != null; + assert op != null; + assert entry != null; + + this.ctx = ctx; + this.tx = tx; + this.val.value(op, val, false, false); + this.entry = entry; + this.ttl = ttl; + this.drExpireTime = drExpireTime; + this.drVer = drVer; + + key = entry.key(); + keyBytes = entry.keyBytes(); + + cacheId = entry.context().cacheId(); + + depEnabled = ctx.gridDeploy().enabled(); + } + + /** + * This constructor is meant for local transactions. + * + * @param ctx Cache registry. + * @param tx Owning transaction. + * @param op Operation. + * @param val Value. + * @param transformClos Transform closure. + * @param ttl Time to live. + * @param entry Cache entry. + * @param filters Put filters. + * @param drVer Data center replication version. + */ + public IgniteTxEntry(GridCacheContext ctx, IgniteTxEx tx, GridCacheOperation op, + V val, IgniteClosure transformClos, long ttl, GridCacheEntryEx entry, + IgnitePredicate>[] filters, GridCacheVersion drVer) { + assert ctx != null; + assert tx != null; + assert op != null; + assert entry != null; + + this.ctx = ctx; + this.tx = tx; + this.val.value(op, val, false, false); + this.entry = entry; + this.ttl = ttl; + this.filters = filters; + this.drVer = drVer; + + if (transformClos != null) + addTransformClosure(transformClos); + + key = entry.key(); + keyBytes = entry.keyBytes(); + + cacheId = entry.context().cacheId(); + + depEnabled = ctx.gridDeploy().enabled(); + } + + /** + * @return Cache context for this tx entry. + */ + public GridCacheContext context() { + return ctx; + } + + /** + * @return Flag indicating if this entry is affinity mapped to the same node. + */ + public boolean locallyMapped() { + return locMapped; + } + + /** + * @param locMapped Flag indicating if this entry is affinity mapped to the same node. + */ + public void locallyMapped(boolean locMapped) { + this.locMapped = locMapped; + } + + /** + * @return {@code True} if this entry was added in group lock transaction and + * this is not a group lock entry. + */ + public boolean groupLockEntry() { + return grpLock; + } + + /** + * @param grpLock {@code True} if this entry was added in group lock transaction and + * this is not a group lock entry. + */ + public void groupLockEntry(boolean grpLock) { + this.grpLock = grpLock; + } + + /** + * @param transferRequired Sets flag indicating that transfer is required to remote node. + */ + public void transferRequired(boolean transferRequired) { + this.transferRequired = transferRequired; + } + + /** + * @return Flag indicating whether transfer is required to remote nodes. + */ + public boolean transferRequired() { + return transferRequired; + } + + /** + * @param ctx Context. + * @return Clean copy of this entry. + */ + public IgniteTxEntry cleanCopy(GridCacheContext ctx) { + IgniteTxEntry cp = new IgniteTxEntry<>(); + + cp.key = key; + cp.cacheId = cacheId; + cp.ctx = ctx; + + cp.val = new TxEntryValueHolder<>(); + + cp.keyBytes = keyBytes; + cp.filters = filters; + cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue()); + cp.val.valueBytes(val.valueBytes()); + cp.transformClosCol = transformClosCol; + cp.ttl = ttl; + cp.drExpireTime = drExpireTime; + cp.explicitVer = explicitVer; + cp.grpLock = grpLock; + cp.depEnabled = depEnabled; + cp.drVer = drVer; + + return cp; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @param nodeId Node ID. + */ + public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + + /** + * @return DHT version. + */ + public GridCacheVersion dhtVersion() { + return dhtVer; + } + + /** + * @param dhtVer DHT version. + */ + public void dhtVersion(GridCacheVersion dhtVer) { + this.dhtVer = dhtVer; + } + + /** + * @return {@code True} if tx entry was marked as locked. + */ + public boolean locked() { + return locked; + } + + /** + * Marks tx entry as locked. + */ + public void markLocked() { + locked = true; + } + + /** + * @param val Value to set. + */ + void setAndMarkValid(V val) { + setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue()); + } + + /** + * @param op Operation. + * @param val Value to set. + */ + void setAndMarkValid(GridCacheOperation op, V val) { + setAndMarkValid(op, val, this.val.hasWriteValue(), this.val.hasReadValue()); + } + + /** + * @param op Operation. + * @param val Value to set. + * @param hasReadVal Has read value flag. + * @param hasWriteVal Has write value flag. + */ + void setAndMarkValid(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) { + this.val.value(op, val, hasWriteVal, hasReadVal); + + markValid(); + } + + /** + * Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible + * to further peek operations. + */ + void markValid() { + prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue()); + } + + /** + * Marks entry as prepared. + * + * @return True if entry was marked prepared by this call. + */ + boolean markPrepared() { + return prepared.compareAndSet(false, true); + } + + /** + * @return Entry key. + */ + public K key() { + return key; + } + + /** + * @return Cache ID. + */ + public int cacheId() { + return cacheId; + } + + /** + * @return Tx key. + */ + public IgniteTxKey txKey() { + if (txKey == null) + txKey = new IgniteTxKey<>(key, cacheId); + + return txKey; + } + + /** + * + * @return Key bytes. + */ + @Nullable public byte[] keyBytes() { + byte[] bytes = keyBytes; + + if (bytes == null && entry != null) { + bytes = entry.keyBytes(); + + keyBytes = bytes; + } + + return bytes; + } + + /** + * @param keyBytes Key bytes. + */ + public void keyBytes(byte[] keyBytes) { + initKeyBytes(keyBytes); + } + + /** + * @return Underlying cache entry. + */ + public GridCacheEntryEx cached() { + return entry; + } + + /** + * @param entry Cache entry. + * @param keyBytes Key bytes, possibly {@code null}. + */ + public void cached(GridCacheEntryEx entry, @Nullable byte[] keyBytes) { + assert entry != null; + + assert entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this + + ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + ctx.isDht() + ']'; + + this.entry = entry; + + initKeyBytes(keyBytes); + } + + /** + * Initialized key bytes locally and on the underlying entry. + * + * @param bytes Key bytes to initialize. + */ + private void initKeyBytes(@Nullable byte[] bytes) { + if (bytes != null) { + keyBytes = bytes; + + while (true) { + try { + if (entry != null) + entry.keyBytes(bytes); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + entry = ctx.cache().entryEx(key); + } + } + } + else if (entry != null) { + bytes = entry.keyBytes(); + + if (bytes != null) + keyBytes = bytes; + } + } + + /** + * @return Entry value. + */ + @Nullable public V value() { + return val.value(); + } + + /** + * @return {@code True} if has value explicitly set. + */ + public boolean hasValue() { + return val.hasValue(); + } + + /** + * @return {@code True} if has write value set. + */ + public boolean hasWriteValue() { + return val.hasWriteValue(); + } + + /** + * @return {@code True} if has read value set. + */ + public boolean hasReadValue() { + return val.hasReadValue(); + } + + /** + * @return Value visible for peek. + */ + @Nullable public V previousValue() { + return prevVal.value(); + } + + /** + * @return {@code True} if has previous value explicitly set. + */ + boolean hasPreviousValue() { + return prevVal.hasValue(); + } + + /** + * @return Previous operation to revert entry in case of filter failure. + */ + @Nullable public GridCacheOperation previousOperation() { + return prevVal.op(); + } + + /** + * @return Value bytes. + */ + @Nullable public byte[] valueBytes() { + return val.valueBytes(); + } + + /** + * @param valBytes Value bytes. + */ + public void valueBytes(@Nullable byte[] valBytes) { + val.valueBytes(valBytes); + } + + /** + * @return Time to live. + */ + public long ttl() { + return ttl; + } + + /** + * @param ttl Time to live. + */ + public void ttl(long ttl) { + this.ttl = ttl; + } + + /** + * @return DR expire time. + */ + public long drExpireTime() { + return drExpireTime; + } + + /** + * @param drExpireTime DR expire time. + */ + public void drExpireTime(long drExpireTime) { + this.drExpireTime = drExpireTime; + } + + /** + * @param val Entry value. + * @param writeVal Write value flag. + * @param readVal Read value flag. + */ + public void value(@Nullable V val, boolean writeVal, boolean readVal) { + this.val.value(this.val.op(), val, writeVal, readVal); + } + + /** + * Sets read value if this tx entrty does not have write value yet. + * + * @param val Read value to set. + */ + public void readValue(@Nullable V val) { + this.val.value(this.val.op(), val, false, true); + } + + /** + * @param transformClos Transform closure. + */ + public void addTransformClosure(IgniteClosure transformClos) { + if (transformClosCol == null) + transformClosCol = new LinkedList<>(); + + transformClosCol.add(transformClos); + + // Must clear transform closure bytes since collection has changed. + transformClosBytes = null; + + val.op(TRANSFORM); + } + + /** + * @return Collection of transform closures. + */ + public Collection> transformClosures() { + return transformClosCol; + } + + /** + * @param transformClosCol Collection of transform closures. + */ + public void transformClosures(@Nullable Collection> transformClosCol) { + this.transformClosCol = transformClosCol; + + // Must clear transform closure bytes since collection has changed. + transformClosBytes = null; + } + + /** + * @return Cache operation. + */ + public GridCacheOperation op() { + return val.op(); + } + + /** + * @param op Cache operation. + */ + public void op(GridCacheOperation op) { + val.op(op); + } + + /** + * @return {@code True} if read entry. + */ + public boolean isRead() { + return op() == READ; + } + + /** + * @param explicitVer Explicit version. + */ + public void explicitVersion(GridCacheVersion explicitVer) { + this.explicitVer = explicitVer; + } + + /** + * @return Explicit version. + */ + public GridCacheVersion explicitVersion() { + return explicitVer; + } + + /** + * @return DR version. + */ + @Nullable public GridCacheVersion drVersion() { + return drVer; + } + + /** + * @param drVer DR version. + */ + public void drVersion(@Nullable GridCacheVersion drVer) { + this.drVer = drVer; + } + + /** + * @return Put filters. + */ + public IgnitePredicate>[] filters() { + return filters; + } + + /** + * @param filters Put filters. + */ + public void filters(IgnitePredicate>[] filters) { + filterBytes = null; + + this.filters = filters; + } + + /** + * @return {@code True} if filters passed for fast-commit transactions. + */ + public boolean filtersPassed() { + return filtersPassed; + } + + /** + * @param filtersPassed {@code True} if filters passed for fast-commit transactions. + */ + public void filtersPassed(boolean filtersPassed) { + this.filtersPassed = filtersPassed; + } + + /** + * @return {@code True} if filters are set. + */ + public boolean filtersSet() { + return filtersSet; + } + + /** + * @param filtersSet {@code True} if filters are set and should not be replaced. + */ + public void filtersSet(boolean filtersSet) { + this.filtersSet = filtersSet; + } + + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void marshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + // Do not serialize filters if they are null. + if (depEnabled) { + if (keyBytes == null) + keyBytes = entry.getOrMarshalKeyBytes(); + + if (transformClosBytes == null && transformClosCol != null) + transformClosBytes = CU.marshal(ctx, transformClosCol); + + if (F.isEmptyOrNulls(filters)) + filterBytes = null; + else if (filterBytes == null) + filterBytes = CU.marshal(ctx, filters); + } + + val.marshal(ctx, context(), depEnabled); + } + + /** + * Unmarshalls entry. + * + * @param ctx Cache context. + * @param clsLdr Class loader. + * @throws IgniteCheckedException If un-marshalling failed. + */ + public void unmarshal(GridCacheSharedContext ctx, boolean near, ClassLoader clsLdr) throws IgniteCheckedException { + if (this.ctx == null) { + GridCacheContext cacheCtx = ctx.cacheContext(cacheId); + + if (cacheCtx.isNear() && !near) + cacheCtx = cacheCtx.near().dht().context(); + else if (!cacheCtx.isNear() && near) + cacheCtx = cacheCtx.dht().near().context(); + + this.ctx = cacheCtx; + } + + if (depEnabled) { + // Don't unmarshal more than once by checking key for null. + if (key == null) + key = ctx.marshaller().unmarshal(keyBytes, clsLdr); + + // Unmarshal transform closure anyway if it exists. + if (transformClosBytes != null && transformClosCol == null) + transformClosCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr); + + if (filters == null && filterBytes != null) { + filters = ctx.marshaller().unmarshal(filterBytes, clsLdr); + + if (filters == null) + filters = CU.empty(); + } + } + + val.unmarshal(this.ctx, clsLdr, depEnabled); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeBoolean(depEnabled); + + if (depEnabled) { + U.writeByteArray(out, keyBytes); + U.writeByteArray(out, transformClosBytes); + U.writeByteArray(out, filterBytes); + } + else { + out.writeObject(key); + U.writeCollection(out, transformClosCol); + U.writeArray(out, filters); + } + + out.writeInt(cacheId); + + val.writeTo(out); + + out.writeLong(ttl); + out.writeLong(drExpireTime); + + CU.writeVersion(out, explicitVer); + out.writeBoolean(grpLock); + CU.writeVersion(out, drVer); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + depEnabled = in.readBoolean(); + + if (depEnabled) { + keyBytes = U.readByteArray(in); + transformClosBytes = U.readByteArray(in); + filterBytes = U.readByteArray(in); + } + else { + key = (K)in.readObject(); + transformClosCol = U.readCollection(in); + filters = U.readEntryFilterArray(in); + } + + cacheId = in.readInt(); + + val.readFrom(in); + + ttl = in.readLong(); + drExpireTime = in.readLong(); + + explicitVer = CU.readVersion(in); + grpLock = in.readBoolean(); + drVer = CU.readVersion(in); + } + + /** {@inheritDoc} */ + @Override public Object ggClassId() { + return GG_CLASS_ID; + } + + /** {@inheritDoc} */ + @Override public Class deployClass() { + ClassLoader clsLdr = getClass().getClassLoader(); + + V val = value(); + + // First of all check classes that may be loaded by class loader other than application one. + return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ? + key.getClass() : val != null ? val.getClass() : getClass(); + } + + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { + return deployClass().getClassLoader(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(IgniteTxEntry.class, this, + "keyBytesSize", keyBytes == null ? "null" : Integer.toString(keyBytes.length), + "xidVer", tx == null ? "null" : tx.xidVersion()); + } + + /** + * Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes. + */ + private static class TxEntryValueHolder { + /** */ + @GridToStringInclude + private V val; + + /** */ + @GridToStringExclude + private byte[] valBytes; + + /** */ + @GridToStringInclude + private GridCacheOperation op = NOOP; + + /** Flag indicating that value has been set for write. */ + private boolean hasWriteVal; + + /** Flag indicating that value has been set for read. */ + private boolean hasReadVal; + + /** Flag indicating that bytes were sent. */ + private boolean valBytesSent; + + /** + * @param op Cache operation. + * @param val Value. + * @param hasWriteVal Write value presence flag. + * @param hasReadVal Read value presence flag. + */ + public void value(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) { + if (hasReadVal && this.hasWriteVal) + return; + + boolean clean = this.val != null; + + this.op = op; + this.val = val; + + if (clean) + valBytes = null; + + this.hasWriteVal = hasWriteVal || op == CREATE || op == UPDATE || op == DELETE; + this.hasReadVal = hasReadVal || op == READ; + } + + /** + * @return {@code True} if has read or write value. + */ + public boolean hasValue() { + return hasWriteVal || hasReadVal; + } + + /** + * Gets stored value. + * + * @return Value. + */ + public V value() { + return val; + } + + /** + * @param val Stored value. + */ + public void value(@Nullable V val) { + boolean clean = this.val != null; + + this.val = val; + + if (clean) + valBytes = null; + } + + /** + * Gets cache operation. + * + * @return Cache operation. + */ + public GridCacheOperation op() { + return op; + } + + /** + * Sets cache operation. + * + * @param op Cache operation. + */ + public void op(GridCacheOperation op) { + this.op = op; + } + + /** + * @return {@code True} if write value was set. + */ + public boolean hasWriteValue() { + return hasWriteVal; + } + + /** + * @return {@code True} if read value was set. + */ + public boolean hasReadValue() { + return hasReadVal; + } + + /** + * Sets value bytes. + * + * @param valBytes Value bytes to set. + */ + public void valueBytes(@Nullable byte[] valBytes) { + this.valBytes = valBytes; + } + + /** + * Gets value bytes. + * + * @return Value bytes. + */ + public byte[] valueBytes() { + return valBytes; + } + + /** + * @param ctx Cache context. + * @param depEnabled Deployment enabled flag. + * @throws IgniteCheckedException If marshaling failed. + */ + public void marshal(GridCacheSharedContext sharedCtx, GridCacheContext ctx, boolean depEnabled) + throws IgniteCheckedException { + boolean valIsByteArr = val != null && val instanceof byte[]; + + // Do not send write values to remote nodes. + if (hasWriteVal && val != null && !valIsByteArr && valBytes == null && + (depEnabled || !ctx.isUnmarshalValues())) + valBytes = CU.marshal(sharedCtx, val); + + valBytesSent = hasWriteVal && !valIsByteArr && valBytes != null && (depEnabled || !ctx.isUnmarshalValues()); + } + + /** + * @param ctx Cache context. + * @param ldr Class loader. + * @param depEnabled Deployment enabled flag. + * @throws IgniteCheckedException If unmarshalling failed. + */ + public void unmarshal(GridCacheContext ctx, ClassLoader ldr, boolean depEnabled) throws IgniteCheckedException { + if (valBytes != null && val == null && (ctx.isUnmarshalValues() || op == TRANSFORM || depEnabled)) + val = ctx.marshaller().unmarshal(valBytes, ldr); + } + + /** + * @param out Data output. + * @throws IOException If failed. + */ + public void writeTo(ObjectOutput out) throws IOException { + out.writeBoolean(hasWriteVal); + out.writeBoolean(valBytesSent); + + if (hasWriteVal) { + if (valBytesSent) + U.writeByteArray(out, valBytes); + else { + if (val != null && val instanceof byte[]) { + out.writeBoolean(true); + + U.writeByteArray(out, (byte[])val); + } + else { + out.writeBoolean(false); + + out.writeObject(val); + } + } + } + + out.writeInt(op.ordinal()); + } + + /** + * @param in Data input. + * @throws IOException If failed. + * @throws ClassNotFoundException If failed. + */ + public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException { + hasWriteVal = in.readBoolean(); + valBytesSent = in.readBoolean(); + + if (hasWriteVal) { + if (valBytesSent) + valBytes = U.readByteArray(in); + else + val = in.readBoolean() ? (V)U.readByteArray(in) : (V)in.readObject(); + } + + op = fromOrdinal(in.readInt()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "[op=" + op +", val=" + val + ", valBytesLen=" + (valBytes == null ? 0 : valBytes.length) + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEx.java new file mode 100644 index 0000000..8c49f57 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEx.java @@ -0,0 +1,520 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache.transactions; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.timeout.*; +import org.gridgain.grid.util.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Transaction managed by cache ({@code 'Ex'} stands for external). + */ +public interface IgniteTxEx extends IgniteTx, GridTimeoutObject { + @SuppressWarnings("PublicInnerClass") + public enum FinalizationStatus { + /** Transaction was not finalized yet. */ + NONE, + + /** Transaction is being finalized by user. */ + USER_FINISH, + + /** Recovery request is received, user finish requests should be ignored. */ + RECOVERY_WAIT, + + /** Transaction is being finalized by recovery procedure. */ + RECOVERY_FINISH + } + + /** + * @return Size of the transaction. + */ + public int size(); + + /** + * @return {@code True} if transaction is allowed to use store. + */ + public boolean storeEnabled(); + + /** + * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with + * store enabled. + */ + public boolean storeUsed(); + + /** + * Checks if this is system cache transaction. System transactions are isolated from user transactions + * because some of the public API methods may be invoked inside user transactions and internally start + * system cache transactions. + * + * @return {@code True} if transaction is started for system cache. + */ + public boolean system(); + + /** + * @return Last recorded topology version. + */ + public long topologyVersion(); + + /** + * @return Flag indicating whether transaction is implicit with only one key. + */ + public boolean implicitSingle(); + + /** + * @return Collection of cache IDs involved in this transaction. + */ + public Collection activeCacheIds(); + + /** + * Attempts to set topology version and returns the current value. + * If topology version was previously set, then it's value will + * be returned (but not updated). + * + * @param topVer Topology version. + * @return Recorded topology version. + */ + public long topologyVersion(long topVer); + + /** + * @return {@code True} if transaction is empty. + */ + public boolean empty(); + + /** + * @return {@code True} if transaction group-locked. + */ + public boolean groupLock(); + + /** + * @return Group lock key if {@link #groupLock()} is {@code true}. + */ + @Nullable public IgniteTxKey groupLockKey(); + + /** + * @return {@code True} if preparing flag was set with this call. + */ + public boolean markPreparing(); + + /** + * @param status Finalization status to set. + * @return {@code True} if could mark was set. + */ + public boolean markFinalizing(FinalizationStatus status); + + /** + * @param part Invalid partition. + */ + public void addInvalidPartition(GridCacheContext cacheCtx, int part); + + /** + * @return Invalid partitions. + */ + public Set invalidPartitions(); + + /** + * Gets owned version for near remote transaction. + * + * @param key Key to get version for. + * @return Owned version, if any. + */ + @Nullable public GridCacheVersion ownedVersion(IgniteTxKey key); + + /** + * Gets ID of additional node involved. For example, in DHT case, other node is + * near node ID. + * + * @return Parent node IDs. + */ + @Nullable public UUID otherNodeId(); + + /** + * @return Event node ID. + */ + public UUID eventNodeId(); + + /** + * Gets node ID which directly started this transaction. In case of DHT local transaction it will be + * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote + * transaction it will be starter node ID. + * + * @return Originating node ID. + */ + public UUID originatingNodeId(); + + /** + * @return Master node IDs. + */ + public Collection masterNodeIds(); + + /** + * @return Near transaction ID. + */ + @Nullable public GridCacheVersion nearXidVersion(); + + /** + * @return Transaction nodes mapping (primary node -> related backup nodes). + */ + @Nullable public Map> transactionNodes(); + + /** + * @param entry Entry to check. + * @return {@code True} if lock is owned. + * @throws GridCacheEntryRemovedException If entry has been removed. + */ + public boolean ownsLock(GridCacheEntryEx entry) throws GridCacheEntryRemovedException; + + /** + * @param entry Entry to check. + * @return {@code True} if lock is owned. + */ + public boolean ownsLockUnsafe(GridCacheEntryEx entry); + + /** + * For Partitioned caches, this flag is {@code false} for remote DHT and remote NEAR + * transactions because serializability of transaction is enforced on primary node. All + * other transaction types must enforce it. + * + * @return Enforce serializable flag. + */ + public boolean enforceSerializable(); + + /** + * @return {@code True} if near transaction. + */ + public boolean near(); + + /** + * @return {@code True} if DHT transaction. + */ + public boolean dht(); + + /** + * @return {@code True} if dht colocated transaction. + */ + public boolean colocated(); + + /** + * @return {@code True} if transaction is local, {@code false} if it's remote. + */ + public boolean local(); + + /** + * @return {@code True} if transaction is replicated. + */ + public boolean replicated(); + + /** + * @return Subject ID initiated this transaction. + */ + public UUID subjectId(); + + /** + * Task name hash in case if transaction was initiated within task execution. + * + * @return Task name hash. + */ + public int taskNameHash(); + + /** + * @return {@code True} if transaction is user transaction, which means: + *
    + *
  • Explicit
  • + *
  • Local
  • + *
  • Not DHT
  • + *
+ */ + public boolean user(); + + /** + * @return {@code True} if transaction is configured with synchronous commit flag. + */ + public boolean syncCommit(); + + /** + * @return {@code True} if transaction is configured with synchronous rollback flag. + */ + public boolean syncRollback(); + + /** + * @param key Key to check. + * @return {@code True} if key is present. + */ + public boolean hasWriteKey(IgniteTxKey key); + + /** + * @return Read set. + */ + public Set> readSet(); + + /** + * @return Write set. + */ + public Set> writeSet(); + + /** + * @return All transaction entries. + */ + public Collection> allEntries(); + + /** + * @return Write entries. + */ + public Collection> writeEntries(); + + /** + * @return Read entries. + */ + public Collection> readEntries(); + + /** + * @return Transaction write map. + */ + public Map, IgniteTxEntry> writeMap(); + + /** + * @return Transaction read map. + */ + public Map, IgniteTxEntry> readMap(); + + /** + * Gets pessimistic recovery writes, i.e. values that have never been sent to remote nodes with lock requests. + * + * @return Collection of recovery writes. + */ + public Collection> recoveryWrites(); + + /** + * Gets a list of entries that needs to be locked on the next step of prepare stage of + * optimistic transaction. + * + * @return List of tx entries for optimistic locking. + */ + public Collection> optimisticLockEntries(); + + /** + * Seals transaction for updates. + */ + public void seal(); + + /** + * @param key Key for the entry. + * @return Entry for the key (either from write set or read set). + */ + @Nullable public IgniteTxEntry entry(IgniteTxKey key); + + /** + * @param failFast Fail-fast flag. + * @param key Key to look up. + * @param filter Filter to check. + * @return Current value for the key within transaction. + * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}. + */ + @Nullable public GridTuple peek( + GridCacheContext ctx, + boolean failFast, + K key, + @Nullable IgnitePredicate>[] filter) throws GridCacheFilterFailedException; + + /** + * @return Start version. + */ + public GridCacheVersion startVersion(); + + /** + * @return Transaction version. + */ + public GridCacheVersion xidVersion(); + + /** + * @return Version created at commit time. + */ + public GridCacheVersion commitVersion(); + + /** + * @param commitVer Commit version. + * @return {@code True} if version was set. + */ + public boolean commitVersion(GridCacheVersion commitVer); + + /** + * @return End version (a.k.a. 'tnc' or 'transaction number counter') + * assigned to this transaction at the end of write phase. + */ + public GridCacheVersion endVersion(); + + /** + * Prepare state. + * + * @throws IgniteCheckedException If failed. + */ + public void prepare() throws IgniteCheckedException; + + /** + * Prepare stage. + * + * @return Future for prepare step. + */ + public IgniteFuture> prepareAsync(); + + /** + * @param endVer End version (a.k.a. 'tnc' or 'transaction number counter') + * assigned to this transaction at the end of write phase. + */ + public void endVersion(GridCacheVersion endVer); + + /** + * @return Transaction write version. For all transactions except DHT transactions, will be equal to + * {@link #xidVersion()}. + */ + public GridCacheVersion writeVersion(); + + /** + * Sets write version. + * + * @param ver Write version. + */ + public void writeVersion(GridCacheVersion ver); + + /** + * @return Future for transaction completion. + */ + public IgniteFuture finishFuture(); + + /** + * @param state Transaction state. + * @return {@code True} if transition was valid, {@code false} otherwise. + */ + public boolean state(IgniteTxState state); + + /** + * @param invalidate Invalidate flag. + */ + public void invalidate(boolean invalidate); + + /** + * @param sysInvalidate System invalidate flag. + */ + public void systemInvalidate(boolean sysInvalidate); + + /** + * @return System invalidate flag. + */ + public boolean isSystemInvalidate(); + + /** + * TODO-gg-4004 Put rollback async on public API? + * Asynchronously rollback this transaction. + * + * @return Rollback future. + */ + public IgniteFuture rollbackAsync(); + + /** + * Callback invoked whenever there is a lock that has been acquired + * by this transaction for any of the participating entries. + * + * @param entry Cache entry. + * @param owner Lock candidate that won ownership of the lock. + * @return {@code True} if transaction cared about notification. + */ + public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner); + + /** + * @return {@code True} if transaction timed out. + */ + public boolean timedOut(); + + /** + * @return {@code True} if transaction had completed successfully or unsuccessfully. + */ + public boolean done(); + + /** + * @return {@code True} for OPTIMISTIC transactions. + */ + public boolean optimistic(); + + /** + * @return {@code True} for PESSIMISTIC transactions. + */ + public boolean pessimistic(); + + /** + * @return {@code True} if read-committed. + */ + public boolean readCommitted(); + + /** + * @return {@code True} if repeatable-read. + */ + public boolean repeatableRead(); + + /** + * @return {@code True} if serializable. + */ + public boolean serializable(); + + /** + * Checks whether given key has been removed within transaction. + * + * @param key Key to check. + * @return {@code True} if key has been removed. + */ + public boolean removed(IgniteTxKey key); + + /** + * Gets allowed remaining time for this transaction. + * + * @return Remaining time. + * @throws IgniteTxTimeoutException If transaction timed out. + */ + public long remainingTime() throws IgniteTxTimeoutException; + + /** + * @return Alternate transaction versions. + */ + public Collection alternateVersions(); + + /** + * @return {@code True} if transaction needs completed versions for processing. + */ + public boolean needsCompletedVersions(); + + /** + * @param base Base for committed versions. + * @param committed Committed transactions relative to base. + * @param rolledback Rolled back transactions relative to base. + */ + public void completedVersions(GridCacheVersion base, Collection committed, + Collection rolledback); + + /** + * @return {@code True} if transaction has at least one internal entry. + */ + public boolean internal(); + + /** + * @return {@code True} if transaction is a one-phase-commit transaction. + */ + public boolean onePhaseCommit(); + + /** + * @return {@code True} if transaction has transform entries. This flag will be only set for local + * transactions. + */ + public boolean hasTransforms(); +}