storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [27/30] storm git commit: added method comments
Date Tue, 15 Mar 2016 17:45:00 GMT
added method comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7b354287
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7b354287
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7b354287

Branch: refs/heads/master
Commit: 7b354287227de358cc357ac45c18ac2b1a679202
Parents: 5bd5bd7
Author: 卫乐 <weiyue.wy@taobao.com>
Authored: Wed Mar 9 13:05:36 2016 +0800
Committer: 卫乐 <weiyue.wy@taobao.com>
Committed: Wed Mar 9 13:05:36 2016 +0800

----------------------------------------------------------------------
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 313 ++++++++++++++-----
 1 file changed, 231 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7b354287/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
index 7650ab1..aa1b234 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
@@ -144,9 +144,10 @@ public class StatsUtil {
     }
 
     /**
-     * Aggregates number acked and complete latencies across all streams.
+     * aggregate number acked and complete latencies across all streams.
      */
-    public static Map<String, Number> aggSpoutLatAndCount(Map<String, Double>
id2compAvg, Map<String, Long> id2numAcked) {
+    public static Map<String, Number> aggSpoutLatAndCount(Map<String, Double>
id2compAvg,
+                                                          Map<String, Long> id2numAcked)
{
         Map<String, Number> ret = new HashMap<>();
         putKV(ret, COMP_LAT_TOTAL, weightAvgAndSum(id2compAvg, id2numAcked));
         putKV(ret, ACKED, sumValues(id2numAcked));
@@ -155,15 +156,17 @@ public class StatsUtil {
     }
 
     /**
-     * Aggregates number executed and process & execute latencies.
+     * aggregate number executed and process & execute latencies.
      */
-    public static Map aggBoltStreamsLatAndCount(Map id2execAvg, Map id2procAvg, Map id2numExec)
{
-        Map ret = new HashMap();
+    public static <K> Map<K, Map> aggBoltStreamsLatAndCount(Map<K, Double>
id2execAvg,
+                                                            Map<K, Double> id2procAvg,
+                                                            Map<K, Long> id2numExec)
{
+        Map<K, Map> ret = new HashMap<>();
         if (id2execAvg == null || id2procAvg == null || id2numExec == null) {
             return ret;
         }
-        for (Object k : id2execAvg.keySet()) {
-            Map subMap = new HashMap();
+        for (K k : id2execAvg.keySet()) {
+            Map<String, Object> subMap = new HashMap<>();
             putKV(subMap, EXEC_LAT_TOTAL, weightAvg(id2execAvg, id2numExec, k));
             putKV(subMap, PROC_LAT_TOTAL, weightAvg(id2procAvg, id2numExec, k));
             putKV(subMap, EXECUTED, id2numExec.get(k));
@@ -175,12 +178,13 @@ public class StatsUtil {
     /**
      * Aggregates number acked and complete latencies.
      */
-    public static Map aggSpoutStreamsLatAndCount(Map id2compAvg, Map id2acked) {
-        Map ret = new HashMap();
+    public static <K> Map<K, Map> aggSpoutStreamsLatAndCount(Map<K, Double>
id2compAvg,
+                                                             Map<K, Long> id2acked)
{
+        Map<K, Map> ret = new HashMap<>();
         if (id2compAvg == null || id2acked == null) {
             return ret;
         }
-        for (Object k : id2compAvg.keySet()) {
+        for (K k : id2compAvg.keySet()) {
             Map subMap = new HashMap();
             putKV(subMap, COMP_LAT_TOTAL, weightAvg(id2compAvg, id2acked, k));
             putKV(subMap, ACKED, id2acked.get(k));
@@ -189,17 +193,29 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggPreMergeCompPageBolt(Map<String, Object> m, String window,
boolean includeSys) {
-        Map ret = new HashMap();
-        putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id"));
-        putKV(ret, HOST, getByKey(m, HOST));
-        putKV(ret, PORT, getByKey(m, PORT));
-        putKV(ret, UPTIME, getByKey(m, UPTIME));
+    /**
+     * pre-merge component page bolt stats from an executor heartbeat
+     * 1. computes component capacity
+     * 2. converts map keys of stats
+     * 3. filters streams if necessary
+     *
+     * @param beat       executor heartbeat data
+     * @param window     specified window
+     * @param includeSys whether to include system streams
+     * @return pre-merged stats
+     */
+    public static Map<String, Object> aggPreMergeCompPageBolt(Map<String, Object>
beat, String window, boolean includeSys) {
+        Map<String, Object> ret = new HashMap<>();
+
+        putKV(ret, EXECUTOR_ID, getByKey(beat, "exec-id"));
+        putKV(ret, HOST, getByKey(beat, HOST));
+        putKV(ret, PORT, getByKey(beat, PORT));
+        putKV(ret, UPTIME, getByKey(beat, UPTIME));
         putKV(ret, NUM_EXECUTORS, 1);
-        putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS));
+        putKV(ret, NUM_TASKS, getByKey(beat, NUM_TASKS));
 
-        Map stat2win2sid2num = getMapByKey(m, STATS);
-        putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(m, UPTIME).intValue()));
+        Map stat2win2sid2num = getMapByKey(beat, STATS);
+        putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(beat, UPTIME).intValue()));
 
         // calc cid+sid->input_stats
         Map inputStats = new HashMap();
@@ -236,16 +252,27 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map<String, Object> aggPreMergeCompPageSpout(Map<String, Object>
m, String window, boolean includeSys) {
+    /**
+     * pre-merge component page spout stats from an executor heartbeat
+     * 1. computes component capacity
+     * 2. converts map keys of stats
+     * 3. filters streams if necessary
+     *
+     * @param beat       executor heartbeat data
+     * @param window     specified window
+     * @param includeSys whether to include system streams
+     * @return pre-merged stats
+     */
+    public static Map<String, Object> aggPreMergeCompPageSpout(Map<String, Object>
beat, String window, boolean includeSys) {
         Map<String, Object> ret = new HashMap<>();
-        putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id"));
-        putKV(ret, HOST, getByKey(m, HOST));
-        putKV(ret, PORT, getByKey(m, PORT));
-        putKV(ret, UPTIME, getByKey(m, UPTIME));
+        putKV(ret, EXECUTOR_ID, getByKey(beat, "exec-id"));
+        putKV(ret, HOST, getByKey(beat, HOST));
+        putKV(ret, PORT, getByKey(beat, PORT));
+        putKV(ret, UPTIME, getByKey(beat, UPTIME));
         putKV(ret, NUM_EXECUTORS, 1);
-        putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS));
+        putKV(ret, NUM_TASKS, getByKey(beat, NUM_TASKS));
 
-        Map stat2win2sid2num = getMapByKey(m, STATS);
+        Map stat2win2sid2num = getMapByKey(beat, STATS);
 
         // calc sid->output-stats
         Map outputStats = new HashMap();
@@ -269,16 +296,24 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * pre-merge component stats of specified bolt id
+     *
+     * @param beat       executor heartbeat data
+     * @param window     specified window
+     * @param includeSys whether to include system streams
+     * @return { comp id -> comp-stats }
+     */
     public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageBolt(
-            Map<String, Object> m, String window, boolean includeSys) {
+            Map<String, Object> beat, String window, boolean includeSys) {
         Map<String, Object> ret = new HashMap<>();
 
         Map<String, Object> subRet = new HashMap<>();
         putKV(subRet, NUM_EXECUTORS, 1);
-        putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS));
+        putKV(subRet, NUM_TASKS, getByKey(beat, NUM_TASKS));
 
-        Map<String, Object> stat2win2sid2num = getMapByKey(m, STATS);
-        putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(m, UPTIME).intValue()));
+        Map<String, Object> stat2win2sid2num = getMapByKey(beat, STATS);
+        putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(beat, UPTIME).intValue()));
 
         for (String key : new String[]{EMITTED, TRANSFERRED, ACKED, FAILED}) {
             Map<String, Map<K, V>> stat = windowSetConverter(getMapByKey(stat2win2sid2num,
key), TO_STRING);
@@ -304,12 +339,12 @@ public class StatsUtil {
         subRet.putAll(aggBoltLatAndCount(
                 win2sid2execLat.get(window), win2sid2procLat.get(window), win2sid2exec.get(window)));
 
-        ret.put((String) getByKey(m, "comp-id"), subRet);
+        ret.put((String) getByKey(beat, "comp-id"), subRet);
         return ret;
     }
 
     /**
-     * returns { comp id -> comp-stats }
+     * pre-merge component stats of specified spout id and returns { comp id -> comp-stats
}
      */
     public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageSpout(
             Map<String, Object> m, String window, boolean includeSys) {
@@ -346,6 +381,13 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * merge accumulated bolt stats with pre-merged component stats
+     *
+     * @param accBoltStats accumulated bolt stats
+     * @param boltStats    pre-merged component stats
+     * @return merged stats
+     */
     public static Map<String, Object> mergeAggCompStatsCompPageBolt(
             Map<String, Object> accBoltStats, Map<String, Object> boltStats)
{
         Map<String, Object> ret = new HashMap<>();
@@ -395,6 +437,9 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * merge accumulated spout stats with pre-merged component stats
+     */
     public static Map<String, Object> mergeAggCompStatsCompPageSpout(
             Map<String, Object> accSpoutStats, Map<String, Object> spoutStats)
{
         Map<String, Object> ret = new HashMap<>();
@@ -432,7 +477,15 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map<String, Object> mergeAggCompStatsTopoPageBolt(Map<String,
Object> accBoltStats, Map<String, Object> boltStats) {
+    /**
+     * merge accumulated bolt stats with new bolt stats
+     *
+     * @param accBoltStats accumulated bolt stats
+     * @param boltStats    new input bolt stats
+     * @return merged bolt stats
+     */
+    public static Map<String, Object> mergeAggCompStatsTopoPageBolt(Map<String,
Object> accBoltStats,
+                                                                    Map<String, Object>
boltStats) {
         Map<String, Object> ret = new HashMap<>();
 
         Integer numExecutors = getByKeyOr0(accBoltStats, NUM_EXECUTORS).intValue();
@@ -459,7 +512,11 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map<String, Object> mergeAggCompStatsTopoPageSpout(Map<String,
Object> accSpoutStats, Map<String, Object> spoutStats) {
+    /**
+     * merge accumulated spout stats with new spout stats
+     */
+    public static Map<String, Object> mergeAggCompStatsTopoPageSpout(Map<String,
Object> accSpoutStats,
+                                                                     Map<String, Object>
spoutStats) {
         Map<String, Object> ret = new HashMap<>();
 
         Integer numExecutors = getByKeyOr0(accSpoutStats, NUM_EXECUTORS).intValue();
@@ -485,7 +542,7 @@ public class StatsUtil {
      * executor with the given map for the topology page.
      */
     public static Map<String, Object> aggTopoExecStats(
-            String window, boolean includeSys, Map<String, Object> accStats, Map<String,
Object> newData, String compType) {
+            String window, boolean includeSys, Map<String, Object> accStats, Map<String,
Object> beat, String compType) {
         Map<String, Object> ret = new HashMap<>();
 
         Set workerSet = (Set) accStats.get(WORKERS_SET);
@@ -501,12 +558,12 @@ public class StatsUtil {
         // component id -> stats
         Map<String, Object> cid2stats;
         if (isSpout) {
-            cid2stats = aggPreMergeTopoPageSpout(newData, window, includeSys);
+            cid2stats = aggPreMergeTopoPageSpout(beat, window, includeSys);
         } else {
-            cid2stats = aggPreMergeTopoPageBolt(newData, window, includeSys);
+            cid2stats = aggPreMergeTopoPageBolt(beat, window, includeSys);
         }
 
-        Map stats = getMapByKey(newData, STATS);
+        Map stats = getMapByKey(beat, STATS);
         Map w2compLatWgtAvg, w2acked;
         Map compLatStats = getMapByKey(stats, COMP_LATENCIES);
         if (isSpout) { // agg spout stats
@@ -524,7 +581,7 @@ public class StatsUtil {
             w2acked = aggregateCountStreams(getMapByKey(stats, ACKED));
         }
 
-        workerSet.add(Lists.newArrayList(getByKey(newData, HOST), getByKey(newData, PORT)));
+        workerSet.add(Lists.newArrayList(getByKey(beat, HOST), getByKey(beat, PORT)));
         putKV(ret, WORKERS_SET, workerSet);
         putKV(ret, BOLT_TO_STATS, bolt2stats);
         putKV(ret, SPOUT_TO_STATS, spout2stats);
@@ -543,23 +600,23 @@ public class StatsUtil {
         // (merge-with merge-agg-comp-stats-topo-page-bolt/spout (acc-stats comp-key) cid->statk->num)
         // (acc-stats comp-key) ==> bolt2stats/spout2stats
         if (isSpout) {
-            Set<String> keySet = new HashSet<>();
-            keySet.addAll(spout2stats.keySet());
-            keySet.addAll(cid2stats.keySet());
+            Set<String> spouts = new HashSet<>();
+            spouts.addAll(spout2stats.keySet());
+            spouts.addAll(cid2stats.keySet());
 
-            Map mm = new HashMap();
-            for (String k : keySet) {
-                mm.put(k, mergeAggCompStatsTopoPageSpout((Map) spout2stats.get(k), (Map)
cid2stats.get(k)));
+            Map<String, Object> mm = new HashMap<>();
+            for (String spout : spouts) {
+                mm.put(spout, mergeAggCompStatsTopoPageSpout((Map) spout2stats.get(spout),
(Map) cid2stats.get(spout)));
             }
             putKV(ret, SPOUT_TO_STATS, mm);
         } else {
-            Set<String> keySet = new HashSet<>();
-            keySet.addAll(bolt2stats.keySet());
-            keySet.addAll(cid2stats.keySet());
+            Set<String> bolts = new HashSet<>();
+            bolts.addAll(bolt2stats.keySet());
+            bolts.addAll(cid2stats.keySet());
 
-            Map mm = new HashMap();
-            for (String k : keySet) {
-                mm.put(k, mergeAggCompStatsTopoPageBolt((Map) bolt2stats.get(k), (Map) cid2stats.get(k)));
+            Map<String, Object> mm = new HashMap<>();
+            for (String bolt : bolts) {
+                mm.put(bolt, mergeAggCompStatsTopoPageBolt((Map) bolt2stats.get(bolt), (Map)
cid2stats.get(bolt)));
             }
             putKV(ret, BOLT_TO_STATS, mm);
         }
@@ -674,12 +731,13 @@ public class StatsUtil {
      *
      * @param statsSeq   a seq of ExecutorStats
      * @param includeSys whether to include system streams
-     * @return aggregated bolt stats
+     * @return aggregated bolt stats: {metric -> win -> global stream id -> value}
      */
     public static <T> Map<String, Map> aggregateBoltStats(List<ExecutorSummary>
statsSeq, boolean includeSys) {
         Map<String, Map> ret = new HashMap<>();
 
         Map<String, Map<String, Map<T, Long>>> commonStats = aggregateCommonStats(statsSeq);
+        // filter sys streams if necessary
         commonStats = preProcessStreamSummary(commonStats, includeSys);
 
         List<Map<String, Map<GlobalStreamId, Long>>> acked = new ArrayList<>();
@@ -710,13 +768,14 @@ public class StatsUtil {
      *
      * @param statsSeq   a seq of ExecutorStats
      * @param includeSys whether to include system streams
-     * @return aggregated spout stats
+     * @return aggregated spout stats: {metric -> win -> global stream id -> value}
      */
     public static Map<String, Map> aggregateSpoutStats(List<ExecutorSummary>
statsSeq, boolean includeSys) {
         // actually Map<String, Map<String, Map<String, Long/Double>>>
         Map<String, Map> ret = new HashMap<>();
 
         Map<String, Map<String, Map<String, Long>>> commonStats = aggregateCommonStats(statsSeq);
+        // filter sys streams if necessary
         commonStats = preProcessStreamSummary(commonStats, includeSys);
 
         List<Map<String, Map<String, Long>>> acked = new ArrayList<>();
@@ -736,6 +795,9 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats
+     */
     public static <T> Map<String, Map<String, Map<T, Long>>> aggregateCommonStats(List<ExecutorSummary>
statsSeq) {
         Map<String, Map<String, Map<T, Long>>> ret = new HashMap<>();
 
@@ -751,6 +813,9 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * filter system streams of aggregated spout/bolt stats if necessary
+     */
     public static <T> Map<String, Map<String, Map<T, Long>>> preProcessStreamSummary(
             Map<String, Map<String, Map<T, Long>>> streamSummary, boolean
includeSys) {
         Map<String, Map<T, Long>> emitted = getMapByKey(streamSummary, EMITTED);
@@ -762,6 +827,12 @@ public class StatsUtil {
         return streamSummary;
     }
 
+    /**
+     * aggregate count streams by window
+     *
+     * @param stats a Map of value: {win -> stream -> value}
+     * @return a Map of value: {win -> value}
+     */
     public static <K, V extends Number> Map<String, Long> aggregateCountStreams(
             Map<String, Map<K, V>> stats) {
         Map<String, Long> ret = new HashMap<>();
@@ -776,6 +847,14 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * compute a weighted average from a list of average maps and the corresponding count
maps
+     * extracted from a list of ExecutorSummary
+     *
+     * @param avgSeq   a list of {win -> global stream id -> avg value}
+     * @param countSeq a list of {win -> global stream id -> count value}
+     * @return a Map of {win -> global stream id -> weighted avg value}
+     */
     public static <K> Map<String, Map<K, Double>> aggregateAverages(List<Map<String,
Map<K, Double>>> avgSeq,
                                                                     List<Map<String,
Map<K, Long>>> countSeq) {
         Map<String, Map<K, Double>> ret = new HashMap<>();
@@ -796,8 +875,15 @@ public class StatsUtil {
         return ret;
     }
 
-    public static <K> Map<String, Double> aggregateAvgStreams(
-            Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>>
counts) {
+    /**
+     * aggregate weighted average of all streams
+     *
+     * @param avgs   a Map of {win -> stream -> average value}
+     * @param counts a Map of {win -> stream -> count value}
+     * @return a Map of {win -> aggregated value}
+     */
+    public static <K> Map<String, Double> aggregateAvgStreams(Map<String,
Map<K, Double>> avgs,
+                                                              Map<String, Map<K, Long>>
counts) {
         Map<String, Double> ret = new HashMap<>();
 
         Map<String, Map<K, List>> expands = expandAverages(avgs, counts);
@@ -818,14 +904,21 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * aggregates spout stream stats, returns a Map of {metric -> win -> aggregated
value}
+     */
     public static Map<String, Map> spoutStreamsStats(List<ExecutorSummary> summs,
boolean includeSys) {
         if (summs == null) {
             return new HashMap<>();
         }
+        // filter ExecutorSummary's with empty stats
         List<ExecutorSummary> statsSeq = getFilledStats(summs);
         return aggregateSpoutStreams(aggregateSpoutStats(statsSeq, includeSys));
     }
 
+    /**
+     * aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated
value}
+     */
     public static Map<String, Map> boltStreamsStats(List<ExecutorSummary> summs,
boolean includeSys) {
         if (summs == null) {
             return new HashMap<>();
@@ -834,6 +927,12 @@ public class StatsUtil {
         return aggregateBoltStreams(aggregateBoltStats(statsSeq, includeSys));
     }
 
+    /**
+     * aggregate all spout streams
+     *
+     * @param stats a Map of {metric -> win -> stream id -> value}
+     * @return a Map of {metric -> win -> aggregated value}
+     */
     public static Map<String, Map> aggregateSpoutStreams(Map<String, Map> stats)
{
         // actual ret is Map<String, Map<String, Long/Double>>
         Map<String, Map> ret = new HashMap<>();
@@ -846,6 +945,12 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * aggregate all bolt streams
+     *
+     * @param stats a Map of {metric -> win -> stream id -> value}
+     * @return a Map of {metric -> win -> aggregated value}
+     */
     public static Map<String, Map> aggregateBoltStreams(Map<String, Map> stats)
{
         Map<String, Map> ret = new HashMap<>();
         putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED)));
@@ -861,7 +966,7 @@ public class StatsUtil {
     }
 
     /**
-     * A helper function that aggregates windowed stats from one spout executor.
+     * aggregate windowed stats from a bolt executor stats with a Map of accumulated stats
      */
     public static Map<String, Object> aggBoltExecWinStats(
             Map<String, Object> accStats, Map<String, Object> newStats, boolean
includeSys) {
@@ -905,7 +1010,7 @@ public class StatsUtil {
     }
 
     /**
-     * A helper function that aggregates windowed stats from one spout executor.
+     * aggregate windowed stats from a spout executor stats with a Map of accumulated stats
      */
     public static Map<String, Object> aggSpoutExecWinStats(
             Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys)
{
@@ -944,7 +1049,7 @@ public class StatsUtil {
 
 
     /**
-     * aggregate counts
+     * aggregate a list of count maps into one map
      *
      * @param countsSeq a seq of {win -> GlobalStreamId -> value}
      */
@@ -973,8 +1078,8 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map<String, Object> aggregateCompStats(String window, boolean includeSys,
-                                                         List<Map<String, Object>>
beats, String compType) {
+    public static Map<String, Object> aggregateCompStats(
+            String window, boolean includeSys, List<Map<String, Object>> beats,
String compType) {
         boolean isSpout = SPOUT.equals(compType);
 
         Map<String, Object> initVal = new HashMap<>();
@@ -998,6 +1103,7 @@ public class StatsUtil {
         }
         putKV(initVal, STATS, stats);
 
+        // iterate through all executor heartbeats
         for (Map<String, Object> beat : beats) {
             initVal = aggCompExecStats(window, includeSys, initVal, beat, compType);
         }
@@ -1029,14 +1135,14 @@ public class StatsUtil {
     }
 
     /**
-     * post aggregate component stats
+     * post aggregate component stats:
+     * 1. computes execute-latency/process-latency from execute/process latency total
+     * 2. computes windowed weight avgs
+     * 3. transform Map keys
      *
-     * @param task2component task -> component, note it's a clojure map
-     * @param exec2hostPort  executor -> host+port, note it's a clojure map
-     * @param compStats      accumulated comp stats
-     * @return
+     * @param compStats accumulated comp stats
      */
-    public static Map<String, Object> postAggregateCompStats(Map task2component, Map
exec2hostPort, Map<String, Object> compStats) {
+    public static Map<String, Object> postAggregateCompStats(Map<String, Object>
compStats) {
         Map<String, Object> ret = new HashMap<>();
 
         String compType = (String) compStats.get(TYPE);
@@ -1108,6 +1214,19 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * aggregate component executor stats
+     *
+     * @param exec2hostPort  a Map of {executor -> host+port}, note it's a clojure map
+     * @param task2component a Map of {task id -> component}, note it's a clojure map
+     * @param beats          a converted HashMap of executor heartbeats, {executor ->
heartbeat}
+     * @param window         specified window
+     * @param includeSys     whether to include system streams
+     * @param topologyId     topology id
+     * @param topology       storm topology
+     * @param componentId    component id
+     * @return ComponentPageInfo thrift structure
+     */
     public static ComponentPageInfo aggCompExecsStats(
             Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String,
Object>> beats,
             String window, boolean includeSys, String topologyId, StormTopology topology,
String componentId) {
@@ -1115,7 +1234,7 @@ public class StatsUtil {
         List<Map<String, Object>> beatList =
                 extractDataFromHb(exec2hostPort, task2component, beats, includeSys, topology,
componentId);
         Map<String, Object> compStats = aggregateCompStats(window, includeSys, beatList,
componentType(topology, componentId));
-        compStats = postAggregateCompStats(task2component, exec2hostPort, compStats);
+        compStats = postAggregateCompStats(compStats);
         return thriftifyCompPageData(topologyId, topology, componentId, compStats);
     }
 
@@ -1124,6 +1243,9 @@ public class StatsUtil {
     // convert thrift stats to java maps
     // =====================================================================================
 
+    /**
+     * convert thrift executor heartbeats into a java HashMap
+     */
     public static Map<List<Integer>, Map<String, Object>> convertExecutorBeats(Map<ExecutorInfo,
ExecutorBeat> beats) {
         Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
         for (Map.Entry<ExecutorInfo, ExecutorBeat> beat : beats.entrySet()) {
@@ -1150,6 +1272,12 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * convert a thrift worker heartbeat into a java HashMap
+     *
+     * @param workerHb thrift worker heartbeat, may be null
+     * @return a java HashMap converted from the worker heartbeat
+     */
     public static Map<String, Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
{
         Map<String, Object> ret = new HashMap<>();
         if (workerHb != null) {
@@ -1224,6 +1352,15 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * extract a list of host port info for specified component
+     *
+     * @param exec2hostPort  {executor -> host+port}, note it's a clojure map
+     * @param task2component {task id -> component}, note it's a clojure map
+     * @param includeSys     whether to include system streams
+     * @param compId         component id
+     * @return a list of host+port
+     */
     public static List<Map<String, Object>> extractNodeInfosFromHbForComp(
             Map exec2hostPort, Map task2component, boolean includeSys, String compId) {
         List<Map<String, Object>> ret = new ArrayList<>();
@@ -1384,6 +1521,14 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * compute weighted avg from a Map of stats and given avg/count keys
+     *
+     * @param accData    a Map of {win -> key -> value}
+     * @param wgtAvgKey  weighted average key
+     * @param divisorKey count key
+     * @return a Map of {win -> weighted avg value}
+     */
     private static Map<String, Double> computeWeightedAveragesPerWindow(Map<String,
Object> accData,
                                                                         String wgtAvgKey,
String divisorKey) {
         Map<String, Double> ret = new HashMap<>();
@@ -1400,6 +1545,9 @@ public class StatsUtil {
     }
 
 
+    /**
+     * convert a Set of clojure executors to a java Set of List<Integer>
+     */
     public static Set<List<Integer>> convertExecutors(Set executors) {
         Set<List<Integer>> convertedExecutors = new HashSet<>();
         for (Object executor : executors) {
@@ -1438,7 +1586,7 @@ public class StatsUtil {
         if (stats == null) {
             return 0.0;
         } else {
-            // Map<String, Map<String/GlobalStreamId, Long/Double>> {win ->
stream -> value}
+            // actual value of m is: Map<String, Map<String/GlobalStreamId, Long/Double>>
({win -> stream -> value})
             Map<String, Map> m = aggregateBoltStats(Lists.newArrayList(summary), true);
             // {metric -> win -> value} ==> {win -> metric -> value}
             m = swapMapOrder(aggregateBoltStreams(m));
@@ -1495,17 +1643,15 @@ public class StatsUtil {
         return sum;
     }
 
-    private static double sumStreamsDouble(Map m, String key) {
+    private static <K1, K2> double sumStreamsDouble(Map<K1, Map<K2, ?>>
m, String key) {
         double sum = 0;
         if (m == null) {
             return sum;
         }
-        for (Object v : m.values()) {
-            Map sub = (Map) v;
-            for (Object o : sub.entrySet()) {
-                Map.Entry e = (Map.Entry) o;
-                if (e.getKey().equals(key)) {
-                    sum += ((Number) e.getValue()).doubleValue();
+        for (Map<K2, ?> v : m.values()) {
+            for (Map.Entry<K2, ?> entry : v.entrySet()) {
+                if (entry.getKey().equals(key)) {
+                    sum += ((Number) entry.getValue()).doubleValue();
                 }
             }
         }
@@ -1607,7 +1753,7 @@ public class StatsUtil {
         return ret;
     }
 
-    private static <K> Map mergeWithSumLong(Map<K, Long> m1, Map<K, Long>
m2) {
+    private static <K> Map<K, Long> mergeWithSumLong(Map<K, Long> m1, Map<K,
Long> m2) {
         Map<K, Long> ret = new HashMap<>();
 
         Set<K> allKeys = new HashSet<>();
@@ -1626,7 +1772,7 @@ public class StatsUtil {
         return ret;
     }
 
-    private static <K> Map mergeWithSumDouble(Map<K, Double> m1, Map<K, Double>
m2) {
+    private static <K> Map<K, Double> mergeWithSumDouble(Map<K, Double>
m1, Map<K, Double> m2) {
         Map<K, Double> ret = new HashMap<>();
 
         Set<K> allKeys = new HashSet<>();
@@ -2042,14 +2188,12 @@ public class StatsUtil {
 
         for (Map.Entry<T, V1> entry : id2Avg.entrySet()) {
             T k = entry.getKey();
-            double v = entry.getValue().doubleValue();
-            long n = id2num.get(k).longValue();
-            ret += productOr0(v, n);
+            ret += productOr0(entry.getValue(), id2num.get(k));
         }
         return ret;
     }
 
-    private static double weightAvg(Map id2Avg, Map id2num, Object key) {
+    private static <K, V1 extends Number, V2 extends Number> double weightAvg(Map<K,
V1> id2Avg, Map<K, V2> id2num, K key) {
         if (id2Avg == null || id2num == null) {
             return 0.0;
         }
@@ -2087,7 +2231,7 @@ public class StatsUtil {
         return (Map) map.get(key);
     }
 
-    private static <T, V extends Number> long sumValues(Map<T, V> m) {
+    private static <K, V extends Number> long sumValues(Map<K, V> m) {
         long ret = 0L;
         if (m == null) {
             return ret;
@@ -2223,6 +2367,11 @@ public class StatsUtil {
         return stormClusterState.lastError(stormId, compId);
     }
 
+
+    // =====================================================================================
+    // key transformers
+    // =====================================================================================
+
     interface KeyTransformer<T> {
         T transform(Object key);
     }


Mime
View raw message