kudu-commits mailing list archives

From: aw...@apache.org
Subject: [2/3] kudu git commit: parse_metrics_log: update to the new format
Date: Tue, 26 Jun 2018 01:21:29 GMT
parse_metrics_log: update to the new format

The new diagnostics logs report more than just metrics, and thus are
formatted differently than the old metrics-only logs were. This patch
updates the parsing to be compatible with the new logs.
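
For context, here is a rough, hypothetical sketch of how the updated script
picks out metrics records from the new log. The record layout (five
space-separated fields, with the record type third, a microsecond timestamp
fourth, and the JSON payload last) is an assumption read off the parsing
code in this patch, not a documented format:

  # Illustrative only -- not part of the patch.
  def parse_diag_line(line):
    try:
      _, _, log_type, ts, payload_json = line.split(" ")
    except ValueError:
      return None            # wrong number of fields; skip the line
    if log_type != "metrics":
      return None            # records of any other type are ignored
    return float(ts) / 1000000.0, payload_json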

The changes this patch addresses:
- each metrics log line now has a different number of fields than before
- the metrics log only emits metrics from entities that have changed
  since the previous record; the metric processing has been updated
  accordingly: where we previously filled in NaNs for missing data, we
  now carry the value forward from the previous snapshot, or fill in 0
  if no previous value exists (see the sketch below)
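
A minimal sketch of the carry-forward behavior described above (the helper
name and sample data are illustrative; the snapshot maps have the same
{ metric key => { entity id => value } } shape the patch builds):

  # Illustrative only: reuse an entity's last-reported value when it is
  # absent from the current snapshot because nothing changed.
  def carry_forward(prev_snapshot, cur_snapshot):
    for metric_key, prev_entities in prev_snapshot.items():
      cur_entities = cur_snapshot.setdefault(metric_key, {})
      for entity_id, value in prev_entities.items():
        cur_entities.setdefault(entity_id, value)
    return cur_snapshot

  # carry_forward({'server.block_cache_hits_caching': {'server': 100}}, {})
  #   returns {'server.block_cache_hits_caching': {'server': 100}}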

I tested this manually by running against some metrics logs I collected
across a couple of workloads. The results seem to match what was
expected.

Change-Id: If11b7ecc93a3f64db3b7c1994f47308b3ec44029
Reviewed-on: http://gerrit.cloudera.org:8080/10693
Reviewed-by: Todd Lipcon <todd@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5162a6ce
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5162a6ce
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5162a6ce

Branch: refs/heads/master
Commit: 5162a6ce2e0b951e4fc91a6dae44b738ca75d27c
Parents: b52c343
Author: Andrew Wong <awong@cloudera.com>
Authored: Wed Jun 6 15:19:32 2018 -0700
Committer: Andrew Wong <awong@cloudera.com>
Committed: Tue Jun 26 00:42:30 2018 +0000

----------------------------------------------------------------------
 src/kudu/scripts/parse_metrics_log.py | 193 +++++++++++++++++++----------
 1 file changed, 125 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5162a6ce/src/kudu/scripts/parse_metrics_log.py
----------------------------------------------------------------------
diff --git a/src/kudu/scripts/parse_metrics_log.py b/src/kudu/scripts/parse_metrics_log.py
index 75406ad..f618010 100644
--- a/src/kudu/scripts/parse_metrics_log.py
+++ b/src/kudu/scripts/parse_metrics_log.py
@@ -26,10 +26,8 @@ of metrics described below are just a starting point to work from.
 Uncomment the ones you are interested in, or add new ones.
 """
 
-from collections import Counter
+from collections import defaultdict
 import gzip
-import heapq
-import itertools
 try:
   import simplejson as json
 except:
@@ -89,88 +87,111 @@ PARSE_METRIC_KEYS.add("server.block_cache_hits_caching")
 PARSE_METRIC_KEYS.add("server.block_cache_misses_caching")
 
 NaN = float('nan')
-UNKNOWN_PERCENTILES = dict(p50=NaN, p95=NaN, p99=NaN, p999=NaN, max=NaN)
+UNKNOWN_PERCENTILES = dict(p50=0, p95=0, p99=0, p999=0, max=0)
 
-def merge_delta(m, delta):
-  """
-  Update (in-place) the metrics entry 'm' by merging another entry 'delta'.
 
-  Counts and sums are simply added.
-  Histograms require more complex processing: the 'values' array needs to be
-  merged and the then the delta's counts added to the corresponding buckets.
+def strip_metric(m):
+  """
+  Strip the input metric string, returning only the values and counts for the
+  metric, as appropriate.
   """
+  if 'value' in m:
+    return m['value']
 
-  for k, v in delta.iteritems():
-    if k in ('name', 'values', 'counts', 'min', 'max', 'mean'):
-      continue
-    m[k] += v
-
-  # Merge counts.
-  if 'counts' in delta:
-    m_zip = itertools.izip(m.get('values', []), m.get('counts', []))
-    d_zip = itertools.izip(delta.get('values', []), delta.get('counts', []))
-    new_values = []
-    new_counts = []
-    i = 0
-    for value, counts in itertools.groupby(heapq.merge(m_zip, d_zip), lambda x: x[0]):
-      new_values[i] = value
-      new_counts[i] = sum(c for v, c in counts)
-      i += 1
-    m['counts'] = new_counts
-    m['values'] = new_values
+  if 'counts' in m:
+    return dict(zip(m.get('values', []), m.get('counts')))
+  return NaN
 
 
 def json_to_map(j):
   """
-  Parse the JSON structure in the log into a python dictionary
-  keyed by <entity>.<metric name>.
+  Parse the JSON structure in the log into a python dictionary of the form:
+    { <entity>.<metric name>:String => { <entity id>:String => <values> } }
 
-  The entity ID is currently ignored. If there is more than one
-  entity of a given type (eg tablets), the metrics will be summed
-  together using 'merge_delta' above.
+  Only returns the metrics listed in PARSE_METRIC_KEYS.
   """
-  ret = {}
+  ret = defaultdict(dict)
   for entity in j:
     for m in entity['metrics']:
-      key = entity['type'] + "." + m['name']
-      if key not in PARSE_METRIC_KEYS:
+      entity_id = entity['id']
+      metric_key = entity['type'] + "." + m['name']
+      if metric_key not in PARSE_METRIC_KEYS:
         continue
-      if key in ret:
-        merge_delta(ret[key], m)
-      else:
-        ret[key] = m
+      # Add the metric value to the map, keyed by metric key and entity id.
+      ret[metric_key][entity_id] = strip_metric(m)
   return ret
 
 def delta(prev, cur, m):
   """ Compute the delta in metric 'm' between two metric snapshots. """
   if m not in prev or m not in cur:
     return 0
-  return cur[m]['value'] - prev[m]['value']
 
-def histogram_stats(prev, cur, m):
+  return cur[m] - prev[m]
+
+def aggregate_metrics(metric_to_eid_to_vals):
+  """ Aggregates metrics across entity ids """
+  ret = {}
+  for metric_name in metric_to_eid_to_vals:
+    if metric_name == "ts":
+      ret['ts'] = metric_to_eid_to_vals['ts']
+      continue
+
+    eid_to_vals = metric_to_eid_to_vals[metric_name]
+    # Iterate through all the entities for this metric.
+    for eid in eid_to_vals:
+      # Aggregate the bucket counts for histogram metrics.
+      vals = eid_to_vals[eid]
+      if isinstance(vals, dict):
+        if metric_name not in ret:
+          # Insert the counts if none exist for the metric.
+          ret[metric_name] = vals.copy()
+        else:
+          # Otherwise, add the counts to what's there.
+          for val, count in vals.iteritems():
+            if val in ret[metric_name]:
+              ret[metric_name][val] += count
+            else:
+              ret[metric_name][val] = count
+      else:
+        # Sum the values for normal metrics.
+        if metric_name in ret:
+          ret[metric_name] += vals
+        else:
+          ret[metric_name] = vals
+  return ret
+
+def histogram_stats(aggregated_prev, aggregated_cur, m):
   """
   Compute percentile stats for the metric 'm' in the window between two
   metric snapshots.
   """
-  if m not in prev or m not in cur or 'values' not in cur[m]:
+  if m not in aggregated_prev or m not in aggregated_cur or not isinstance(aggregated_cur, dict):
     return UNKNOWN_PERCENTILES
-  prev = prev[m]
-  cur = cur[m]
-
-  p_dict = dict(zip(prev.get('values', []),
-                    prev.get('counts', [])))
-  c_zip = zip(cur.get('values', []),
-              cur.get('counts', []))
-  delta_total = cur['total_count'] - prev['total_count']
+
+  prev = aggregated_prev[m]
+  cur = aggregated_cur[m]
+
+  # Determine the total count we should expect between the current and previous
+  # snapshots.
+  delta_total = sum([val for _, val in cur.iteritems()]) - \
+      sum([val for _, val in prev.iteritems()])
+
   if delta_total == 0:
     return UNKNOWN_PERCENTILES
   res = dict()
   cum_count = 0
-  for cur_val, cur_count in c_zip:
-    prev_count = p_dict.get(cur_val, 0)
+
+
+  # Iterate over all of the buckets for the current and previous snapshots,
+  # summing them up, and assigning percentiles to the bucket as appropriate.
+  for cur_val, cur_count in sorted(aggregated_cur[m].iteritems()):
+    prev_count = prev.get(cur_val, 0)
     delta_count = cur_count - prev_count
     cum_count += delta_count
     percentile = float(cum_count) / delta_total
+
+    # Determine which percentiles this bucket belongs to.
+    percentile = float(cum_count) / delta_total
     if 'p50' not in res and percentile > 0.50:
       res['p50'] = cur_val
     if 'p95' not in res and percentile > 0.95:
@@ -179,44 +200,51 @@ def histogram_stats(prev, cur, m):
       res['p99'] = cur_val
     if 'p999' not in res and percentile > 0.999:
       res['p999'] = cur_val
-    if cum_count == delta_total and delta_count != 0:
+    if cum_count == delta_total:
       res['max'] = cur_val
   return res
 
-def cache_hit_ratio(prev, cur):
+def cache_hit_ratio(aggregated_prev, aggregated_cur):
   """
   Calculate the cache hit ratio between the two samples.
   If there were no cache hits or misses, this returns NaN.
   """
-  delta_hits = delta(prev, cur, 'server.block_cache_hits_caching')
-  delta_misses = delta(prev, cur, 'server.block_cache_misses_caching')
+  delta_hits = delta(aggregated_prev, aggregated_cur, 'server.block_cache_hits_caching')
+  delta_misses = delta(aggregated_prev, aggregated_cur, 'server.block_cache_misses_caching')
   if delta_hits + delta_misses > 0:
     cache_ratio = float(delta_hits) / (delta_hits + delta_misses)
   else:
     cache_ratio = NaN
   return cache_ratio
 
-def process(prev, cur):
+def process(aggregated_prev, aggregated_cur):
   """ Process a pair of metric snapshots, outputting a line of TSV. """
-  delta_ts = cur['ts'] - prev['ts']
-  cache_ratio = cache_hit_ratio(prev, cur)
+  if not aggregated_prev:
+    aggregated_prev = aggregate_metrics(prev)
+
+  delta_ts = aggregated_cur['ts'] - aggregated_prev['ts']
   calc_vals = []
+  cache_ratio = cache_hit_ratio(aggregated_prev, aggregated_cur)
   for metric, _ in SIMPLE_METRICS:
-    if metric in cur:
-      calc_vals.append(cur[metric]['value'])
+    if metric in aggregated_cur:
+      calc_vals.append(aggregated_cur[metric])
     else:
-      calc_vals.append(NaN)
-  calc_vals.extend(delta(prev, cur, metric)/delta_ts for (metric, _) in RATE_METRICS)
+      calc_vals.append(0)
+
+  calc_vals.extend((delta(aggregated_prev, aggregated_cur, metric))/delta_ts \
+      for metric, _ in RATE_METRICS)
   for metric, _ in HISTOGRAM_METRICS:
-    stats = histogram_stats(prev, cur, metric)
+    stats = histogram_stats(aggregated_prev, aggregated_cur, metric)
     calc_vals.extend([stats['p50'], stats['p95'], stats['p99'], stats['p999'], stats['max']])
 
-  print (cur['ts'] + prev['ts'])/2, \
+  print (aggregated_cur['ts'] + aggregated_prev['ts'])/2, \
         cache_ratio, \
         " ".join(str(x) for x in calc_vals)
+  return aggregated_cur
 
 def main(argv):
   prev_data = None
+  aggregated_prev = None
 
   simple_headers = [header for _, header in SIMPLE_METRICS + RATE_METRICS]
   for _, header in HISTOGRAM_METRICS:
@@ -234,7 +262,13 @@ def main(argv):
     else:
       f = file(path)
     for line_number, line in enumerate(f, start=1):
-      (_, ts, metrics_json) = line.split(" ", 2)
+      # Only parse out the "metrics" lines.
+      try:
+        (_, _, log_type, ts, metrics_json) = line.split(" ")
+      except ValueError:
+        continue
+      if log_type != "metrics":
+        continue
       ts = float(ts) / 1000000.0
       prev_ts = prev_data['ts'] if prev_data else 0
       # Enforce that the samples come in time-sorted order.
@@ -243,11 +277,34 @@ def main(argv):
                         % (ts, prev_ts, path, line_number))
       if prev_data and ts < prev_ts + GRANULARITY_SECS:
         continue
+
+      # Parse the metrics json into a map of the form:
+      #   { metric key => { entity id => metric value } }
       data = json_to_map(json.loads(metrics_json))
       data['ts'] = ts
       if prev_data:
-        process(prev_data, data)
+        # Copy missing metrics from prev_data.
+        for m, prev_eid_to_vals in prev_data.iteritems():
+          if m is 'ts':
+            continue
+          # The metric was missing entirely; copy it over.
+          if m not in data:
+            data[m] = prev_eid_to_vals
+          else:
+            # If the metric was missing for a specific entity, copy the metric
+            # from the previous snapshot.
+            for eid, prev_vals in prev_eid_to_vals.iteritems():
+              if eid not in data[m]:
+                data[m][eid] = prev_vals
+
+      aggregated_cur = aggregate_metrics(data)
+      if prev_data:
+        if not aggregated_prev:
+          aggregated_prev = aggregate_metrics(prev_data)
+        process(aggregated_prev, aggregated_cur)
+
       prev_data = data
+      aggregated_prev = aggregated_cur
 
 if __name__ == "__main__":
   main(sys.argv)

