ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [02/22] incubator-ignite git commit: #ignite-51: IgniteTxEntry implements Message.
Date Wed, 04 Mar 2015 13:26:37 GMT
#ignite-51: IgniteTxEntry implements Message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ac04da2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ac04da2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ac04da2f

Branch: refs/heads/ignite-51
Commit: ac04da2fa7ba52d0f976b77e25108f1a0b60e0d6
Parents: 4ba6e20
Author: ivasilinets <ivasilinets@gridgain.com>
Authored: Tue Mar 3 17:56:06 2015 +0300
Committer: ivasilinets <ivasilinets@gridgain.com>
Committed: Tue Mar 3 17:56:06 2015 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   4 +
 .../dht/GridDhtTxPrepareRequest.java            |  88 +++++---
 .../near/GridNearTxPrepareFuture.java           |  13 +-
 .../near/GridNearTxPrepareResponse.java         | 218 ++++++++++++++++---
 .../cache/transactions/IgniteTxEntry.java       |  70 +-----
 .../cache/transactions/IgniteTxKey.java         |  14 +-
 6 files changed, 251 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 814e380..ccf8395 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -541,6 +541,10 @@ public class GridIoMessageFactory implements MessageFactory {
                 msg = new IgniteTxEntry.TxEntryValueHolder();
 
                 break;
+            case 99:
+                msg = new GridNearTxPrepareResponse.OwnedValue();
+
+                break;
 
             default:
                 if (ext != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index ec45af1..de812c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -61,17 +61,16 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     @GridDirectTransient
     private Collection<IgniteTxEntry> nearWrites;
 
-    /** Serialized near writes. */
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> nearWritesBytes;
-
     /** Owned versions by key. */
     @GridToStringInclude
     @GridDirectTransient
     private Map<IgniteTxKey, GridCacheVersion> owned;
 
-    /** Owned versions bytes. */
-    private byte[] ownedBytes;
+    /** Owned keys. */
+    private Collection<IgniteTxKey> ownedKeys;
+
+    /** Owned values. */
+    private Collection<GridCacheVersion> ownedVals;
 
     /** Near transaction ID. */
     private GridCacheVersion nearXidVer;
@@ -272,8 +271,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (ownedBytes == null && owned != null) {
-            ownedBytes = CU.marshal(ctx, owned);
+        if (owned != null) {
+            ownedKeys = owned.keySet();
+
+            ownedVals = owned.values();
+
+            for (IgniteTxKey key: ownedKeys)
+                key.prepareMarshal(ctx.cacheContext(key.cacheId()));
 
             if (ctx.deploymentEnabled()) {
                 for (IgniteTxKey k : owned.keySet())
@@ -281,31 +285,35 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             }
         }
 
-        if (nearWrites != null) {
+        if (nearWrites != null)
             marshalTx(nearWrites, ctx);
-
-            nearWritesBytes = new ArrayList<>(nearWrites.size());
-
-            for (IgniteTxEntry e : nearWrites)
-                nearWritesBytes.add(ctx.marshaller().marshal(e));
-        }
     }
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (ownedBytes != null && owned == null)
-            owned = ctx.marshaller().unmarshal(ownedBytes, ldr);
+        if (ownedKeys != null && owned == null) {
+            owned = new HashMap<>();
+
+            assert ownedKeys.size() == ownedVals.size();
+
+            Iterator<IgniteTxKey> keyIter = ownedKeys.iterator();
 
-        if (nearWritesBytes != null) {
-            nearWrites = new ArrayList<>(nearWritesBytes.size());
+            Iterator<GridCacheVersion> valIter = ownedVals.iterator();
 
-            for (byte[] arr : nearWritesBytes)
-                nearWrites.add(ctx.marshaller().<IgniteTxEntry>unmarshal(arr, ldr));
+            while (keyIter.hasNext()) {
+                IgniteTxKey key = keyIter.next();
+
+                key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);
+
+                owned.put(key, valIter.next());
+            }
 
-            unmarshalTx(nearWrites, true, ctx, ldr);
         }
+
+        unmarshalTx(nearWrites, true, ctx, ldr);
+
     }
 
     /** {@inheritDoc} */
@@ -359,7 +367,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeCollection("nearWritesBytes", nearWritesBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -371,30 +379,36 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeByteArray("ownedBytes", ownedBytes))
+                if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeBitSet("preloadKeys", preloadKeys))
+                if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 33:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
             case 34:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 35:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 36:
                 if (!writer.writeLong("topVer", topVer))
                     return false;
 
@@ -457,7 +471,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                nearWritesBytes = reader.readCollection("nearWritesBytes", MessageCollectionItemType.BYTE_ARR);
+                nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -473,7 +487,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
-                ownedBytes = reader.readByteArray("ownedBytes");
+                ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -481,7 +495,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 32:
-                preloadKeys = reader.readBitSet("preloadKeys");
+                ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -489,7 +503,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 33:
-                subjId = reader.readUuid("subjId");
+                preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
                     return false;
@@ -497,7 +511,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 34:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -505,6 +519,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 35:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 36:
                 topVer = reader.readLong("topVer");
 
                 if (!reader.isLastRead())
@@ -524,6 +546,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 36;
+        return 37;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 9bb3aa7..0f39b5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -935,7 +934,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
                 else {
                     assert F.isEmpty(res.invalidPartitions());
 
-                    for (Map.Entry<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> entry : res.ownedValues().entrySet()) {
+                    for (Map.Entry<IgniteTxKey, GridNearTxPrepareResponse.OwnedValue> entry : res.ownedValues().entrySet()) {
                         IgniteTxEntry txEntry = tx.entry(entry.getKey());
 
                         assert txEntry != null;
@@ -947,17 +946,17 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
                                 if (cacheCtx.isNear()) {
                                     GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();
 
-                                    IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
+                                    GridNearTxPrepareResponse.OwnedValue tup = entry.getValue();
 
-                                    nearEntry.resetFromPrimary(tup.get2(), tx.xidVersion(),
-                                        tup.get1(), m.node().id());
+                                    nearEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion(),
+                                        tup.version(), m.node().id());
                                 }
                                 else if (txEntry.cached().detached()) {
                                     GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
 
-                                    IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
+                                    GridNearTxPrepareResponse.OwnedValue tup = entry.getValue();
 
-                                    detachedEntry.resetFromPrimary(tup.get2(), tx.xidVersion());
+                                    detachedEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion());
                                 }
 
                                 break;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 78cfb73..e30f89c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -23,9 +23,7 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -63,12 +61,17 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     /** Map of owned values to set on near node. */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> ownedVals;
+    private Map<IgniteTxKey, OwnedValue> ownedVals;
 
-    /** Marshalled owned bytes. */
+    /** OwnedVals' keys for marshalling. */
     @GridToStringExclude
-    @GridDirectCollection(byte[].class)
-    private Collection<byte[]> ownedValsBytes;
+    @GridDirectTransient
+    private Collection<IgniteTxKey> ownedValKeys;
+
+    /** OwnedVals' values for marshalling. */
+    @GridToStringExclude
+    @GridDirectTransient
+    private Collection<OwnedValue> ownedValVals;
 
     /** Cache return value. */
     @GridDirectTransient
@@ -171,15 +174,19 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         if (ownedVals == null)
             ownedVals = new HashMap<>();
 
-        ownedVals.put(key, F.t(ver, val));
+        OwnedValue oVal = new OwnedValue();
+
+        oVal.init(ver, val);
+
+        ownedVals.put(key, oVal);
     }
 
     /**
      * @return Owned values map.
      */
-    public Map<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> ownedValues() {
+    public Map<IgniteTxKey, OwnedValue> ownedValues() {
         return ownedVals == null ?
-            Collections.<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>>emptyMap() :
+            Collections.<IgniteTxKey, OwnedValue>emptyMap() :
             Collections.unmodifiableMap(ownedVals);
     }
 
@@ -224,22 +231,19 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (ownedVals != null && ownedValsBytes == null) {
-            ownedValsBytes = new ArrayList<>(ownedVals.size());
-
-            for (Map.Entry<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> entry : ownedVals.entrySet()) {
-                IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue();
+        if (ownedVals != null) {
+            ownedValKeys = ownedVals.keySet();
 
-                GridCacheContext cctx = ctx.cacheContext(entry.getKey().cacheId());
+            ownedValVals = ownedVals.values();
 
-                entry.getKey().prepareMarshal(cctx);
+            for (IgniteTxKey key : ownedVals.keySet()) {
+                GridCacheContext cacheCtx = ctx.cacheContext(key.cacheId());
 
-                CacheObject val = tup.get2();
+                OwnedValue value = ownedVals.get(key);
 
-                if (val != null)
-                    val.prepareMarshal(cctx.cacheObjectContext());
+                key.prepareMarshal(cacheCtx);
 
-                ownedValsBytes.add(ctx.marshaller().marshal(F.t(entry.getKey(), tup.get1(), val)));
+                value.prepareMarshal(cacheCtx.cacheObjectContext());
             }
         }
 
@@ -259,22 +263,27 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (ownedValsBytes != null && ownedVals == null) {
+        if (ownedValKeys != null && ownedVals == null) {
             ownedVals = new HashMap<>();
 
-            for (byte[] bytes : ownedValsBytes) {
-                GridTuple3<IgniteTxKey, GridCacheVersion, CacheObject> tup = ctx.marshaller().unmarshal(bytes, ldr);
+            assert ownedValKeys.size() == ownedValVals.size();
 
-                CacheObject val = tup.get3();
+            Iterator<IgniteTxKey> keyIter = ownedValKeys.iterator();
 
-                GridCacheContext cctx = ctx.cacheContext(tup.get1().cacheId());
+            Iterator<OwnedValue> valueIter = ownedValVals.iterator();
 
-                tup.get1().finishUnmarshal(cctx, ldr);
+            while (keyIter.hasNext()) {
+                IgniteTxKey key = keyIter.next();
+
+                GridCacheContext cctx = ctx.cacheContext(key.cacheId());
 
-                if (val != null)
-                    val.finishUnmarshal(cctx, ldr);
+                OwnedValue value = valueIter.next();
 
-                ownedVals.put(tup.get1(), F.t(tup.get2(), val));
+                key.finishUnmarshal(cctx, ldr);
+
+                value.finishUnmarshal(cctx, ldr);
+
+                ownedVals.put(key, value);
             }
         }
 
@@ -336,18 +345,24 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("ownedValsBytes", ownedValsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("ownedValKeys", ownedValKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedValVals", ownedValVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 17:
+                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 18:
                 if (!writer.writeByteArray("retValBytes", retValBytes))
                     return false;
 
@@ -410,7 +425,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 15:
-                ownedValsBytes = reader.readCollection("ownedValsBytes", MessageCollectionItemType.BYTE_ARR);
+                ownedValKeys = reader.readCollection("ownedValKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -418,7 +433,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 16:
-                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
+                ownedValVals = reader.readCollection("ownedValVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -426,6 +441,14 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
                 reader.incrementState();
 
             case 17:
+                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
                 retValBytes = reader.readByteArray("retValBytes");
 
                 if (!reader.isLastRead())
@@ -445,11 +468,138 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 18;
+        return 19;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearTxPrepareResponse.class, this, "super", super.toString());
     }
+
+    /**
+     * Message for owned values to set on near node.
+     */
+    public static class OwnedValue implements Message {
+        /** Cache version. */
+        private GridCacheVersion vers;
+
+        /** Cache object. */
+        private CacheObject obj;
+
+        /**
+         * Initialize OwnedValues.
+         *
+          * @param vers Cache version.
+         * @param obj Cache object.
+         */
+        void init(GridCacheVersion vers, CacheObject obj) {
+            this.vers = vers;
+            this.obj = obj;
+        }
+
+        /**
+         * @return Cache version.
+         */
+        public GridCacheVersion version() {
+            return vers;
+        }
+
+        /**
+         * @return Cache object.
+         */
+        public CacheObject cacheObject() {
+            return obj;
+        }
+
+        /**
+         * This method is called before the whole message is sent
+         * and is responsible for pre-marshalling state.
+         *
+         * @param ctx Cache object context.
+         * @throws IgniteCheckedException If failed.
+         */
+        public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException {
+            if (obj != null)
+                obj.prepareMarshal(ctx);
+        }
+
+        /**
+         * This method is called after the whole message is received
+         * and is responsible for unmarshalling state.
+         *
+         * @param ctx Context.
+         * @param ldr Class loader.
+         * @throws IgniteCheckedException If failed.
+         */
+        public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+            if (obj != null)
+                obj.finishUnmarshal(ctx, ldr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+            writer.setBuffer(buf);
+
+            if (!writer.isHeaderWritten()) {
+                if (!writer.writeHeader(directType(), fieldsCount()))
+                    return false;
+
+                writer.onHeaderWritten();
+            }
+
+            switch (writer.state()) {
+                case 0:
+                    if (!writer.writeMessage("vers", vers))
+                        return false;
+
+                    writer.incrementState();
+
+                case 1:
+                    if (!writer.writeMessage("obj", obj))
+                        return false;
+
+                    writer.incrementState();
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+            reader.setBuffer(buf);
+
+            if (!reader.beforeMessageRead())
+                return false;
+
+            switch (reader.state()) {
+                case 0:
+                    vers = reader.readMessage("vers");
+
+                    if (!reader.isLastRead())
+                        return false;
+
+                    reader.incrementState();
+
+                case 1:
+                    obj = reader.readMessage("obj");
+
+                    if (!reader.isLastRead())
+                        return false;
+
+                    reader.incrementState();
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte directType() {
+            return 99;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte fieldsCount() {
+            return 2;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 367c586..e763444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -45,7 +45,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
  * {@link #equals(Object)} method, as transaction entries should use referential
  * equality.
  */
-public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Message, OptimizedMarshallable {
+public class IgniteTxEntry implements GridPeerDeployAware, Message, OptimizedMarshallable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -931,74 +931,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Messa
         return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion());
     }
 
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeBoolean(depEnabled);
-
-        if (depEnabled) {
-            U.writeByteArray(out, transformClosBytes);
-            U.writeByteArray(out, filterBytes);
-        }
-        else {
-            U.writeCollection(out, entryProcessorsCol);
-            U.writeArray(out, filters);
-        }
-
-        out.writeObject(key);
-
-        out.writeInt(cacheId);
-
-        val.writeTo(out);
-
-        out.writeLong(ttl);
-
-        CU.writeVersion(out, explicitVer);
-        out.writeBoolean(grpLock);
-
-        if (conflictExpireTime != CU.EXPIRE_TIME_CALCULATE) {
-            out.writeBoolean(true);
-            out.writeLong(conflictExpireTime);
-        }
-        else
-            out.writeBoolean(false);
-
-        CU.writeVersion(out, conflictVer);
-
-        out.writeObject(transferExpiryPlc ? new IgniteExternalizableExpiryPolicy(expiryPlc) : null);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked"})
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        depEnabled = in.readBoolean();
-
-        if (depEnabled) {
-            transformClosBytes = U.readByteArray(in);
-            filterBytes = U.readByteArray(in);
-        }
-        else {
-            entryProcessorsCol = U.readCollection(in);
-            filters = GridCacheUtils.readEntryFilterArray(in);
-        }
-
-        key = (KeyCacheObject)in.readObject();
-
-        cacheId = in.readInt();
-
-        val.readFrom(in);
-
-        ttl = in.readLong();
-
-        explicitVer = CU.readVersion(in);
-        grpLock = in.readBoolean();
-
-        conflictExpireTime = in.readBoolean() ? in.readLong() : CU.EXPIRE_TIME_CALCULATE;
-        conflictVer = CU.readVersion(in);
-
-        expiryPlc = (ExpiryPolicy)in.readObject();
-    }
-
     /**
      * Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ac04da2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java
index 3062647..7d10a66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java
@@ -30,7 +30,7 @@ import java.nio.*;
  * Cache transaction key. This wrapper is needed because same keys may be enlisted in the same transaction
  * for multiple caches.
  */
-public class IgniteTxKey implements Externalizable, Message {
+public class IgniteTxKey implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -181,18 +181,6 @@ public class IgniteTxKey implements Externalizable, Message {
     }
 
     /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(cacheId);
-        out.writeObject(key);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        cacheId = in.readInt();
-        key = (KeyCacheObject)in.readObject();
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteTxKey.class, this);
     }


Mime
View raw message