apex-commits mailing list archives

From david...@apache.org
Subject [16/50] [abbrv] incubator-apex-core git commit: added writing custom aggregator
Date Fri, 04 Mar 2016 19:48:37 GMT
added writing custom aggregator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f64379e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f64379e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f64379e3

Branch: refs/heads/master
Commit: f64379e362fdef018e258a24c51c715b265d2b4c
Parents: 6b79095
Author: Chandni Singh <csingh@apache.org>
Authored: Wed Nov 4 19:24:40 2015 -0800
Committer: Thomas Weise <thomas@datatorrent.com>
Committed: Sun Feb 28 22:46:36 2016 -0800

----------------------------------------------------------------------
 autometrics/autometrics.md | 78 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f64379e3/autometrics/autometrics.md
----------------------------------------------------------------------
diff --git a/autometrics/autometrics.md b/autometrics/autometrics.md
index 364007f..f449082 100644
--- a/autometrics/autometrics.md
+++ b/autometrics/autometrics.md
@@ -5,7 +5,7 @@ Apache Apex: AutoMetric in a Nutshell
 Metrics help to collect statistical information about a process, which can be very useful
for diagnosis. Auto Metrics in Apex help to monitor operators in a DAG. The goal of the `AutoMetric`
API is to enable operator developers to define relevant metrics for an operator in a simple
way, which the platform then collects and reports automatically.
 
 # Specifying AutoMetrics in an Operator
-An `AutoMetric` can be any object. It can be of a primitive type - int, long, etc. or a complex
one. A field or a `get` method in an operator can be annotated with `@AutoMetric` to specify
that its value is a metric. After every application window, the platform collects the values
of these fields/methods in a map and sends it to application master.
+An `AutoMetric` can be any object. It can be of a primitive type (int, long, etc.) or a complex
one. A field or a `get` method in an operator can be annotated with `@AutoMetric` to specify
that its value is a metric. At the end of every application window, the platform collects the
values of these fields/methods in a map and sends it to the application master.
 
 ```java
 public class LineReceiver extends BaseOperator
@@ -35,10 +35,10 @@ public class LineReceiver extends BaseOperator
 }
 ```
 
-In the above snippet, there are 2 auto-metrics declared in the `LineReceiver`. At the end
of each application window, the platform will send a map with 2 entries - `[(length, 100),
(count, 10)]` to the application master.
+There are 2 auto-metrics declared in the `LineReceiver`. At the end of each application window,
the platform will send a map with 2 entries - `[(length, 100), (count, 10)]` to the application
master.
 
 # Aggregating AutoMetrics across Partitions
-When an operator is partitioned, it is useful to aggregate the values of auto-metrics across
all its partitions every window to get a logical view of these metrics. 
+When an operator is partitioned, it is useful to aggregate the values of auto-metrics across
all its partitions every window to get a logical view of these metrics. The application master
performs these aggregations using metrics aggregators.
 
 The AutoMetric API supports this through an interface for writing aggregators,
`AutoMetric.Aggregator`. Any implementation of `AutoMetric.Aggregator` can be set as the value of
the `METRICS_AGGREGATOR` attribute of a particular operator; the platform then uses it to
aggregate that operator's physical metrics.
 
@@ -47,12 +47,80 @@ The AutoMetric API helps to achieve this by providing an interface for
writing a
 
 `MetricsAggregator` is just a collection of `SingleMetricAggregator`s. Apex Core and Apex
Malhar provide multiple implementations of `SingleMetricAggregator` that compute sum, min, max,
and average.
 
-For the `LineReceiver` operator, the application developer need not specify any aggregator.
The platform will automatically inject an instance of `MetricsAggregator` that constitutes
of two `LongSumAggregator`s - one for `length` and one for `count`. This aggregator will report
sum of length and sum of count across all the partitions of `LineReceiver`.
+For the `LineReceiver` operator, the application developer need not specify any aggregator.
The platform will automatically inject an instance of `MetricsAggregator` that contains two
`LongSumAggregator`s - one for `length` and one for `count`. This aggregator will report the sum
of `length` and the sum of `count` across all the partitions of `LineReceiver`.
 
 
 ## Building custom aggregators
-Platform cannot perform any meaningful aggregation for a non-numeric metric. In such case,
the operator or application developer can write custom aggregators. Let’s say, if the `LineReceiver`
was modified to 
+The platform cannot perform any meaningful aggregation of non-numeric metrics. In such cases,
the operator or application developer can write custom aggregators. Suppose the `LineReceiver`
is modified to have a complex metric, as shown below.
 
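+```java
+import java.io.Serializable;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+public class AnotherLineReceiver extends BaseOperator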
+{
+  @AutoMetric
+  final LineMetrics lineMetrics = new LineMetrics();
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String s)
+    {
+      lineMetrics.length += s.length();
+      lineMetrics.count++;
+    }
+  };
+
+  // Reset the counters at the start of every window so each window reports only its own lines.
+  @Override
+  public void beginWindow(long windowId)
+  {
+    lineMetrics.length = 0;
+    lineMetrics.count = 0;
+  }
+
+  public static class LineMetrics implements Serializable
+  {
+    int length;
+    int count;
+
+    private static final long serialVersionUID = 201511041908L;
+  }
+}
+```
+
+Below is a custom aggregator that calculates the average line length across all partitions
of `AnotherLineReceiver`.
+
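+```java
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.AutoMetric;
+
+public class AvgLineLengthAggregator implements AutoMetric.Aggregator
+{
+  @Override
+  public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> collection)
+  {
+    long totalLength = 0;
+    long totalCount = 0;
+    for (AutoMetric.PhysicalMetricsContext pmc : collection) {
+      AnotherLineReceiver.LineMetrics lm = (AnotherLineReceiver.LineMetrics)pmc.getMetrics().get("lineMetrics");
+      if (lm == null) {
+        continue; // skip partitions that did not report the metric
+      }
+      totalLength += lm.length;
+      totalCount += lm.count;
+    }
+    // Build a new map for every window, divide as double so the average is not truncated,
+    // and report 0 when no lines arrived instead of dividing by zero.
+    Map<String, Object> result = new HashMap<>();
+    result.put("avgLineLength", totalCount == 0 ? 0.0 : (double)totalLength / totalCount);
+    return result;
+  }
+}
+```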
+An instance of the above aggregator can be set as the `METRICS_AGGREGATOR` attribute of `AnotherLineReceiver`
while creating the DAG, as shown below.
+
+```java
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    ...
+    AnotherLineReceiver lineReceiver = dag.addOperator("LineReceiver", new AnotherLineReceiver());
+    dag.setAttribute(lineReceiver, Context.OperatorContext.METRICS_AGGREGATOR, new AvgLineLengthAggregator());
+    ...
+  }
+```
 
 # Retrieving AutoMetrics
 The Gateway REST API provides a way to retrieve the latest AutoMetrics for each logical operator.
 For example:

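For `LineReceiver`, the default aggregation sums each metric by name across all partitions. That is what a `MetricsAggregator` made of `LongSumAggregator`s is described above as reporting. The sketch below shows that arithmetic on a plain JDK and does not use the Apex API. `SumAcrossPartitionsSketch`, its `Partition` interface (a stand-in for `AutoMetric.PhysicalMetricsContext`) and the sample values are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per-metric sums across partitions, mirroring the logical view a
// MetricsAggregator of LongSumAggregators is described as producing. Not the Apex API.
public class SumAcrossPartitionsSketch
{
  // Stand-in for AutoMetric.PhysicalMetricsContext: the metric map reported by one partition.
  public interface Partition
  {
    Map<String, Object> getMetrics();
  }

  // Sums every numeric metric by name across all partitions of one logical operator.
  public static Map<String, Object> sumMetrics(long windowId, Collection<Partition> partitions)
  {
    Map<String, Long> sums = new HashMap<>();
    for (Partition p : partitions) {
      for (Map.Entry<String, Object> e : p.getMetrics().entrySet()) {
        if (e.getValue() instanceof Number) {
          sums.merge(e.getKey(), ((Number)e.getValue()).longValue(), Long::sum);
        }
      }
    }
    return new HashMap<String, Object>(sums);
  }

  // Builds a fake partition reporting the two LineReceiver metrics.
  public static Partition partition(long length, long count)
  {
    Map<String, Object> m = new HashMap<>();
    m.put("length", length);
    m.put("count", count);
    return () -> m;
  }

  public static void main(String[] args)
  {
    List<Partition> partitions = Arrays.asList(partition(100, 10), partition(50, 5));
    Map<String, Object> logical = sumMetrics(1L, partitions);
    System.out.println(logical.get("length") + " " + logical.get("count")); // prints "150 15"
  }
}
```

With two partitions reporting `(length 100, count 10)` and `(length 50, count 5)`, the logical view is `length = 150` and `count = 15`.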

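The average computed by `AvgLineLengthAggregator` can also be checked in isolation. This minimal sketch makes the following assumptions:

- `AvgLineLengthSketch`, `PhysicalContext` and the local `LineMetrics` are hypothetical stand-ins for `AutoMetric.PhysicalMetricsContext` and `AnotherLineReceiver.LineMetrics`, so the code runs without Apex on the classpath.
- Reporting `0.0` for a window with no lines is a choice made here, not documented platform behavior.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained sketch of the average-line-length aggregation.
// PhysicalContext and LineMetrics are stand-ins, not the Apex classes.
public class AvgLineLengthSketch
{
  public static class LineMetrics
  {
    int length;
    int count;

    LineMetrics(int length, int count)
    {
      this.length = length;
      this.count = count;
    }
  }

  // Stand-in for AutoMetric.PhysicalMetricsContext.
  public interface PhysicalContext
  {
    Map<String, Object> getMetrics();
  }

  public static Map<String, Object> aggregate(long windowId, Collection<PhysicalContext> partitions)
  {
    long totalLength = 0;
    long totalCount = 0;
    for (PhysicalContext pc : partitions) {
      Object value = pc.getMetrics().get("lineMetrics");
      if (value instanceof LineMetrics) { // ignore partitions without the metric
        LineMetrics lm = (LineMetrics)value;
        totalLength += lm.length;
        totalCount += lm.count;
      }
    }
    // Fresh map per window; double division, and 0.0 instead of dividing by zero.
    Map<String, Object> result = new HashMap<>();
    result.put("avgLineLength", totalCount == 0 ? 0.0 : (double)totalLength / totalCount);
    return result;
  }

  // Builds a fake partition reporting a LineMetrics value.
  public static PhysicalContext partition(int length, int count)
  {
    Map<String, Object> m = new HashMap<>();
    m.put("lineMetrics", new LineMetrics(length, count));
    return () -> m;
  }

  public static void main(String[] args)
  {
    List<PhysicalContext> partitions = new ArrayList<>();
    partitions.add(partition(100, 10));
    partitions.add(partition(0, 0)); // an idle partition
    partitions.add(partition(50, 5));
    System.out.println(aggregate(1L, partitions).get("avgLineLength")); // prints 10.0
  }
}
```

An idle partition reporting `(0, 0)` does not change the result: `(100 + 0 + 50) / (10 + 0 + 5) = 10.0`. Summing lengths and counts before dividing gives the true average over all lines. Averaging the per-partition averages would weight every partition equally, however many lines it saw.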