Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2B03810F74 for ; Wed, 4 Mar 2015 13:27:34 +0000 (UTC) Received: (qmail 5709 invoked by uid 500); 4 Mar 2015 13:26:59 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 5666 invoked by uid 500); 4 Mar 2015 13:26:59 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 5645 invoked by uid 99); 4 Mar 2015 13:26:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 13:26:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 04 Mar 2015 13:26:56 +0000 Received: (qmail 4105 invoked by uid 99); 4 Mar 2015 13:26:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 13:26:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2AB7AE1028; Wed, 4 Mar 2015 13:26:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 04 Mar 2015 13:26:37 -0000 Message-Id: In-Reply-To: <7b4480a30b3c4d65a9182650c63f9777@git.apache.org> References: <7b4480a30b3c4d65a9182650c63f9777@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/22] 
incubator-ignite git commit: #ignite-51: IgniteTxEntry implements Message. X-Virus-Checked: Checked by ClamAV on apache.org #ignite-51: IgniteTxEntry implements Message. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ac04da2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ac04da2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ac04da2f Branch: refs/heads/ignite-51 Commit: ac04da2fa7ba52d0f976b77e25108f1a0b60e0d6 Parents: 4ba6e20 Author: ivasilinets Authored: Tue Mar 3 17:56:06 2015 +0300 Committer: ivasilinets Committed: Tue Mar 3 17:56:06 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 4 + .../dht/GridDhtTxPrepareRequest.java | 88 +++++--- .../near/GridNearTxPrepareFuture.java | 13 +- .../near/GridNearTxPrepareResponse.java | 218 ++++++++++++++++--- .../cache/transactions/IgniteTxEntry.java | 70 +----- .../cache/transactions/IgniteTxKey.java | 14 +- 6 files changed, 251 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 814e380..ccf8395 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -541,6 +541,10 @@ public class GridIoMessageFactory implements MessageFactory { 
msg = new IgniteTxEntry.TxEntryValueHolder(); break; + case 99: + msg = new GridNearTxPrepareResponse.OwnedValue(); + + break; default: if (ext != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index ec45af1..de812c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -61,17 +61,16 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { @GridDirectTransient private Collection nearWrites; - /** Serialized near writes. */ - @GridDirectCollection(byte[].class) - private Collection nearWritesBytes; - /** Owned versions by key. */ @GridToStringInclude @GridDirectTransient private Map owned; - /** Owned versions bytes. */ - private byte[] ownedBytes; + /** Owned keys. */ + private Collection ownedKeys; + + /** Owned values. */ + private Collection ownedVals; /** Near transaction ID. 
*/ private GridCacheVersion nearXidVer; @@ -272,8 +271,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (ownedBytes == null && owned != null) { - ownedBytes = CU.marshal(ctx, owned); + if (owned != null) { + ownedKeys = owned.keySet(); + + ownedVals = owned.values(); + + for (IgniteTxKey key: ownedKeys) + key.prepareMarshal(ctx.cacheContext(key.cacheId())); if (ctx.deploymentEnabled()) { for (IgniteTxKey k : owned.keySet()) @@ -281,31 +285,35 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } } - if (nearWrites != null) { + if (nearWrites != null) marshalTx(nearWrites, ctx); - - nearWritesBytes = new ArrayList<>(nearWrites.size()); - - for (IgniteTxEntry e : nearWrites) - nearWritesBytes.add(ctx.marshaller().marshal(e)); - } } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (ownedBytes != null && owned == null) - owned = ctx.marshaller().unmarshal(ownedBytes, ldr); + if (ownedKeys != null && owned == null) { + owned = new HashMap<>(); + + assert ownedKeys.size() == ownedVals.size(); + + Iterator keyIter = ownedKeys.iterator(); - if (nearWritesBytes != null) { - nearWrites = new ArrayList<>(nearWritesBytes.size()); + Iterator valIter = ownedVals.iterator(); - for (byte[] arr : nearWritesBytes) - nearWrites.add(ctx.marshaller().unmarshal(arr, ldr)); + while (keyIter.hasNext()) { + IgniteTxKey key = keyIter.next(); + + key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); + + owned.put(key, valIter.next()); + } - unmarshalTx(nearWrites, true, ctx, ldr); } + + unmarshalTx(nearWrites, true, ctx, ldr); + } /** {@inheritDoc} */ @@ -359,7 +367,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { writer.incrementState(); 
case 29: - if (!writer.writeCollection("nearWritesBytes", nearWritesBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -371,30 +379,36 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { writer.incrementState(); case 31: - if (!writer.writeByteArray("ownedBytes", ownedBytes)) + if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 32: - if (!writer.writeBitSet("preloadKeys", preloadKeys)) + if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 33: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); case 34: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 35: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 36: if (!writer.writeLong("topVer", topVer)) return false; @@ -457,7 +471,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 29: - nearWritesBytes = reader.readCollection("nearWritesBytes", MessageCollectionItemType.BYTE_ARR); + nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -473,7 +487,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 31: - ownedBytes = reader.readByteArray("ownedBytes"); + ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -481,7 +495,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 
32: - preloadKeys = reader.readBitSet("preloadKeys"); + ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -489,7 +503,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 33: - subjId = reader.readUuid("subjId"); + preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) return false; @@ -497,7 +511,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 34: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -505,6 +519,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 35: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 36: topVer = reader.readLong("topVer"); if (!reader.isLastRead()) @@ -524,6 +546,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 36; + return 37; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 9bb3aa7..0f39b5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java 
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -935,7 +934,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut else { assert F.isEmpty(res.invalidPartitions()); - for (Map.Entry> entry : res.ownedValues().entrySet()) { + for (Map.Entry entry : res.ownedValues().entrySet()) { IgniteTxEntry txEntry = tx.entry(entry.getKey()); assert txEntry != null; @@ -947,17 +946,17 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut if (cacheCtx.isNear()) { GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached(); - IgniteBiTuple tup = entry.getValue(); + GridNearTxPrepareResponse.OwnedValue tup = entry.getValue(); - nearEntry.resetFromPrimary(tup.get2(), tx.xidVersion(), - tup.get1(), m.node().id()); + nearEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion(), + tup.version(), m.node().id()); } else if (txEntry.cached().detached()) { GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached(); - IgniteBiTuple tup = entry.getValue(); + GridNearTxPrepareResponse.OwnedValue tup = entry.getValue(); - detachedEntry.resetFromPrimary(tup.get2(), tx.xidVersion()); + detachedEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion()); } break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 78cfb73..e30f89c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -23,9 +23,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -63,12 +61,17 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** Map of owned values to set on near node. */ @GridToStringInclude @GridDirectTransient - private Map> ownedVals; + private Map ownedVals; - /** Marshalled owned bytes. */ + /** OwnedVals' keys for marshalling. */ @GridToStringExclude - @GridDirectCollection(byte[].class) - private Collection ownedValsBytes; + @GridDirectTransient + private Collection ownedValKeys; + + /** OwnedVals' values for marshalling. */ + @GridToStringExclude + @GridDirectTransient + private Collection ownedValVals; /** Cache return value. 
*/ @GridDirectTransient @@ -171,15 +174,19 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse if (ownedVals == null) ownedVals = new HashMap<>(); - ownedVals.put(key, F.t(ver, val)); + OwnedValue oVal = new OwnedValue(); + + oVal.init(ver, val); + + ownedVals.put(key, oVal); } /** * @return Owned values map. */ - public Map> ownedValues() { + public Map ownedValues() { return ownedVals == null ? - Collections.>emptyMap() : + Collections.emptyMap() : Collections.unmodifiableMap(ownedVals); } @@ -224,22 +231,19 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (ownedVals != null && ownedValsBytes == null) { - ownedValsBytes = new ArrayList<>(ownedVals.size()); - - for (Map.Entry> entry : ownedVals.entrySet()) { - IgniteBiTuple tup = entry.getValue(); + if (ownedVals != null) { + ownedValKeys = ownedVals.keySet(); - GridCacheContext cctx = ctx.cacheContext(entry.getKey().cacheId()); + ownedValVals = ownedVals.values(); - entry.getKey().prepareMarshal(cctx); + for (IgniteTxKey key : ownedVals.keySet()) { + GridCacheContext cacheCtx = ctx.cacheContext(key.cacheId()); - CacheObject val = tup.get2(); + OwnedValue value = ownedVals.get(key); - if (val != null) - val.prepareMarshal(cctx.cacheObjectContext()); + key.prepareMarshal(cacheCtx); - ownedValsBytes.add(ctx.marshaller().marshal(F.t(entry.getKey(), tup.get1(), val))); + value.prepareMarshal(cacheCtx.cacheObjectContext()); } } @@ -259,22 +263,27 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (ownedValsBytes != null && ownedVals == null) { + if (ownedValKeys != null && ownedVals == null) { ownedVals = new HashMap<>(); - for (byte[] bytes : 
ownedValsBytes) { - GridTuple3 tup = ctx.marshaller().unmarshal(bytes, ldr); + assert ownedValKeys.size() == ownedValVals.size(); - CacheObject val = tup.get3(); + Iterator keyIter = ownedValKeys.iterator(); - GridCacheContext cctx = ctx.cacheContext(tup.get1().cacheId()); + Iterator valueIter = ownedValVals.iterator(); - tup.get1().finishUnmarshal(cctx, ldr); + while (keyIter.hasNext()) { + IgniteTxKey key = keyIter.next(); + + GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - if (val != null) - val.finishUnmarshal(cctx, ldr); + OwnedValue value = valueIter.next(); - ownedVals.put(tup.get1(), F.t(tup.get2(), val)); + key.finishUnmarshal(cctx, ldr); + + value.finishUnmarshal(cctx, ldr); + + ownedVals.put(key, value); } } @@ -336,18 +345,24 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse writer.incrementState(); case 15: - if (!writer.writeCollection("ownedValsBytes", ownedValsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 16: - if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 17: + if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 18: if (!writer.writeByteArray("retValBytes", retValBytes)) return false; @@ -410,7 +425,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 15: - ownedValsBytes = reader.readCollection("ownedValsBytes", MessageCollectionItemType.BYTE_ARR); + ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -418,7 +433,7 @@ public class GridNearTxPrepareResponse extends 
GridDistributedTxPrepareResponse reader.incrementState(); case 16: - pending = reader.readCollection("pending", MessageCollectionItemType.MSG); + ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -426,6 +441,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 17: + pending = reader.readCollection("pending", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: retValBytes = reader.readByteArray("retValBytes"); if (!reader.isLastRead()) @@ -445,11 +468,138 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 18; + return 19; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxPrepareResponse.class, this, "super", super.toString()); } + + /** + * Message for owned values to set on near node. + */ + public static class OwnedValue implements Message { + /** Cache version. */ + private GridCacheVersion vers; + + /** Cache object. */ + private CacheObject obj; + + /** + * Initialize OwnedValues. + * + * @param vers Cache version. + * @param obj Cache object. + */ + void init(GridCacheVersion vers, CacheObject obj) { + this.vers = vers; + this.obj = obj; + } + + /** + * @return Cache version. + */ + public GridCacheVersion version() { + return vers; + } + + /** + * @return Cache object. + */ + public CacheObject cacheObject() { + return obj; + } + + /** + * This method is called before the whole message is sent + * and is responsible for pre-marshalling state. + * + * @param ctx Cache object context. + * @throws IgniteCheckedException If failed. 
+ */ + public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + if (obj != null) + obj.prepareMarshal(ctx); + } + + /** + * This method is called after the whole message is received + * and is responsible for unmarshalling state. + * + * @param ctx Context. + * @param ldr Class loader. + * @throws IgniteCheckedException If failed. + */ + public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (obj != null) + obj.finishUnmarshal(ctx, ldr); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("vers", vers)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("obj", obj)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + vers = reader.readMessage("vers"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + obj = reader.readMessage("obj"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 99; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 367c586..e763444 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -45,7 +45,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; * {@link #equals(Object)} method, as transaction entries should use referential * equality. */ -public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Message, OptimizedMarshallable { +public class IgniteTxEntry implements GridPeerDeployAware, Message, OptimizedMarshallable { /** */ private static final long serialVersionUID = 0L; @@ -931,74 +931,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Messa return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion()); } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(depEnabled); - - if (depEnabled) { - U.writeByteArray(out, transformClosBytes); - U.writeByteArray(out, filterBytes); - } - else { - U.writeCollection(out, entryProcessorsCol); - U.writeArray(out, filters); - } - - out.writeObject(key); - - out.writeInt(cacheId); - - val.writeTo(out); - - out.writeLong(ttl); - - CU.writeVersion(out, explicitVer); - out.writeBoolean(grpLock); - - if (conflictExpireTime != CU.EXPIRE_TIME_CALCULATE) { - out.writeBoolean(true); - out.writeLong(conflictExpireTime); - } - else - out.writeBoolean(false); - - CU.writeVersion(out, conflictVer); - - out.writeObject(transferExpiryPlc ? 
new IgniteExternalizableExpiryPolicy(expiryPlc) : null); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - depEnabled = in.readBoolean(); - - if (depEnabled) { - transformClosBytes = U.readByteArray(in); - filterBytes = U.readByteArray(in); - } - else { - entryProcessorsCol = U.readCollection(in); - filters = GridCacheUtils.readEntryFilterArray(in); - } - - key = (KeyCacheObject)in.readObject(); - - cacheId = in.readInt(); - - val.readFrom(in); - - ttl = in.readLong(); - - explicitVer = CU.readVersion(in); - grpLock = in.readBoolean(); - - conflictExpireTime = in.readBoolean() ? in.readLong() : CU.EXPIRE_TIME_CALCULATE; - conflictVer = CU.readVersion(in); - - expiryPlc = (ExpiryPolicy)in.readObject(); - } - /** * Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java index 3062647..7d10a66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java @@ -30,7 +30,7 @@ import java.nio.*; * Cache transaction key. This wrapper is needed because same keys may be enlisted in the same transaction * for multiple caches. 
*/ -public class IgniteTxKey implements Externalizable, Message { +public class IgniteTxKey implements Message { /** */ private static final long serialVersionUID = 0L; @@ -181,18 +181,6 @@ public class IgniteTxKey implements Externalizable, Message { } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(cacheId); - out.writeObject(key); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheId = in.readInt(); - key = (KeyCacheObject)in.readObject(); - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteTxKey.class, this); }