ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [02/36] incubator-ignite git commit: # IGNITE-274 Reworked events collecting.
Date Wed, 18 Feb 2015 10:06:30 GMT
# IGNITE-274 Reworked events collecting.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/677e643b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/677e643b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/677e643b

Branch: refs/heads/ignite-283
Commit: 677e643b217ecbc92c081711a688fde087e42949
Parents: 659b432
Author: AKuznetsov <akuznetsov@gridgain.com>
Authored: Tue Feb 17 14:24:28 2015 +0700
Committer: AKuznetsov <akuznetsov@gridgain.com>
Committed: Tue Feb 17 14:24:28 2015 +0700

----------------------------------------------------------------------
 .../visor/node/VisorNodeDataCollectorJob.java   | 50 +++++++++++---
 .../internal/visor/util/VisorEventMapper.java   | 73 ++++++++++++++++++++
 .../internal/visor/util/VisorTaskUtils.java     | 60 ++++++++--------
 3 files changed, 142 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index 2bedf10..5cb9039 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -55,8 +55,26 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         super(arg, debug);
     }
 
-    /** Collect events. */
-    private void events(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
+    /**
+     * Collect events.
+     *
+     * @param res Job result.
+     * @param evtOrderKey Unique key to take last order key from node local map.
+     * @param evtThrottleCntrKey  Unique key to take throttle count from node local map.
+     * @param all If {@code true} then collect all events otherwise collect only non task events.
+     */
+    protected void events0(VisorNodeDataCollectorJobResult res, String evtOrderKey, String evtThrottleCntrKey,
+        final boolean all) {
+        res.events().addAll(collectEvents(ignite, evtOrderKey, evtThrottleCntrKey, all));
+    }
+
+    /**
+     * Collect events.
+     *
+     * @param res Job result.
+     * @param arg Task argument.
+     */
+    protected void events(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
         try {
             // Visor events explicitly enabled in configuration.
             if (checkExplicitTaskMonitoring(ignite))
@@ -86,16 +104,20 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
                 }
             }
 
-            res.events().addAll(collectEvents(ignite, arg.eventsOrderKey(), arg.eventsThrottleCounterKey(),
-                arg.taskMonitoringEnabled()));
+            events0(res, arg.eventsOrderKey(), arg.eventsThrottleCounterKey(), arg.taskMonitoringEnabled());
         }
         catch (Throwable eventsEx) {
             res.eventsEx(eventsEx);
         }
     }
 
-    /** Collect caches. */
-    private void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
+    /**
+     * Collect caches.
+     *
+     * @param res Job result.
+     * @param arg Task argument.
+     */
+    protected void caches(VisorNodeDataCollectorJobResult res, VisorNodeDataCollectorTaskArg arg) {
         try {
             IgniteConfiguration cfg = ignite.configuration();
 
@@ -120,8 +142,12 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         }
     }
 
-    /** Collect IGFS. */
-    private void igfs(VisorNodeDataCollectorJobResult res) {
+    /**
+     * Collect IGFSs.
+     *
+     * @param res Job result.
+     */
+    protected void igfs(VisorNodeDataCollectorJobResult res) {
         try {
             IgfsProcessorAdapter igfsProc = ((IgniteKernal)ignite).context().igfs();
 
@@ -151,8 +177,12 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
         }
     }
 
-    /** Collect streamers. */
-    private void streamers(VisorNodeDataCollectorJobResult res) {
+    /**
+     * Collect streamers.
+     *
+     * @param res Job result.
+     */
+    protected void streamers(VisorNodeDataCollectorJobResult res) {
         try {
             StreamerConfiguration[] cfgs = ignite.configuration().getStreamerConfiguration();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java
new file mode 100644
index 0000000..574d4ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorEventMapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.util;
+
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.visor.event.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Mapper from grid event to Visor data transfer object.
+ */
+public class VisorEventMapper implements IgniteClosure<Event, VisorGridEvent> {
+    /**
+     * Map grid event to Visor data transfer object.
+     *
+     * @param evt Grid event.
+     * @param type Event's type.
+     * @param id Event id.
+     * @param name Event name.
+     * @param nid Event node ID.
+     * @param ts Event timestamp.
+     * @param msg Event message.
+     * @param shortDisplay Shortened version of {@code toString()} result.
+     * @return Visor data transfer object for event, or {@code null} if the event is not a task, job or deployment event.
+     */
+    protected VisorGridEvent map(Event evt, int type, IgniteUuid id, String name, UUID nid, long ts, String msg,
+        String shortDisplay) {
+        if (evt instanceof TaskEvent) {
+            TaskEvent te = (TaskEvent)evt;
+
+            return new VisorGridTaskEvent(type, id, name, nid, ts, msg, shortDisplay,
+                te.taskName(), te.taskClassName(), te.taskSessionId(), te.internal());
+        }
+
+        if (evt instanceof JobEvent) {
+            JobEvent je = (JobEvent)evt;
+
+            return new VisorGridJobEvent(type, id, name, nid, ts, msg, shortDisplay,
+                je.taskName(), je.taskClassName(), je.taskSessionId(), je.jobId());
+        }
+
+        if (evt instanceof DeploymentEvent) {
+            DeploymentEvent de = (DeploymentEvent)evt;
+
+            return new VisorGridDeploymentEvent(type, id, name, nid, ts, msg, shortDisplay, de.alias());
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public VisorGridEvent apply(Event evt) {
+        return map(evt, evt.type(), evt.id(), evt.name(), evt.node().id(), evt.timestamp(), evt.message(),
+            evt.shortDisplay());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/677e643b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index 728b569..4be371c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -83,13 +83,13 @@ public class VisorTaskUtils {
     };
 
     /** Only non task event types that Visor should collect. */
-    private static final int[] VISOR_NON_TASK_EVTS = {
+    public static final int[] VISOR_NON_TASK_EVTS = {
         EVT_CLASS_DEPLOY_FAILED,
         EVT_TASK_DEPLOY_FAILED
     };
 
     /** Only non task event types that Visor should collect. */
-    private static final int[] VISOR_ALL_EVTS = concat(VISOR_TASK_EVTS, VISOR_NON_TASK_EVTS);
+    public static final int[] VISOR_ALL_EVTS = concat(VISOR_TASK_EVTS, VISOR_NON_TASK_EVTS);
 
     /** Maximum folder depth. I.e. if depth is 4 we look in starting folder and 3 levels of sub-folders. */
     public static final int MAX_FOLDER_DEPTH = 4;
@@ -321,13 +321,16 @@ public class VisorTaskUtils {
         return true;
     }
 
-    /** */
-    private static final Comparator<Event> EVENTS_ORDER_COMPARATOR = new Comparator<Event>() {
+    /** Events comparator by event local order. */
+    private static final Comparator<Event> EVTS_ORDER_COMPARATOR = new Comparator<Event>() {
         @Override public int compare(Event o1, Event o2) {
             return Long.compare(o1.localOrder(), o2.localOrder());
         }
     };
 
+    /** Mapper from grid event to Visor data transfer object. */
+    private static final VisorEventMapper EVT_MAPPER = new VisorEventMapper();
+
     /**
      * Grabs local events and detects if events was lost since last poll.
      *
@@ -339,7 +342,24 @@ public class VisorTaskUtils {
      */
     public static Collection<VisorGridEvent> collectEvents(Ignite ignite, String evtOrderKey, String evtThrottleCntrKey,
         final boolean all) {
+        return collectEvents(ignite, evtOrderKey, evtThrottleCntrKey, all ? VISOR_ALL_EVTS : VISOR_NON_TASK_EVTS,
+            EVT_MAPPER);
+    }
+
+    /**
+     * Grabs local events and detects if events were lost since last poll.
+     *
+     * @param ignite Target grid.
+     * @param evtOrderKey Unique key to take last order key from node local map.
+     * @param evtThrottleCntrKey  Unique key to take throttle count from node local map.
+     * @param evtTypes Event types to collect.
+     * @param evtMapper Closure to map grid events to Visor data transfer objects.
+     * @return Collection of node events.
+     */
+    public static Collection<VisorGridEvent> collectEvents(Ignite ignite, String evtOrderKey, String evtThrottleCntrKey,
+        final int[] evtTypes, IgniteClosure<Event, VisorGridEvent> evtMapper) {
         assert ignite != null;
+        assert evtTypes != null && evtTypes.length > 0;
 
         ClusterNodeLocalMap<String, Long> nl = ignite.cluster().nodeLocalMap();
 
@@ -361,8 +381,7 @@ public class VisorTaskUtils {
                     lastFound.set(true);
 
                 // Retains events by lastOrder, period and type.
-                return e.localOrder() > lastOrder && e.timestamp() > notOlderThan &&
-                    (all ? F.contains(VISOR_ALL_EVTS, e.type()) : F.contains(VISOR_NON_TASK_EVTS, e.type()));
+                return e.localOrder() > lastOrder && e.timestamp() > notOlderThan && F.contains(evtTypes, e.type());
             }
         };
 
@@ -370,7 +389,7 @@ public class VisorTaskUtils {
 
         // Update latest order in node local, if not empty.
         if (!evts.isEmpty()) {
-            Event maxEvt = Collections.max(evts, EVENTS_ORDER_COMPARATOR);
+            Event maxEvt = Collections.max(evts, EVTS_ORDER_COMPARATOR);
 
             nl.put(evtOrderKey, maxEvt.localOrder());
         }
@@ -387,31 +406,10 @@ public class VisorTaskUtils {
             res.add(new VisorGridEventsLost(ignite.cluster().localNode().id()));
 
         for (Event e : evts) {
-            int tid = e.type();
-            IgniteUuid id = e.id();
-            String name = e.name();
-            UUID nid = e.node().id();
-            long t = e.timestamp();
-            String msg = e.message();
-            String shortDisplay = e.shortDisplay();
-
-            if (e instanceof TaskEvent) {
-                TaskEvent te = (TaskEvent)e;
-
-                res.add(new VisorGridTaskEvent(tid, id, name, nid, t, msg, shortDisplay,
-                    te.taskName(), te.taskClassName(), te.taskSessionId(), te.internal()));
-            }
-            else if (e instanceof JobEvent) {
-                JobEvent je = (JobEvent)e;
-
-                res.add(new VisorGridJobEvent(tid, id, name, nid, t, msg, shortDisplay,
-                    je.taskName(), je.taskClassName(), je.taskSessionId(), je.jobId()));
-            }
-            else if (e instanceof DeploymentEvent) {
-                DeploymentEvent de = (DeploymentEvent)e;
+            VisorGridEvent visorEvt = evtMapper.apply(e);
 
-                res.add(new VisorGridDeploymentEvent(tid, id, name, nid, t, msg, shortDisplay, de.alias()));
-            }
+            if (visorEvt != null)
+                res.add(visorEvt);
         }
 
         return res;


Mime
View raw message