ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5932
Date Fri, 13 Oct 2017 07:57:18 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5932 1b272cbfa -> 77e84d2e5


ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/77e84d2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/77e84d2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/77e84d2e

Branch: refs/heads/ignite-5932
Commit: 77e84d2e5acca51cec7559d1a2260c7dba7fefed
Parents: 1b272cb
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Oct 13 10:57:07 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Oct 13 10:57:07 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  6 ++
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 25 ++++++-
 .../mvcc/CoordinatorActiveQueriesMessage.java   | 79 +++++++++++++++++++-
 .../cache/mvcc/PreviousCoordinatorQueries.java  |  2 +-
 4 files changed, 105 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/77e84d2e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6a59c24..a2440f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -104,6 +104,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQueryEx;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorActiveQueriesMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestQuery;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
@@ -953,6 +954,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 return msg;
 
+            case 144:
+                msg = new CoordinatorActiveQueriesMessage();
+
+                return msg;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/77e84d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 636634c..7b70f2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -1068,14 +1068,33 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter
{
      */
     public void processClientActiveQueries(UUID nodeId,
         @Nullable Map<MvccCounter, Integer> activeQueries) {
-        prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
+        prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
     }
 
     /**
-     * @param activeQueries
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorActiveQueriesMessage(UUID nodeId, CoordinatorActiveQueriesMessage
msg) {
+        prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
+    }
+
+    /**
+     * @param nodeId Coordinator node ID.
+     * @param activeQueries Active queries.
      */
     public void sendActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer>
activeQueries) {
+        CoordinatorActiveQueriesMessage msg = new CoordinatorActiveQueriesMessage(activeQueries);
 
+        try {
+            ctx.io().sendToGridTopic(nodeId,
+                MSG_TOPIC,
+                msg,
+                MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
+        }
     }
 
     /**
@@ -1324,6 +1343,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter
{
                 processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg);
             else if (msg instanceof NewCoordinatorQueryAckRequest)
                 processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg);
+            else if (msg instanceof CoordinatorActiveQueriesMessage)
+                processCoordinatorActiveQueriesMessage(nodeId, (CoordinatorActiveQueriesMessage)msg);
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg
+ ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/77e84d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
index 5032593..ba4a964 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
@@ -20,9 +20,13 @@ package org.apache.ignite.internal.processors.cache.mvcc;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -32,31 +36,98 @@ public class CoordinatorActiveQueriesMessage implements MvccCoordinatorMessage
{
     @GridDirectMap(keyType = Message.class, valueType = Integer.class)
     private Map<MvccCounter, Integer> activeQrys;
 
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorActiveQueriesMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param activeQrys Active queries.
+     */
+    CoordinatorActiveQueriesMessage(Map<MvccCounter, Integer> activeQrys) {
+        this.activeQrys = activeQrys;
+    }
+
+    /**
+     * @return Active queries.
+     */
+    @Nullable Map<MvccCounter, Integer> activeQueries() {
+        return activeQrys;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean waitForCoordinatorInit() {
         return false;
     }
 
+    /** {@inheritDoc} */
     @Override public boolean processedFromNioThread() {
         return true;
     }
 
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        return false;
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMap("activeQrys", activeQrys, MessageCollectionItemType.MSG,
MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
     }
 
+    /** {@inheritDoc} */
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        return false;
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                activeQrys = reader.readMap("activeQrys", MessageCollectionItemType.MSG,
MessageCollectionItemType.INT, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorActiveQueriesMessage.class);
     }
 
+    /** {@inheritDoc} */
     @Override public short directType() {
-        return 0;
+        return 144;
     }
 
+    /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 0;
+        return 1;
     }
 
+    /** {@inheritDoc} */
     @Override public void onAckReceived() {
+        // No-op.
+    }
 
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorActiveQueriesMessage.class, this);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/77e84d2e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index b3fc98d..5c56f40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -125,7 +125,7 @@ class PreviousCoordinatorQueries {
      * @param nodeId Node ID.
      * @param nodeQueries Active queries started on node.
      */
-    void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer>
nodeQueries) {
+    void addNodeActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries)
{
         synchronized (this) {
             if (initDone)
                 return;


Mime
View raw message