ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [09/42] incubator-ignite git commit: # IGNITE-737. Rework Visor code to support ClusterGroup.forXXX(cacheName) on daemon node.
Date Tue, 28 Apr 2015 14:33:56 GMT
# IGNITE-737. Rework Visor code to support ClusterGroup.forXXX(cacheName) on daemon node.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2dd74cd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2dd74cd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2dd74cd8

Branch: refs/heads/ignite-sprint-4
Commit: 2dd74cd8a6a8d7bfca12025ff030fd2f7d690b9c
Parents: 925c45a
Author: AKuznetsov <akuznetsov@gridgain.com>
Authored: Thu Apr 16 15:36:20 2015 +0700
Committer: AKuznetsov <akuznetsov@gridgain.com>
Committed: Thu Apr 16 15:36:20 2015 +0700

----------------------------------------------------------------------
 .../internal/visor/VisorTaskArgument.java       |  2 ++
 .../internal/visor/query/VisorQueryArg.java     | 22 +++++++------
 .../internal/visor/query/VisorQueryJob.java     | 34 ++++++++++++++++++++
 .../internal/visor/query/VisorQueryTask.java    | 19 ++++++-----
 .../commands/cache/VisorCacheClearCommand.scala |  4 +--
 .../commands/cache/VisorCacheCommand.scala      |  2 +-
 .../commands/cache/VisorCacheScanCommand.scala  |  4 +--
 .../commands/cache/VisorCacheSwapCommand.scala  |  4 +--
 .../scala/org/apache/ignite/visor/visor.scala   | 19 +++++++++++
 9 files changed, 85 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java
index 1a4e498..e029678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorTaskArgument.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.visor;
 
+import org.apache.ignite.cluster.*;
+
 import java.io.*;
 import java.util.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
index 38fac1f..2466868 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.visor.query;
 
+import org.jetbrains.annotations.*;
+
 import java.io.*;
 import java.util.*;
 
@@ -27,8 +29,8 @@ public class VisorQueryArg implements Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Node ID in case of local cache. */
-    private final UUID locCacheNodeId;
+    /** Optional node ID. */
+    private final UUID nid;
 
     /** Cache name for query. */
     private final String cacheName;
@@ -37,26 +39,26 @@ public class VisorQueryArg implements Serializable {
     private final String qryTxt;
 
     /** Result batch size. */
-    private final Integer pageSize;
+    private final int pageSize;
 
     /**
-     * @param locCacheNodeId Node ID in case of local cache or {@code null} otherwise.
+     * @param nid Optional node ID with cache.
      * @param cacheName Cache name for query.
      * @param qryTxt Query text.
      * @param pageSize Result batch size.
      */
-    public VisorQueryArg(UUID locCacheNodeId, String cacheName, String qryTxt, Integer pageSize)
{
-        this.locCacheNodeId = locCacheNodeId;
+    public VisorQueryArg(@Nullable UUID nid, String cacheName, String qryTxt, int pageSize)
{
+        this.nid = nid;
         this.cacheName = cacheName;
         this.qryTxt = qryTxt;
         this.pageSize = pageSize;
     }
 
     /**
-     * @return Node ID in case of local cache or {@code null} otherwise.
+     * @return Optional node ID.
      */
-    public UUID localCacheNodeId() {
-        return locCacheNodeId;
+    @Nullable public UUID nodeId() {
+        return nid;
     }
 
     /**
@@ -76,7 +78,7 @@ public class VisorQueryArg implements Serializable {
     /**
      * @return Page size.
      */
-    public Integer pageSize() {
+    public int pageSize() {
         return pageSize;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index dcc2242..f5a2746 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.visor.query;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.timeout.*;
@@ -32,6 +33,7 @@ import java.util.*;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.visor.query.VisorQueryUtils.*;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
 
 /**
  * Job for execute SCAN or SQL query and get first page of results.
@@ -60,9 +62,41 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<?
exten
         return cacheProcessor.jcache(cacheName);
     }
 
+    /**
+     * @return Query task class name.
+     */
+    protected Class<? extends VisorQueryTask> task() {
+        return VisorQueryTask.class;
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteBiTuple<? extends Exception, VisorQueryResultEx> run(VisorQueryArg
arg) {
         try {
+            String cacheName = arg.cacheName();
+
+            UUID nid = ignite.localNode().id();
+
+            // If node was not specified then we need to check if this node could be used
for query
+            // or we need to send task to appropriate node.
+            if (arg.nodeId() == null) {
+                ClusterGroup prj = ignite.cluster().forDataNodes(cacheName);
+
+                if (prj.node() == null)
+                    throw new IgniteException("No data nodes for cache: " + escapeName(cacheName));
+
+                // Current node does not fit.
+                if (prj.node(nid) == null) {
+                    Collection<ClusterNode> prjNodes = prj.nodes();
+
+                    Collection<UUID> nids = new ArrayList<>(prjNodes.size());
+
+                    for (ClusterNode node : prjNodes)
+                        nids.add(node.id());
+
+                    return ignite.compute(prj).withNoFailover().execute(task(), new VisorTaskArgument<>(nids,
arg, false));
+                }
+            }
+
             boolean scan = arg.queryTxt().toUpperCase().startsWith("SCAN");
 
             String qryId = (scan ? SCAN_QRY_NAME : SQL_QRY_NAME) + "-" +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
index 6683205..2ce011e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.visor.query;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.visor.*;
 import org.apache.ignite.lang.*;
@@ -39,17 +40,19 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg,
IgniteBiTupl
     /** {@inheritDoc} */
     @Override protected Map<? extends ComputeJob, ClusterNode> map0(List<ClusterNode>
subgrid,
         VisorTaskArgument<VisorQueryArg> arg) {
-        String cacheName = taskArg.cacheName();
+        String cache = taskArg.cacheName();
 
         ClusterNode node;
 
-        if (taskArg.localCacheNodeId() == null) {
-            ClusterGroup prj = (ignite.cluster().localNode().isDaemon())
-                ? ignite.cluster().forRemotes()
-                : ignite.cluster().forDataNodes(cacheName);
+        UUID nid = taskArg.nodeId();
+
+        IgniteClusterEx cluster = ignite.cluster();
+
+        if (nid == null) {
+            ClusterGroup prj = cluster.localNode().isDaemon() ? cluster.forRemotes() : cluster.forDataNodes(cache);
 
             if (prj.nodes().isEmpty())
-                throw new IgniteException("No data nodes for cache: " + escapeName(cacheName));
+                throw new IgniteException("No data nodes for cache: " + escapeName(cache));
 
             // First try to take local node to avoid network hop.
             node = prj.node(ignite.localNode().id());
@@ -59,10 +62,10 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryArg,
IgniteBiTupl
                 node = prj.forRandom().node();
         }
         else {
-            node = ignite.cluster().node(taskArg.localCacheNodeId());
+            node = cluster.node(nid);
 
             if (node == null)
-                throw new IgniteException("No data node for local cache: " + escapeName(cacheName));
+                throw new IgniteException("Node not found: " + nid);
         }
 
         assert node != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala
index ebdaa34..f401d15 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheClearCommand.scala
@@ -100,10 +100,10 @@ class VisorCacheClearCommand {
             case Some(name) => name
         }
 
-        val prj = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_))
+        val prj = projectionForNode(node)
 
         if (prj.nodes().isEmpty)
-            scold(node.fold("Topology is empty.")(n => "Can't find node with specified
id: " + n.id())).^^
+            scold(messageNodeNotFound(node)).^^
 
         val t = VisorTextTable()
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
index f45597e..e74cb2c 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheCommand.scala
@@ -495,7 +495,7 @@ class VisorCacheCommand {
         assert(node != null)
 
         try {
-            val prj = node.fold(ignite.cluster.forRemotes())(ignite.cluster.forNode(_))
+            val prj = projectionForNode(node)
 
             val nids = prj.nodes().map(_.id())
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
index 6c2da03..e9c6f6c 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
@@ -136,10 +136,10 @@ class VisorCacheScanCommand {
             case Some(name) => name
         }
 
-        val n = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_)).node()
+        val n = projectionForNode(node).node()
 
         if (n == null) {
-            scold(node.fold("Topology is empty.")(n => "Can't find node with specified
id: " + n.id())).^^
+            scold(messageNodeNotFound(node)).^^
 
             return
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala
index 0a20166..5589f8c 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheSwapCommand.scala
@@ -104,7 +104,7 @@ class VisorCacheSwapCommand {
         }
 
 
-        val prj = node.fold(ignite.cluster.forRandom())(ignite.cluster.forNode(_))
+        val prj = projectionForNode(node)
 
         if (prj.nodes().isEmpty) {
             val msg =
@@ -113,7 +113,7 @@ class VisorCacheSwapCommand {
                 else
                     "Can't find nodes with specified cache: " + cacheName
 
-            scold(msg).^^
+            scold(messageNodeNotFound(node, Some(msg))).^^
         }
 
         val t = VisorTextTable()

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2dd74cd8/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
index 431701a..72f8a32 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala
@@ -255,6 +255,25 @@ object visor extends VisorTag {
         }
     }
 
+    /**
+     * @param node Optional node.
+     * @return Projection with specified node or projection with random node if specified
node is `None`.
+     */
+    def projectionForNode(node: Option[ClusterNode]): ClusterGroup = node match {
+        case Some(n) => ignite.cluster.forNode(n)
+        case None => ignite.cluster.forRandom()
+    }
+
+    /**
+     * @param node Node.
+     * @param msg Optional message.
+     * @return Message about why node was not found.
+     */
+    def messageNodeNotFound(node: Option[ClusterNode], msg: Option[String] = None): String
= node match {
+        case Some(n) => msg.getOrElse("Can't find node with specified id: " + n.id())
+        case None => "Topology is empty."
+    }
+
     Runtime.getRuntime.addShutdownHook(new Thread() {
         override def run() {
             try


Mime
View raw message