ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1.5.4 Pass currently written message class to MessageWriter. (cherry picked from commit 612bcb6)
Date Tue, 19 Jan 2016 12:54:19 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 67791457f -> 09c1d5e4a


ignite-1.5.4 Pass currently written message class to MessageWriter.
(cherry picked from commit 612bcb6)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09c1d5e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09c1d5e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09c1d5e4

Branch: refs/heads/master
Commit: 09c1d5e4ab4c8b6a68060806e3945a40df13f360
Parents: 6779145
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jan 19 15:30:53 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jan 19 15:46:57 2016 +0300

----------------------------------------------------------------------
 .../internal/direct/DirectMessageWriter.java    |  5 ++
 .../stream/v1/DirectByteBufferStreamImplV1.java |  2 +
 .../stream/v2/DirectByteBufferStreamImplV2.java |  2 +
 .../continuous/CacheContinuousQueryEntry.java   | 70 ++++++++++----------
 .../continuous/AbstractContinuousMessage.java   |  5 +-
 .../ignite/internal/util/IgniteUtils.java       |  3 +
 .../ignite/internal/util/nio/GridNioServer.java | 12 ++++
 .../ignite/lang/IgniteProductVersion.java       | 18 +++++
 .../extensions/communication/MessageWriter.java |  9 ++-
 9 files changed, 89 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 730f9bc..b265c6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -61,6 +61,11 @@ public class DirectMessageWriter implements MessageWriter {
     }
 
     /** {@inheritDoc} */
+    @Override public void setCurrentWriteClass(Class<? extends Message> msgCls) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeHeader(byte type, byte fieldCnt) {
         DirectByteBufferStream stream = state.item().stream;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
index 5292f35..67fa9e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
@@ -522,6 +522,8 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
                 try {
                     writer.beforeInnerMessageWrite();
 
+                    writer.setCurrentWriteClass(msg.getClass());
+
                     lastFinished = msg.writeTo(buf, writer);
                 }
                 finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
index ed3eb7c..1a4c4bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
@@ -621,6 +621,8 @@ public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
                 try {
                     writer.beforeInnerMessageWrite();
 
+                    writer.setCurrentWriteClass(msg.getClass());
+
                     lastFinished = msg.writeTo(buf, writer);
                 }
                 finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index bcc2576..4d3786a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -84,17 +84,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     private GridDeploymentInfo depInfo;
 
     /** Partition. */
-    private int _part;
+    private int part;
 
     /** Update counter. */
-    private long _updateCntr;
+    private long updateCntr;
 
     /** Flags. */
     private byte flags;
 
     /** */
     @GridToStringInclude
-    private AffinityTopologyVersion _topVer;
+    private AffinityTopologyVersion topVer;
 
     /** Filtered events. */
     private GridLongList filteredEvts;
@@ -134,9 +134,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
         this.key = key;
         this.newVal = newVal;
         this.oldVal = oldVal;
-        this._part = part;
-        this._updateCntr = updateCntr;
-        this._topVer = topVer;
+        this.part = part;
+        this.updateCntr = updateCntr;
+        this.topVer = topVer;
         this.keepBinary = keepBinary;
     }
 
@@ -144,7 +144,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @return Topology version if applicable.
      */
     @Nullable AffinityTopologyVersion topologyVersion() {
-        return _topVer;
+        return topVer;
     }
 
     /**
@@ -165,14 +165,14 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @return Partition.
      */
     int partition() {
-        return _part;
+        return part;
     }
 
     /**
      * @return Update counter.
      */
     long updateCounter() {
-        return _updateCntr;
+        return updateCntr;
     }
 
     /**
@@ -310,67 +310,67 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeInt("_part", _part))
+                if (!writer.writeInt("cacheId", cacheId))
                     return false;
 
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeMessage("_topVer", _topVer))
+                if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeLong("_updateCntr", _updateCntr))
+                if (!writer.writeMessage("filteredEvts", filteredEvts))
                     return false;
 
                 writer.incrementState();
 
             case 3:
-                if (!writer.writeInt("cacheId", cacheId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1))
+                if (!writer.writeBoolean("keepBinary", keepBinary))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMessage("filteredEvts", filteredEvts))
+                if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeByte("flags", flags))
+                if (!writer.writeMessage("newVal", newVal))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
+                if (!writer.writeMessage("oldVal", oldVal))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("key", key))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMessage("newVal", newVal))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMessage("oldVal", oldVal))
+                if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
                 writer.incrementState();
@@ -389,7 +389,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
 
         switch (reader.state()) {
             case 0:
-                _part = reader.readInt("_part");
+                cacheId = reader.readInt("cacheId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -397,15 +397,19 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 1:
-                _topVer = reader.readMessage("_topVer");
+                byte evtTypeOrd;
+
+                evtTypeOrd = reader.readByte("evtType");
 
                 if (!reader.isLastRead())
                     return false;
 
+                evtType = eventTypeFromOrdinal(evtTypeOrd);
+
                 reader.incrementState();
 
             case 2:
-                _updateCntr = reader.readLong("_updateCntr");
+                filteredEvts = reader.readMessage("filteredEvts");
 
                 if (!reader.isLastRead())
                     return false;
@@ -413,7 +417,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 3:
-                cacheId = reader.readInt("cacheId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -421,19 +425,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 4:
-                byte evtTypeOrd;
-
-                evtTypeOrd = reader.readByte("evtType");
+                keepBinary = reader.readBoolean("keepBinary");
 
                 if (!reader.isLastRead())
                     return false;
 
-                evtType = eventTypeFromOrdinal(evtTypeOrd);
-
                 reader.incrementState();
 
             case 5:
-                filteredEvts = reader.readMessage("filteredEvts");
+                key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
                     return false;
@@ -441,7 +441,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 6:
-                flags = reader.readByte("flags");
+                newVal = reader.readMessage("newVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -449,7 +449,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 7:
-                keepBinary = reader.readBoolean("keepBinary");
+                oldVal = reader.readMessage("oldVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -457,7 +457,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 8:
-                key = reader.readMessage("key");
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -465,7 +465,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 9:
-                newVal = reader.readMessage("newVal");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -473,7 +473,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 10:
-                oldVal = reader.readMessage("oldVal");
+                updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
index 8aa683e..01a95df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java
@@ -25,6 +25,9 @@ import org.apache.ignite.lang.IgniteUuid;
  *
  */
 public abstract class AbstractContinuousMessage implements DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 2781778657738703012L;
+
     /** Routine ID. */
     protected final UUID routineId;
 
@@ -54,4 +57,4 @@ public abstract class AbstractContinuousMessage implements DiscoveryCustomMessag
     @Override public boolean isMutable() {
         return false;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 6a7145c..0bf937d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9323,6 +9323,9 @@ public abstract class IgniteUtils {
         assert buf != null;
         assert buf.hasArray();
 
+        if (writer != null)
+            writer.setCurrentWriteClass(msg.getClass());
+
         boolean finished = false;
         int cnt = 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index c366fe3..84c8157 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1001,6 +1001,9 @@ public class GridNioServer<T> {
 
                         assert msg != null;
 
+                        if (writer != null)
+                            writer.setCurrentWriteClass(msg.getClass());
+
                         finished = msg.writeTo(buf, writer);
 
                         if (finished && writer != null)
@@ -1023,6 +1026,9 @@ public class GridNioServer<T> {
 
                         assert msg != null;
 
+                        if (writer != null)
+                            writer.setCurrentWriteClass(msg.getClass());
+
                         finished = msg.writeTo(buf, writer);
 
                         if (finished && writer != null)
@@ -1169,6 +1175,9 @@ public class GridNioServer<T> {
 
                 assert msg != null;
 
+                if (writer != null)
+                    writer.setCurrentWriteClass(msg.getClass());
+
                 finished = msg.writeTo(buf, writer);
 
                 if (finished && writer != null)
@@ -1193,6 +1202,9 @@ public class GridNioServer<T> {
 
                 assert msg != null;
 
+                if (writer != null)
+                    writer.setCurrentWriteClass(msg.getClass());
+
                 finished = msg.writeTo(buf, writer);
 
                 if (finished && writer != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
index 12851ba..0abe64f 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteProductVersion.java
@@ -197,6 +197,24 @@ public class IgniteProductVersion implements Comparable<IgniteProductVersion>, E
         return Long.compare(revTs, o.revTs);
     }
 
+    /**
+     * @param o Other version.
+     * @return Compare result.
+     */
+    public int compareToIgnoreTimestamp(@NotNull IgniteProductVersion o) {
+        int res = Integer.compare(major, o.major);
+
+        if (res != 0)
+            return res;
+
+        res = Integer.compare(minor, o.minor);
+
+        if (res != 0)
+            return res;
+
+        return Integer.compare(maintenance, o.maintenance);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o)

http://git-wip-us.apache.org/repos/asf/ignite/blob/09c1d5e4/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 2fef564..bc2478b 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -38,6 +38,13 @@ public interface MessageWriter {
     public void setBuffer(ByteBuffer buf);
 
     /**
+     * Sets type of message currently written.
+     *
+     * @param msgCls Message type.
+     */
+    public void setCurrentWriteClass(Class<? extends Message> msgCls);
+
+    /**
      * Writes message header.
      *
      * @param type Message type.
@@ -315,4 +322,4 @@ public interface MessageWriter {
      * Resets this writer.
      */
     public void reset();
-}
\ No newline at end of file
+}


Mime
View raw message