apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-core git commit: APEXCORE-291 #resolve #comment using operator instance as default aggregator when it implements AutoMetric.Aggregator
Date Wed, 30 Dec 2015 16:34:10 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 56b55fe1d -> ecb96bad0


APEXCORE-291 #resolve #comment using operator instance as default aggregator when it implements AutoMetric.Aggregator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/ecb96bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/ecb96bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/ecb96bad

Branch: refs/heads/devel-3
Commit: ecb96bad0aa2e5c959d7626ebdd93e0fb265c424
Parents: 56b55fe
Author: Chandni Singh <csingh@apache.org>
Authored: Tue Dec 22 00:45:22 2015 -0800
Committer: Chandni Singh <csingh@apache.org>
Committed: Tue Dec 29 23:40:22 2015 -0800

----------------------------------------------------------------------
 engine/pom.xml                                  |  2 +-
 .../stram/appdata/AppDataPushAgent.java         |  3 +-
 .../stram/plan/logical/LogicalPlan.java         | 38 +--------
 .../plan/logical/MetricAggregatorMeta.java      | 89 ++++++++++++++++++++
 .../stram/engine/AutoMetricTest.java            | 65 +++++++++++++-
 5 files changed, 156 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 34593ce..6bf8283 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>4406</maxAllowedViolations>
+          <maxAllowedViolations>4400</maxAllowedViolations>
         </configuration>
       </plugin>
       <plugin>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
index 9389e3c..b47dc41 100644
--- a/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/appdata/AppDataPushAgent.java
@@ -47,6 +47,7 @@ import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.WebsocketAppDataPusher;
 import com.datatorrent.stram.api.AppDataPusher;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.MetricAggregatorMeta;
 import com.datatorrent.stram.webapp.LogicalOperatorInfo;
 
 /**
@@ -253,7 +254,7 @@ public class AppDataPushAgent extends AbstractService
       result.put("appName", dnmgr.getApplicationAttributes().get(DAGContext.APPLICATION_NAME));
       result.put("logicalOperatorName", operatorMeta.getName());
 
-      LogicalPlan.MetricAggregatorMeta metricAggregatorMeta = operatorMeta.getMetricAggregatorMeta();
+      MetricAggregatorMeta metricAggregatorMeta = operatorMeta.getMetricAggregatorMeta();
       JSONArray valueSchemas = new JSONArray();
       for (Map.Entry<String, Object> entry : aggregates.entrySet()) {
         String metricName = entry.getKey();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 377fa6d..347e94f 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -839,6 +839,9 @@ public class LogicalPlan implements Serializable, DAG
     protected void populateAggregatorMeta()
     {
       AutoMetric.Aggregator aggregator = getValue(OperatorContext.METRICS_AGGREGATOR);
+      if (aggregator == null && operator instanceof AutoMetric.Aggregator) {
+        aggregator = new MetricAggregatorMeta.MetricsAggregatorProxy(this);
+      }
       if (aggregator == null) {
         MetricsAggregator defAggregator = null;
         Set<String> metricNames = Sets.newHashSet();
@@ -2015,39 +2018,4 @@ public class LogicalPlan implements Serializable, DAG
     return result;
   }
 
-  public final class MetricAggregatorMeta implements Serializable
-  {
-    private final AutoMetric.Aggregator aggregator;
-    private final AutoMetric.DimensionsScheme dimensionsScheme;
-
-    protected MetricAggregatorMeta(AutoMetric.Aggregator aggregator,
-                                   AutoMetric.DimensionsScheme dimensionsScheme)
-    {
-      this.aggregator = aggregator;
-      this.dimensionsScheme = dimensionsScheme;
-    }
-
-    public AutoMetric.Aggregator getAggregator()
-    {
-      return this.aggregator;
-    }
-
-    public String[] getDimensionAggregatorsFor(String logicalMetricName)
-    {
-      if (dimensionsScheme == null) {
-        return null;
-      }
-      return dimensionsScheme.getDimensionAggregationsFor(logicalMetricName);
-    }
-
-    public String[] getTimeBuckets()
-    {
-      if (dimensionsScheme == null) {
-        return null;
-      }
-      return dimensionsScheme.getTimeBuckets();
-    }
-
-    private static final long serialVersionUID = 201604271719L;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java
new file mode 100644
index 0000000..65bc2a4
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/MetricAggregatorMeta.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.AutoMetric;
+
+/**
+ * A class that encapsulates {@link AutoMetric.Aggregator} and {@link AutoMetric.DimensionsScheme} of a particular
+ * operator.
+ */
+public final class MetricAggregatorMeta implements Serializable
+{
+  private final AutoMetric.Aggregator aggregator;
+  private final AutoMetric.DimensionsScheme dimensionsScheme;
+
+  protected MetricAggregatorMeta(AutoMetric.Aggregator aggregator, AutoMetric.DimensionsScheme dimensionsScheme)
+  {
+    this.aggregator = aggregator;
+    this.dimensionsScheme = dimensionsScheme;
+  }
+
+  public AutoMetric.Aggregator getAggregator()
+  {
+    return this.aggregator;
+  }
+
+  public String[] getDimensionAggregatorsFor(String logicalMetricName)
+  {
+    if (dimensionsScheme == null) {
+      return null;
+    }
+    return dimensionsScheme.getDimensionAggregationsFor(logicalMetricName);
+  }
+
+  public String[] getTimeBuckets()
+  {
+    if (dimensionsScheme == null) {
+      return null;
+    }
+    return dimensionsScheme.getTimeBuckets();
+  }
+
+  private static final long serialVersionUID = 201604271719L;
+
+  /**
+   * Serves as a proxy for Aggregator when operator itself implements {@link AutoMetric.Aggregator}.
+   */
+  static final class MetricsAggregatorProxy implements AutoMetric.Aggregator, Serializable
+  {
+    private final LogicalPlan.OperatorMeta om;
+
+    MetricsAggregatorProxy(@NotNull LogicalPlan.OperatorMeta om)
+    {
+      this.om = Preconditions.checkNotNull(om);
+    }
+
+    @Override
+    public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
+    {
+      return ((AutoMetric.Aggregator)om.getOperator()).aggregate(windowId, physicalMetrics);
+    }
+
+    private static final long serialVersionUID = 201512221830L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/ecb96bad/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
index 28e2e51..a76e6e0 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java
@@ -25,7 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.hadoop.conf.Configuration;
+import javax.validation.constraints.NotNull;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -34,17 +35,21 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Stats.OperatorStats;
-
+import com.datatorrent.api.StatsListener;
 import com.datatorrent.common.partitioner.StatelessPartitioner;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.engine.AutoMetricTest.TestOperator.TestStatsListener;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
@@ -307,6 +312,33 @@ public class AutoMetricTest
     Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator());
   }
 
+  @Test
+  public void testDefaultMetricsAggregator() throws Exception
+  {
+    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
+
+    TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    OperatorAndAggregator o1 = dag.addOperator("o1", new OperatorAndAggregator(latch));
+
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
+
+    dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
+
+    lpc.prepareDAG(dag, null, "AutoMetricTest");
+
+    LogicalPlan.OperatorMeta o1meta = dag.getOperatorMeta("o1");
+    Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator());
+
+    lpc.prepareDAG(dag, null, "AutoMetricTest");
+    StramLocalCluster lc = new StramLocalCluster(dag);
+    lc.runAsync();
+    latch.await();
+    Assert.assertEquals("progress", 1, o1.result.get("progress"));
+    lc.shutdown();
+  }
+
   private static class MockAggregator implements AutoMetric.Aggregator, Serializable
   {
     long cachedSum = -1;
@@ -361,6 +393,31 @@ public class AutoMetricTest
     }
   }
 
+  public static class OperatorAndAggregator extends OperatorWithMetrics implements AutoMetric.Aggregator
+  {
+    Map<String, Object> result = Maps.newHashMap();
+
+    private final transient CountDownLatch latch;
+
+    private OperatorAndAggregator()
+    {
+      latch = null;
+    }
+
+    OperatorAndAggregator(@NotNull CountDownLatch latch)
+    {
+      this.latch = Preconditions.checkNotNull(latch);
+    }
+
+    @Override
+    public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
+    {
+      result.put("progress", physicalMetrics.iterator().next().getMetrics().get("progress"));
+      latch.countDown();
+      return result;
+    }
+  }
+
   @Test
   public void testMetricsAnnotatedMethod() throws Exception
   {


Mime
View raw message