Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2D8E717A1F for ; Thu, 16 Apr 2015 08:38:30 +0000 (UTC) Received: (qmail 56057 invoked by uid 500); 16 Apr 2015 08:38:27 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 56022 invoked by uid 500); 16 Apr 2015 08:38:27 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 56010 invoked by uid 99); 16 Apr 2015 08:38:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Apr 2015 08:38:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 16 Apr 2015 08:38:25 +0000 Received: (qmail 54719 invoked by uid 99); 16 Apr 2015 08:38:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Apr 2015 08:38:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8968AE0D66; Thu, 16 Apr 2015 08:38:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 16 Apr 2015 08:38:46 -0000 Message-Id: <58f3e8cd77b940a4b77a57089188e721@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [46/48] incubator-ignite git commit: # ignite-690 WIP 
X-Virus-Checked: Checked by ClamAV on apache.org # ignite-690 WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/82733d41 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/82733d41 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/82733d41 Branch: refs/heads/ignite-737 Commit: 82733d412f199b5fe588d3810b11da8331a25b9e Parents: e8a9dfb Author: Andrey Authored: Thu Apr 16 15:26:53 2015 +0700 Committer: Andrey Committed: Thu Apr 16 15:26:53 2015 +0700 ---------------------------------------------------------------------- .../visor/commands/ack/VisorAckCommand.scala | 11 ++- .../visor/commands/gc/VisorGcCommand.scala | 6 +- .../commands/tasks/VisorTasksCommand.scala | 22 ++--- .../scala/org/apache/ignite/visor/visor.scala | 91 ++++++++++++++------ 4 files changed, 86 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82733d41/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala index 6fa96d0..6faa276 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/ack/VisorAckCommand.scala @@ -18,15 +18,14 @@ package org.apache.ignite.visor.commands.ack import org.apache.ignite.cluster.ClusterGroupEmptyException -import org.apache.ignite.internal.visor.misc.VisorAckTask +import org.apache.ignite.visor.VisorTag +import org.apache.ignite.visor.commands.VisorConsoleCommand +import 
org.apache.ignite.visor.visor._ import java.util.{HashSet => JavaHashSet} -import org.apache.ignite.visor.commands.VisorConsoleCommand -import org.apache.ignite.visor.visor._ -import org.apache.ignite.visor.{VisorTag, visor} +import org.apache.ignite.internal.visor.misc.VisorAckTask -import scala.collection.JavaConversions._ import scala.language.implicitConversions /** @@ -105,7 +104,7 @@ class VisorAckCommand { adviseToConnect() else try { - executeAll(classOf[VisorAckTask], msg) + executeMulti(classOf[VisorAckTask], msg) } catch { case _: ClusterGroupEmptyException => scold("Topology is empty.") http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82733d41/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala index 1c15528..9b5fc21 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/gc/VisorGcCommand.scala @@ -18,15 +18,15 @@ package org.apache.ignite.visor.commands.gc import org.apache.ignite._ - import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterNode} -import org.apache.ignite.internal.visor.node.VisorNodeGcTask import org.apache.ignite.visor.VisorTag import org.apache.ignite.visor.commands.{VisorConsoleCommand, VisorTextTable} import org.apache.ignite.visor.visor._ import java.util.UUID +import org.apache.ignite.internal.visor.node.VisorNodeGcTask + import scala.collection.JavaConversions._ import scala.language.{implicitConversions, reflectiveCalls} import scala.util.control.Breaks._ @@ -143,7 +143,7 @@ class VisorGcCommand { val NULL: Void = null - executeRemotes(classOf[VisorNodeGcTask], 
NULL).foreach { + executeMulti(classOf[VisorNodeGcTask], NULL).foreach { case (nid, stat) => val roundHb = stat.get1() / (1024L * 1024L) val roundHa = stat.get2() / (1024L * 1024L) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82733d41/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala index 515864d..d7547c9 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/tasks/VisorTasksCommand.scala @@ -21,17 +21,17 @@ import org.apache.ignite._ import org.apache.ignite.events.EventType._ import org.apache.ignite.internal.util.typedef.X import org.apache.ignite.internal.util.{IgniteUtils => U} -import org.apache.ignite.internal.visor.event.{VisorGridEvent, VisorGridJobEvent, VisorGridTaskEvent} -import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask -import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask.VisorNodeEventsCollectorTaskArg import org.apache.ignite.lang.IgniteUuid - -import java.util.UUID - import org.apache.ignite.visor.VisorTag import org.apache.ignite.visor.commands.{VisorConsoleCommand, VisorTextTable} import org.apache.ignite.visor.visor._ +import java.util.UUID + +import org.apache.ignite.internal.visor.event.{VisorGridEvent, VisorGridJobEvent, VisorGridTaskEvent} +import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask +import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask.VisorNodeEventsCollectorTaskArg + import scala.collection.JavaConversions._ import scala.language.implicitConversions import 
scala.util.control.Breaks._ @@ -613,7 +613,7 @@ class VisorTasksCommand { private def list(p: Long, taskName: String, reverse: Boolean, all: Boolean) { breakable { try { - val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + val evts = executeMulti(classOf[VisorNodeEventsCollectorTask], VisorNodeEventsCollectorTaskArg.createTasksArg(p, taskName, null)) val (tLst, eLst) = mkData(evts) @@ -817,7 +817,7 @@ class VisorTasksCommand { try { val prj = ignite.cluster.forRemotes() - val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + val evts = executeMulti(classOf[VisorNodeEventsCollectorTask], VisorNodeEventsCollectorTaskArg.createTasksArg(null, taskName, null)) val (tLst, eLst) = mkData(evts) @@ -988,7 +988,7 @@ class VisorTasksCommand { } try { - val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + val evts = executeMulti(classOf[VisorNodeEventsCollectorTask], VisorNodeEventsCollectorTaskArg.createTasksArg(null, null, uuid)) val (tLst, eLst) = mkData(evts) @@ -1100,7 +1100,7 @@ class VisorTasksCommand { private def nodes(f: Long) { breakable { try { - val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + val evts = executeMulti(classOf[VisorNodeEventsCollectorTask], VisorNodeEventsCollectorTaskArg.createTasksArg(f, null, null)) val eLst = mkData(evts)._2 @@ -1210,7 +1210,7 @@ class VisorTasksCommand { private def hosts(f: Long) { breakable { try { - val evts = executeRemotes(classOf[VisorNodeEventsCollectorTask], + val evts = executeMulti(classOf[VisorNodeEventsCollectorTask], VisorNodeEventsCollectorTaskArg.createTasksArg(f, null, null)) val eLst = mkData(evts)._2 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/82733d41/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala 
b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala index 431701a..321d02e 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala @@ -19,7 +19,7 @@ package org.apache.ignite.visor import org.apache.ignite.IgniteSystemProperties._ import org.apache.ignite._ -import org.apache.ignite.cluster.{ClusterGroupEmptyException, ClusterGroup, ClusterMetrics, ClusterNode} +import org.apache.ignite.cluster.{ClusterGroup, ClusterGroupEmptyException, ClusterMetrics, ClusterNode} import org.apache.ignite.configuration.IgniteConfiguration import org.apache.ignite.events.EventType._ import org.apache.ignite.events.{DiscoveryEvent, Event} @@ -29,19 +29,16 @@ import org.apache.ignite.internal.IgniteNodeAttributes._ import org.apache.ignite.internal.IgniteVersionUtils._ import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException import org.apache.ignite.internal.util.lang.{GridFunc => F} +import org.apache.ignite.internal.util.spring.IgniteSpringHelper import org.apache.ignite.internal.util.typedef._ import org.apache.ignite.internal.util.{GridConfigurationFinder, IgniteUtils => U} -import org.apache.ignite.logger.NullLogger -import org.apache.ignite.internal.visor.{VisorMultiNodeTask, VisorTaskArgument} -import org.apache.ignite.internal.visor.cache._ -import org.apache.ignite.internal.visor.node._ -import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask.VisorNodeEventsCollectorTaskArg -import org.apache.ignite.internal.visor.util.VisorTaskUtils._ import org.apache.ignite.lang._ +import org.apache.ignite.logger.NullLogger import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi import org.apache.ignite.thread.IgniteThreadPoolExecutor import org.apache.ignite.visor.commands.VisorConsole.consoleReader import org.apache.ignite.visor.commands.{VisorConsoleCommand, VisorTextTable} + import 
org.jetbrains.annotations.Nullable import java.io._ @@ -50,11 +47,17 @@ import java.text._ import java.util.concurrent._ import java.util.{Collection => JavaCollection, HashSet => JavaHashSet, _} +import org.apache.ignite.internal.visor.cache._ +import org.apache.ignite.internal.visor.node.VisorNodeEventsCollectorTask.VisorNodeEventsCollectorTaskArg +import org.apache.ignite.internal.visor.node._ +import org.apache.ignite.internal.visor.util.VisorEventMapper +import org.apache.ignite.internal.visor.util.VisorTaskUtils._ +import org.apache.ignite.internal.visor.{VisorMultiNodeTask, VisorTaskArgument} + import scala.collection.JavaConversions._ import scala.collection.immutable import scala.language.{implicitConversions, reflectiveCalls} import scala.util.control.Breaks._ -import org.apache.ignite.internal.util.spring.IgniteSpringHelper /** * Holder for command help information. @@ -1848,24 +1851,60 @@ object visor extends VisorTag { ignite.compute(grp).withNoFailover().execute(task, toTaskArgument(grp.nodes().map(_.id()), arg)) } + /** + * Execute task on node. + * + * @param nid Node id. + * @param task Task class + * @param arg Task argument. + * @tparam A Task argument type. + * @tparam R Task result type + * @tparam J Job class. + * @return Task result. + */ def executeOne[A, R, J](nid: UUID, task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = execute(ignite.cluster.forNodeId(nid), task, arg) + /** + * Execute task on random node. + * + * @param task Task class + * @param arg Task argument. + * @tparam A Task argument type. + * @tparam R Task result type + * @tparam J Job class. + * @return Task result. + */ + def executeRandom[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + execute(ignite.cluster.forRandom(), task, arg) + + /** + * Execute task on specified nodes. + * + * @param nids Node ids. + * @param task Task class + * @param arg Task argument. + * @tparam A Task argument type. 
+ * @tparam R Task result type + * @tparam J Job class. + * @return Task result. + */ def executeMulti[A, R, J](nids: Iterable[UUID], task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = execute(ignite.cluster.forNodeIds(nids), task, arg) - def executeLocal[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = - execute(ignite.cluster.forLocal(), task, arg) - - def executeRemotes[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = + /** + * Execute task on all remote nodes. + * + * @param task Task class + * @param arg Task argument. + * @tparam A Task argument type. + * @tparam R Task result type + * @tparam J Job class. + * @return Task result. + */ + def executeMulti[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = execute(ignite.cluster.forRemotes(), task, arg) - def executeAll[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = - execute(ignite.cluster, task, arg) - - def executeRandom[A, R, J](task: Class[_ <: VisorMultiNodeTask[A, R, J]], arg: A) = - execute(ignite.cluster.forRandom(), task, arg) - /** * Gets configuration from specified node. * @@ -2393,6 +2432,12 @@ object visor extends VisorTag { println(": Log stopped: " + logFile.getAbsolutePath) } + /** Unique Visor key to get events last order. */ + final val EVT_LAST_ORDER_KEY = UUID.randomUUID().toString + + /** Unique Visor key to get events throttle counter. */ + final val EVT_THROTTLE_CNTR_KEY = UUID.randomUUID().toString + /** * Starts logging. If logging is already started - no-op. * @@ -2477,16 +2522,14 @@ object visor extends VisorTag { ) override def run() { - val g = ignite - - if (g != null) { + if (ignite != null) { try { // Discovery events collected only locally. 
- val loc = executeLocal(classOf[VisorNodeEventsCollectorTask], - VisorNodeEventsCollectorTaskArg.createLogArg(key, LOG_EVTS ++ EVTS_DISCOVERY)).toSeq + val loc = collectEvents(ignite, EVT_LAST_ORDER_KEY, EVT_THROTTLE_CNTR_KEY, + LOG_EVTS ++ EVTS_DISCOVERY, new VisorEventMapper).toSeq val evts = if (!rmtLogDisabled) - loc ++ executeRemotes(classOf[VisorNodeEventsCollectorTask], + loc ++ executeMulti(classOf[VisorNodeEventsCollectorTask], VisorNodeEventsCollectorTaskArg.createLogArg(key, LOG_EVTS)).toSeq else loc