eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinh...@apache.org
Subject incubator-eagle git commit: [MINOR] add sortSpec to pattern match if exists corresponding StreamPartition
Date Fri, 09 Dec 2016 06:22:27 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 02abe02c3 -> aa8d3c9b9


[MINOR] add sortSpec to pattern match if exists corresponding StreamPartition

Author: wujinhu <wujinhu920@126.com>

Closes #728 from wujinhu/EAGLE-793.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/aa8d3c9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/aa8d3c9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/aa8d3c9b

Branch: refs/heads/master
Commit: aa8d3c9b9a17ffa66c599fb88bd06e898328d431
Parents: 02abe02
Author: wujinhu <wujinhu920@126.com>
Authored: Fri Dec 9 14:22:23 2016 +0800
Committer: wujinhu <wujinhu920@126.com>
Committed: Fri Dec 9 14:22:23 2016 +0800

----------------------------------------------------------------------
 .../interpreter/PolicyExecutionPlannerImpl.java | 13 ++++++-
 .../interpreter/PolicyInterpreterTest.java      | 38 ++++++++++++++++++++
 .../impl/ApplicationHealthCheckServiceImpl.java | 13 ++++---
 .../eagle/app/spi/ApplicationProvider.java      |  9 ++---
 .../src/main/resources/HealthCheckTemplate.vm   |  7 ++--
 .../queue/HadoopQueueRunningAppProvider.java    |  6 ++--
 .../MRHistoryJobApplicationProvider.java        |  4 +--
 .../history/SparkHistoryJobAppProvider.java     |  6 ++--
 .../topology/TopologyCheckAppProvider.java      |  6 ++--
 9 files changed, 78 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
index 1f46298..e30b3de 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
@@ -198,7 +198,18 @@ class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
                                 }
                                 for (Map.Entry<String, List<Variable>> entry
: streamGroupBy.entrySet()) {
                                     if (entry.getValue().size() > 0) {
-                                        retrievePartition(generatePartition(entry.getKey(),
null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()]))));
+                                        StreamPartition partition = generatePartition(entry.getKey(),
null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()])));
+                                        if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN))
{
+                                            if (effectivePartitions.containsKey(partition.getStreamId()))
{
+                                                StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
+                                                if (!existingPartition.equals(partition)
+                                                        && existingPartition.getType().equals(partition.getType())
+                                                        && ListUtils.isEqualList(existingPartition.getColumns(),
partition.getColumns())) {
+                                                    partition.setSortSpec(existingPartition.getSortSpec());
+                                                }
+                                            }
+                                        }
+                                        retrievePartition(partition);
                                     }
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
index f68a295..1553e17 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
@@ -445,4 +445,42 @@ public class PolicyInterpreterTest {
         Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
         Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
     }
+
+    @Test
+    public void testValidPolicyWithPatternSort() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        String policy =
+                "from HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]#window.externalTime(timestamp,
1 min) " +
+                        "select * group by site, host, component, metric insert into temp;\n"
+
+                "\n" +
+                "from every a = HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]
-> b = HADOOP_JMX_METRIC_STREAM_1[b.component == a.component and b.metric == a.metric and
b.host == a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
+                        "select b.site, b.host, b.component, b.metric, convert(b.value, \"long\")
as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, max(b.timestamp)
as timestamp " +
+                        "group by b.site, b.host, b.component, b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
+        definition.setValue(policy);
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition,
new HashMap<String, StreamDefinition>() {
+            {
+                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+        Assert.assertEquals(60000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis());
+        Assert.assertEquals(12000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowMargin());
+        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
+        Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
+        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
index cc8a8d7..7d8dcfd 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
@@ -44,7 +44,7 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
     private ApplicationHealthCheckPublisher applicationHealthCheckPublisher;
     private final Config config;
     private Environment environment;
-    private Map<String, HealthCheck> appHealthChecks = new HashMap<>();
+    private Map<String, Optional<HealthCheck>> appHealthChecks = new HashMap<>();
     private final Object lock = new Object();
     private int initialDelay = 10;
     private int period = 300;
@@ -118,12 +118,17 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
             return;
         }
         ApplicationProvider<?> appProvider = applicationProviderService.getApplicationProviderByType(appEntity.getDescriptor().getType());
-        HealthCheck applicationHealthCheck = appProvider.getAppHealthCheck(
+        Optional<HealthCheck> applicationHealthCheck = appProvider.getAppHealthCheck(
                         ConfigFactory.parseMap(appEntity.getContext())
                         .withFallback(config)
                         .withFallback(ConfigFactory.parseMap(appEntity.getConfiguration()))
         );
-        this.environment.healthChecks().register(appEntity.getAppId(), applicationHealthCheck);
+
+        if (!applicationHealthCheck.isPresent()) {
+            LOG.warn("application {} does not implement HealthCheck", appEntity.getAppId());
+            return;
+        }
+        this.environment.healthChecks().register(appEntity.getAppId(), applicationHealthCheck.get());
         currentInjector.injectMembers(applicationHealthCheck);
         synchronized (lock) {
             if (!appHealthChecks.containsKey(appEntity.getAppId())) {
@@ -165,7 +170,7 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
         Map<String, HealthCheck> copyAppHealthChecks = new HashMap<>();
         synchronized (lock) {
             for (String appId : appHealthChecks.keySet()) {
-                copyAppHealthChecks.put(appId, appHealthChecks.get(appId));
+                copyAppHealthChecks.put(appId, appHealthChecks.get(appId).get());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index 361b4c6..fbae411 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -72,12 +72,7 @@ public interface ApplicationProvider<T extends Application> {
      */
     void register(ModuleRegistry registry);
 
-    default HealthCheck getAppHealthCheck(Config config) {
-        return new HealthCheck() {
-            @Override
-            protected Result check() throws Exception {
-                return Result.healthy();
-            }
-        };
+    default Optional<HealthCheck> getAppHealthCheck(Config config) {
+        return Optional.empty();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
b/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
index bdc4339..5a92bb7 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/resources/HealthCheckTemplate.vm
@@ -23,10 +23,6 @@
 <body>
     #set ( $elem = $unHealthyContext )
 
-
-<p><b>Health Check: </b><a href=$elem["healthCheckUrl"]>$elem["healthCheckUrl"]</a></p>
-<p><b>Appliaction Management: </b><a href=$elem["appMgmtUrl"]>$elem["appMgmtUrl"]</a></p>
-
 <table border="1">
     <tr>
         <th><b>Application ID</b></th>
@@ -40,5 +36,8 @@
     #end
 </table>
 
+<p><b>Health Check: </b><a href=$elem["healthCheckUrl"]>$elem["healthCheckUrl"]</a></p>
+<p><b>Appliaction Management: </b><a href=$elem["appMgmtUrl"]>$elem["appMgmtUrl"]</a></p>
+
 </body>
 </html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
index 5d4078c..090b3f3 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java
@@ -20,13 +20,15 @@ import com.codahale.metrics.health.HealthCheck;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
+import java.util.Optional;
+
 public class HadoopQueueRunningAppProvider extends AbstractApplicationProvider<HadoopQueueRunningApp>
{
     public HadoopQueueRunningApp getApplication() {
         return new HadoopQueueRunningApp();
     }
 
     @Override
-    public HealthCheck getAppHealthCheck(Config config) {
-        return new HadoopQueueRunningApplicationHealthCheck(config);
+    public Optional<HealthCheck> getAppHealthCheck(Config config) {
+        return Optional.of(new HadoopQueueRunningApplicationHealthCheck(config));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
index e7d97d4..89f20ec 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
@@ -35,7 +35,7 @@ public class MRHistoryJobApplicationProvider extends AbstractApplicationProvider
     }
 
     @Override
-    public HealthCheck getAppHealthCheck(Config config) {
-        return new MRHistoryJobApplicationHealthCheck(config);
+    public Optional<HealthCheck> getAppHealthCheck(Config config) {
+        return Optional.of(new MRHistoryJobApplicationHealthCheck(config));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
index 2b962c9..366d8cb 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
@@ -21,6 +21,8 @@ import com.codahale.metrics.health.HealthCheck;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
+import java.util.Optional;
+
 public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<SparkHistoryJobApp>
{
     @Override
     public SparkHistoryJobApp getApplication() {
@@ -28,7 +30,7 @@ public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<Spar
     }
 
     @Override
-    public HealthCheck getAppHealthCheck(Config config) {
-        return new SparkHistoryJobApplicationHealthCheck(config);
+    public Optional<HealthCheck> getAppHealthCheck(Config config) {
+        return Optional.of(new SparkHistoryJobApplicationHealthCheck(config));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/aa8d3c9b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
index 5766454..867c46a 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppProvider.java
@@ -22,6 +22,8 @@ import com.codahale.metrics.health.HealthCheck;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
 
+import java.util.Optional;
+
 public class TopologyCheckAppProvider extends AbstractApplicationProvider<TopologyCheckApp>
{
     @Override
     public TopologyCheckApp getApplication() {
@@ -29,7 +31,7 @@ public class TopologyCheckAppProvider extends AbstractApplicationProvider<Topolo
     }
 
     @Override
-    public HealthCheck getAppHealthCheck(Config config) {
-        return new TopologyCheckApplicationHealthCheck(config);
+    public Optional<HealthCheck> getAppHealthCheck(Config config) {
+        return Optional.of(new TopologyCheckApplicationHealthCheck(config));
     }
 }


Mime
View raw message