eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [2/2] incubator-eagle git commit: [EAGLE-585] : AlertEngine: create metadata validation api
Date Sun, 02 Oct 2016 16:23:53 GMT
[EAGLE-585] : AlertEngine: create metadata validation api

Author: ralphsu

This closes #470


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4f348e3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4f348e3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4f348e3d

Branch: refs/heads/master
Commit: 4f348e3dc40fbf992a672acbd534f1f7a16b6d02
Parents: f6fad2e
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Mon Oct 3 00:07:12 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Mon Oct 3 00:23:45 2016 +0800

----------------------------------------------------------------------
 .../eagle-alert/alert-coordinator/pom.xml       |    8 +
 .../eagle/alert/coordinator/Coordinator.java    |   11 +-
 .../eagle/alert/coordinator/ValidateState.java  |   90 +
 .../coordinator/impl/MetadataValdiator.java     |  220 ++
 .../provider/ScheduleContextBuilder.java        |   14 +-
 .../resource/CoordinatorResource.java           |    8 +
 .../coordinator/TestMetadataValidator.java      |   41 +
 .../test/resources/validation/datasources.json  |   86 +
 .../src/test/resources/validation/policies.json |  572 +++++
 .../test/resources/validation/publishments.json |  113 +
 .../resources/validation/streamdefinitions.json | 2218 ++++++++++++++++++
 .../test/resources/validation/topologies.json   |   42 +
 12 files changed, 3416 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
index 4589e63..772d602 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
@@ -77,6 +77,14 @@
             <groupId>io.swagger</groupId>
             <artifactId>swagger-jaxrs</artifactId>
         </dependency>
+        <!-- siddhi-core: required by MetadataValdiator to compile policy expressions -->
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-core</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index 563db58..dc3e3d6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -16,11 +16,15 @@
  */
 package org.apache.eagle.alert.coordinator;
 
+import com.google.common.base.Stopwatch;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.config.ConfigBusProducer;
 import org.apache.eagle.alert.config.ConfigValue;
 import org.apache.eagle.alert.config.ZKConfig;
 import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.impl.MetadataValdiator;
 import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
 import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger;
 import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
@@ -28,9 +32,6 @@ import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import com.google.common.base.Stopwatch;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -190,6 +191,10 @@ public class Coordinator {
         return currentState;
     }
 
+    /**
+     * Runs a metadata validation pass over the current metadata snapshot
+     * fetched through the metadata service client.
+     *
+     * @return the aggregated {@link ValidateState} produced by the validator
+     */
+    public ValidateState validate() {
+        return new MetadataValdiator(client).validate();
+    }
+
     /**
      * shutdown background threads and release various resources.
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java
new file mode 100644
index 0000000..9dc177b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ValidateState.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import java.util.*;
+
+/**
+ * Created on 10/1/16.
+ */
public class ValidateState {

    // Overall verdict flag; not yet wired by the validator (defaults to false).
    private boolean isOk = false;

    // Entities that are defined but referenced by nothing else.
    private List<String> unusedDataSources = new ArrayList<>();
    private List<String> unusedStreams = new ArrayList<>();
    private List<String> unPublishedPolicies = new ArrayList<>();

    /*
     * Per-entity validation messages, keyed by entity name. Includes
     * validation of extension class existence, policy expression validation
     * and inter-reference validation.
     */
    private Map<String, List<String>> dataSourcesValidation = new HashMap<>();
    private Map<String, List<String>> streamsValidation = new HashMap<>();
    private Map<String, List<String>> policiesValidation = new HashMap<>();
    private Map<String, List<String>> publishmentValidation = new HashMap<>();
    private Map<String, List<String>> topoMetaValidation = new HashMap<>();

    /** Records a data source that no stream references. */
    public void appendUnusedDatasource(String ds) {
        unusedDataSources.add(ds);
    }

    /** Records a stream that no policy consumes. */
    public void appendUnusedStreams(String s) {
        unusedStreams.add(s);
    }

    /** Records a policy that no publishment publishes. */
    public void appendUnPublishedPolicies(String s) {
        unPublishedPolicies.add(s);
    }

    /** Appends a validation message for the named data source. */
    public void appendDataSourceValidation(String name, String msg) {
        dataSourcesValidation.computeIfAbsent(name, k -> new LinkedList<>()).add(msg);
    }

    /** Appends a validation message for the named stream. */
    public void appendStreamValidation(String name, String msg) {
        streamsValidation.computeIfAbsent(name, k -> new LinkedList<>()).add(msg);
    }

    /** Appends a validation message for the named policy. */
    public void appendPolicyValidation(String name, String msg) {
        policiesValidation.computeIfAbsent(name, k -> new LinkedList<>()).add(msg);
    }

    /**
     * Appends a validation message for the named publishment.
     * (Method name keeps its historical misspelling: callers reference it.)
     */
    public void appendPublishemtnValidation(String name, String msg) {
        publishmentValidation.computeIfAbsent(name, k -> new LinkedList<>()).add(msg);
    }

    /** Appends a validation message for the named topology. */
    public void appendTopoMetaValidation(String name, String msg) {
        topoMetaValidation.computeIfAbsent(name, k -> new LinkedList<>()).add(msg);
    }

    // Getters are required so this state can be serialized to JSON
    // (e.g. JsonUtils.writeValueAsString) — without them the private
    // fields expose no readable properties.

    public boolean isOk() {
        return isOk;
    }

    public void setOk(boolean ok) {
        this.isOk = ok;
    }

    public List<String> getUnusedDataSources() {
        return unusedDataSources;
    }

    public List<String> getUnusedStreams() {
        return unusedStreams;
    }

    public List<String> getUnPublishedPolicies() {
        return unPublishedPolicies;
    }

    public Map<String, List<String>> getDataSourcesValidation() {
        return dataSourcesValidation;
    }

    public Map<String, List<String>> getStreamsValidation() {
        return streamsValidation;
    }

    public Map<String, List<String>> getPoliciesValidation() {
        return policiesValidation;
    }

    public Map<String, List<String>> getPublishmentValidation() {
        return publishmentValidation;
    }

    public Map<String, List<String>> getTopoMetaValidation() {
        return topoMetaValidation;
    }

}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
new file mode 100644
index 0000000..5d7eeb1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MetadataValdiator.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.ValidateState;
+import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
+import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.SiddhiManager;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Created on 10/1/16.
+ */
+public class MetadataValdiator {
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataValdiator.class);
+
+    private static final Map<StreamColumn.Type, String> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
+
+    static {
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING, "STRING");
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT, "INT");
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG, "LONG");
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, "FLOAT");
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, "DOUBLE");
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL, "BOOL");
+        _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, "OBJECT");
+    }
+
+    private IScheduleContext context;
+    private final ValidateState state;
+
+    public MetadataValdiator(IMetadataServiceClient client) {
+        List<Topology> topologies = client.listTopologies();
+        List<Kafka2TupleMetadata> datasources = client.listDataSources();
+        List<StreamDefinition> streams = client.listStreams();
+        // filter out disabled policies
+        List<PolicyDefinition> enabledPolicies = client.listPolicies();
+        List<Publishment> publishments = client.listPublishment();
+
+        context = new InMemScheduleConext(ScheduleContextBuilder.listToMap(topologies), new HashMap<>(),
+            ScheduleContextBuilder.listToMap(datasources),
+            ScheduleContextBuilder.listToMap(enabledPolicies),
+            ScheduleContextBuilder.listToMap(publishments),
+            ScheduleContextBuilder.listToMap(streams), new HashMap<>(), new HashMap<>());
+        this.state = new ValidateState();
+    }
+
+    public MetadataValdiator(IScheduleContext context) {
+        this.context = context;
+        this.state = new ValidateState();
+    }
+
+
+    public ValidateState validate() {
+
+        validateTopology();
+
+        validateDataSources();
+
+        validateStreams();
+
+        validatePolicies();
+
+        validatePublishments();
+
+        return state;
+    }
+
+    private void validatePolicies() {
+        Collection<Publishment> pubs = context.getPublishments().values();
+        for (PolicyDefinition pd : context.getPolicies().values()) {
+            if (!pubs.stream().anyMatch(p -> p.getPolicyIds().contains(pd.getName()))) {
+                state.appendUnPublishedPolicies(pd.getName());
+            }
+
+            boolean isStreamMiss = false;
+            StringBuilder builder = new StringBuilder();
+            for (String inputStream : pd.getInputStreams()) {
+                if (context.getStreamSchemas().get(inputStream) == null) {
+                    state.appendPublishemtnValidation(pd.getName(), String.format("policy %s contains unknown stream %s!", pd.getName(), inputStream));
+                    isStreamMiss = true;
+                    break;
+                }
+                builder.append(buildStreamDefinition(context.getStreamSchemas().get(inputStream)));
+                builder.append("\n");
+            }
+
+            if (isStreamMiss) {
+                continue;
+            }
+            builder.append(pd.getDefinition().getValue());
+
+            // now evaluate
+            try {
+                SiddhiManager sm = new SiddhiManager();
+                sm.createExecutionPlanRuntime(builder.toString());
+            } catch (Exception e) {
+                LOG.error(String.format("siddhi creation failed! %s ", builder.toString()), e);
+                state.appendPolicyValidation(pd.getName(), e.getMessage());
+            }
+        }
+    }
+
+    private String buildStreamDefinition(StreamDefinition streamDefinition) {
+        List<String> columns = new ArrayList<>();
+        if (streamDefinition.getColumns() != null) {
+            for (StreamColumn column : streamDefinition.getColumns()) {
+                columns.add(String.format("%s %s", column.getName(), _EAGLE_SIDDHI_TYPE_MAPPING.get(column.getType().toString().toLowerCase())));
+            }
+        } else {
+            LOG.warn("No columns found for stream {}" + streamDefinition.getStreamId());
+        }
+        return String.format("define stream %s( %s );", streamDefinition.getStreamId(), StringUtils.join(columns, ","));
+    }
+
+
+    private void validatePublishments() {
+        Collection<PolicyDefinition> definitions = context.getPolicies().values();
+
+        for (Publishment p : context.getPublishments().values()) {
+            //TODO: check type; check serializer types; check dedup fields existence; check extend deduplicator...
+            Set<String> unknown = p.getPolicyIds().stream().filter(pid -> definitions.stream().anyMatch(pd -> pd.getName().equals(pid))).collect(Collectors.toSet());
+            if (unknown.size() > 0) {
+                state.appendPublishemtnValidation(p.getName(), String.format("publishment %s reference unknown/uneabled policy %s!", p.getName(), unknown));
+            }
+        }
+    }
+
+    private void validateStreams() {
+        Collection<Kafka2TupleMetadata> datasources = context.getDataSourceMetadata().values();
+        Collection<PolicyDefinition> definitions = context.getPolicies().values();
+        for (StreamDefinition sd : context.getStreamSchemas().values()) {
+            if (!datasources.stream().anyMatch(d -> d.getName().equals(sd.getDataSource()))) {
+                state.appendStreamValidation(sd.getStreamId(), String.format("stream %s reference unknown data source %s !", sd.getStreamId(), sd.getDataSource()));
+            }
+            if (!definitions.stream().anyMatch(p -> p.getInputStreams().contains(sd.getStreamId()))) {
+                state.appendUnusedStreams(sd.getStreamId());
+            }
+            // more on columns
+            if (sd.getColumns() == null || sd.getColumns().size() == 0) {
+                state.appendStreamValidation(sd.getStreamId(), String.format("stream %s have empty columns!", sd.getStreamId()));
+            }
+        }
+    }
+
+    private void validateDataSources() {
+        Collection<StreamDefinition> sds = context.getStreamSchemas().values();
+        for (Kafka2TupleMetadata ds : context.getDataSourceMetadata().values()) {
+            // simply do a O(^2) loop
+            if (!sds.stream().anyMatch(t -> t.getDataSource().equals(ds.getName()))) {
+                state.appendUnusedDatasource(ds.getName());
+            }
+
+            if (!"KAFKA".equalsIgnoreCase(ds.getType())) {
+                state.appendDataSourceValidation(ds.getName(), String.format(" unsupported data source type %s !", ds.getType()));
+            }
+
+            //scheme
+            //            String schemeCls = ds.getSchemeCls();
+            //            try {
+            //                Object scheme = Class.forName(schemeCls).getConstructor(String.class, Map.class).newInstance(ds.getTopic(), new HashMap<>());// coul only mock empty map
+            //                if (!(scheme instanceof MultiScheme || scheme instanceof Scheme)) {
+            //                    throw new IllegalArgumentException(" scheme class not subclass of Scheme or MultiScheme !");
+            //                }
+            //            } catch (Exception e) {
+            //                state.appendDataSourceValidation(ds.getName(), String.format("schemeCls %s expected to be qualified sub class name of %s or %s with given constructor signature!"
+            //                      +"Message: %s !",
+            //                    schemeCls, Scheme.class.getCanonicalName(), MultiScheme.class.getCanonicalName(), e.getMessage()));
+            //            }
+
+            // codec
+            if (ds.getCodec() == null) {
+                state.appendDataSourceValidation(ds.getName(), String.format("codec of datasource must *not* be null!"));
+                continue;
+            }
+            //            String selectCls = ds.getCodec().getStreamNameSelectorCls();
+            //            try {
+            //                StreamNameSelector cachedSelector = (StreamNameSelector) Class.forName(selectCls).getConstructor(Properties.class)
+            //                    .newInstance(ds.getCodec().getStreamNameSelectorProp());
+            //            } catch (Exception e) {
+            //                state.appendDataSourceValidation(ds.getName(), String.format("streamNameSelectorCls %s expected to be subclass of %s and with given constructor signature! Message: %s !",
+            //                    selectCls, StreamNameSelector.class.getCanonicalName(), e.getMessage()));
+            //            }
+
+        }
+    }
+
+    private void validateTopology() {
+        for (Topology t : context.getTopologies().values()) {
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index 193b98f..69225da 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.coordinator.provider;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
@@ -30,7 +31,6 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +59,7 @@ public class ScheduleContextBuilder {
     private Map<String, StreamDefinition> streamDefinitions;
     private Map<StreamGroup, MonitoredStream> monitoredStreamMap;
     private Map<String, TopologyUsage> usages;
+    private IScheduleContext builtContext;
 
     public ScheduleContextBuilder(Config config) {
         this.config = config;
@@ -104,8 +105,13 @@ public class ScheduleContextBuilder {
         usages = buildTopologyUsage();
 
         // copy to schedule context now
-        return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
+        builtContext = new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
             streamDefinitions, monitoredStreamMap, usages);
+        return builtContext;
+    }
+
+    public IScheduleContext getBuiltContext() {
+        return builtContext;
     }
 
     /**
@@ -228,7 +234,7 @@ public class ScheduleContextBuilder {
         return result;
     }
 
-    private <T, K> Map<K, T> listToMap(List<T> collections) {
+    public static <T, K> Map<K, T> listToMap(List<T> collections) {
         Map<K, T> maps = new HashMap<K, T>(collections.size());
         for (T t : collections) {
             maps.put(getKey(t), t);
@@ -240,7 +246,7 @@ public class ScheduleContextBuilder {
      * One drawback, once we add class, this code need to be changed!
      */
     @SuppressWarnings("unchecked")
-    private <T, K> K getKey(T t) {
+    private static <T, K> K getKey(T t) {
         if (t instanceof Topology) {
             return (K) ((Topology) t).getName();
         } else if (t instanceof PolicyAssignment) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
index 273ab33..9d83c4a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
@@ -19,6 +19,7 @@ package org.apache.eagle.alert.coordinator.resource;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordinator.Coordinator;
 import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.coordinator.ValidateState;
 import org.apache.eagle.alert.utils.JsonUtils;
 
 import javax.ws.rs.GET;
@@ -55,6 +56,13 @@ public class CoordinatorResource {
     }
 
+    /**
+     * Runs metadata validation and returns the resulting ValidateState
+     * serialized as JSON.
+     */
     @POST
+    @Path("/validate")
+    public String validate() throws Exception {
+        ValidateState state = alertCoordinator.validate();
+        return JsonUtils.writeValueAsString(state);
+    }
+
+    @POST
     @Path("/enablePeriodicForceBuild")
     public void enforcePeriodicallyBuild() {
         alertCoordinator.enforcePeriodicallyBuild();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java
new file mode 100644
index 0000000..c9d3b5e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestMetadataValidator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.eagle.alert.coordinator.impl.MetadataValdiator;
+import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
+import org.junit.Test;
+
+/**
+ * Created on 10/2/16.
+ */
+public class TestMetadataValidator {
+
+    private static final ObjectMapper om = new ObjectMapper();
+
+    @Test
+    public void validate() throws Exception {
+        InMemScheduleConext context = new InMemScheduleConext();
+        MetadataValdiator mv = new MetadataValdiator(context);
+
+
+        // om.readValue(TestMetadataValidator.class.getResourceAsStream("/validation/datasources.json"), new Gene);
+        // TODO add more test here.
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/datasources.json
new file mode 100644
index 0000000..840cee6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/datasources.json
@@ -0,0 +1,86 @@
+[
+  {
+    "type":"KAFKA",
+    "name":"network_syslog_event_datasource",
+    "properties":{
+
+    },
+    "topic":"preprocess.network-syslog.events",
+    "schemeCls":"EventSignalScheme",
+    "codec":{
+      "activeStreamNames":[
+
+      ],
+      "streamNameSelectorProp":{
+        "userProvidedStreamName":"syslogEventStream",
+        "streamNameFormat":"%s"
+      },
+      "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn":"timestamp",
+      "timestampFormat":""
+    }
+  },
+  {
+    "type":"KAFKA",
+    "name":"network_NC_alert_datasource",
+    "properties":{
+
+    },
+    "topic":"preprocess.network-nervecenter.events",
+    "schemeCls":"EventSignalScheme",
+    "codec":{
+      "activeStreamNames":[
+
+      ],
+      "streamNameSelectorProp":{
+        "userProvidedStreamName":"ncAlertStream",
+        "streamNameFormat":"%s"
+      },
+      "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn":"timestamp",
+      "timestampFormat":""
+    }
+  },
+  {
+    "type":"KAFKA",
+    "name":"network_sherlock_alert_datasource",
+    "properties":{
+
+    },
+    "topic":"preprocess.network-sherlock.events",
+    "schemeCls":"EventSignalScheme",
+    "codec":{
+      "activeStreamNames":[
+
+      ],
+      "streamNameSelectorProp":{
+        "userProvidedStreamName":"sherlockAlertStream",
+        "streamNameFormat":"%s"
+      },
+      "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn":"timestamp",
+      "timestampFormat":""
+    }
+  },
+  {
+    "type":"KAFKA",
+    "name":"network_aggregate_alert_output_datasource",
+    "properties":{
+
+    },
+    "topic":"process.network.alerts",
+    "schemeCls":"EventSignalScheme",
+    "codec":{
+      "activeStreamNames":[
+
+      ],
+      "streamNameSelectorProp":{
+        "fieldNamesToInferStreamName":"streamName",
+        "streamNameFormat":"%s"
+      },
+      "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn":"timestamp",
+      "timestampFormat":""
+    }
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/policies.json
new file mode 100644
index 0000000..f7aa44e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/policies.json
@@ -0,0 +1,572 @@
+[
+  {
+    "name":"nerveCenter_port_down",
+    "description":"",
+    "inputStreams":[
+      "ncAlertStream"
+    ],
+    "outputStreams":[
+      "ncAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from ncAlertStream[status=='active'] select 'nerveCenterAlert' as name, 'network' as namespace, UUID() as docId, timestamp, str:concat(cause, ':', entity, ':', dims_component) as alertKey, '' as parentKey, dims_component, dims_pod, dims_bubble, dims_entity, dims_dc, severity, componentType, entityType, source, message, key, linkedSwitch, localPort, pod, 'nerveCenter' as alertSource, ip, linkedPort, bubble, state, entity, status, dc, 0l as dedupCount, 0l as dedupFirstOccurrence, 'ncAlertOutputStream' as streamName, monitor, cause insert into ncAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"ENABLED",
+    "partitionSpec":[
+      {
+        "streamId":"ncAlertStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"sherlock_metric_updown",
+    "description":"",
+    "inputStreams":[
+      "sherlockAlertStream"
+    ],
+    "outputStreams":[
+      "sherlockAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from sherlockAlertStream[status=='active'] select 'sherlockAlert' as name, 'network' as namespace, UUID() as docId, timestamp, str:concat(cause, ':', entity, ':', component) as alertKey, '' as parentKey, dims_device, dims_ifAlias, dims_bubble, dims_pod, dims_dc, source, currState, msg, prevState, type, kind, switchtype, debouncer_type, debouncer_value, current_value, current_execursion, threshold_execursion, operation, severity, componentType, linkedSwitch, localPort, pod, 'sherlockAlert' as alertSource, entityType, ip, component, linkedPort, bubble, state, entity, status,dc, 0l as dedupCount, 0l as dedupFirstOccurrence, 'sherlockAlertOutputStream' as streamName, cause, message insert into sherlockAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"ENABLED",
+    "partitionSpec":[
+      {
+        "streamId":"sherlockAlertStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"network_aggregate",
+    "description":"aggregate alerts into parent alerts",
+    "inputStreams":[
+      "syslogAlertOutputStream",
+      "ncAlertOutputStream",
+      "sherlockAlertOutputStream"
+    ],
+    "outputStreams":[
+      "aggregatedAlertStream"
+    ],
+    "definition":{
+      "type":"Custom",
+      "value":"",
+      "handlerClass":"org.apache.eagle.alert.engine.evaluator.aggregate.AlertAggregateHandler",
+      "properties":{
+        "aggregateKey":{
+          "syslogAlertOutputStream":{
+            "pattern":"%s",
+            "columns":[
+              "alertKey"
+            ]
+          },
+          "ncAlertOutputStream":{
+            "pattern":"%s",
+            "columns":[
+              "alertKey"
+            ]
+          },
+          "sherlockAlertOutputStream":{
+            "pattern":"%s",
+            "columns":[
+              "alertKey"
+            ]
+          }
+        },
+        "fixFields":{
+          "name":"networkAggregateAlert",
+          "namespace":"network",
+          "streamName":"aggregatedAlertStream",
+          "alertSource":"aggregateAlert"
+        }
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"ENABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogAlertOutputStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      },
+      {
+        "streamId":"ncAlertOutputStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      },
+      {
+        "streamId":"sherlockAlertOutputStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"network_correlate",
+    "description":"correlate alerts for circuit down",
+    "inputStreams":[
+      "aggregatedAlertStream"
+    ],
+    "outputStreams":[
+      "correlatedAlertStream"
+    ],
+    "definition":{
+      "type":"Custom",
+      "value":"",
+      "handlerClass":"org.apache.eagle.alert.engine.evaluator.aggregate.AlertAggregateHandler",
+      "properties":{
+        "aggregateKey":{
+          "aggregatedAlertStream":{
+            "pattern":"%s",
+            "columns":[
+              "linkedPort"
+            ]
+          }
+        },
+        "fixFields":{
+          "name":"networkCorrelateAlert",
+          "namespace":"network",
+          "streamName":"correlatedAlertStream",
+          "alertSource":"correlateAlert"
+        },
+        "filters":{
+          "aggregatedAlertStream":"cause == 'Port_Down'"
+        }
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"aggregatedAlertStream",
+        "type":"GROUPBY",
+        "columns":[
+          "linkedPort"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"network_symptomatic",
+    "description":"Correlate circuit down alerts into a symptomatic alert",
+    "inputStreams":[
+      "correlatedAlertStream"
+    ],
+    "outputStreams":[
+      "deviceDownAlertStream",
+      "symptomaticAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from correlatedAlertStream#window.time(10 min) select \"networkDeviceDownAlert\" as name, namespace, UUID() as docId, linkedSwitch as entity, pod, bubble, dc, severity, state, cause, message, firstOccurenceTime, timestamp, \"deviceDownOutputStream\" as streamName group by linkedSwitch having count() > 6 insert into deviceDownAlertStream; from correlatedAlertStream#window.time(10 min) as left join deviceDownAlertStream#window.time(10 min) as right on left.linkedSwitch = right.linkedSwitch select left.name, left.streamId, left.pod, left.docId, left.cause, left.firstOccurenceTime, left.bubble, left.state, left.timestamp, left.severity, left.componentType, left.linkedSwitch, left.alertSource, left.entityType, left.ip, left.alertKey, left.message, left.streamName, left.createBy, left.component, left.linkedPort, left.policyId, left.createTime, left.namespace, left.endTime, left.entity, left.dc, left.status, right.docId as parentKey insert into symptomaticAlertOutputStream;"
 ,
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"correlatedAlertStream",
+        "type":"GROUPBY",
+        "columns":[
+          "linkedSwitch"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"critical_temperature",
+    "description":"CRITICAL system overheating msg",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(severity_name == \"EMERG\" or severity_name == \"ALERT\" or severity_name ==\"CRIT\") and (regex:find(\"The system is overheating\", msg))] select 'syslogAlert' as name, 'CRITICAL' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Temp_Status' as cause, str:concat('Temp_Status', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"critical_power_supply",
+    "description":"CRITICAL power supply severity msgs",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(severity_name == \"EMERG\" or severity_name == \"ALERT\" or severity_name ==\"CRIT\") and (regex:find(\"Power Supply is not responding\",msg))] select 'syslogAlert' as name, 'CRITICAL' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Power_Supply_Health' as cause, str:concat('Power_Supply_Health', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"critical_memory",
+    "description":"CRITICAL memory inconsistency  msgs",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(severity_name == \"EMERG\" or severity_name == \"ALERT\" or severity_name ==\"CRIT\") and (regex:find(\"Memory inconsistency detected\", msg))] select 'syslogAlert' as name, 'CRITICAL' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Memory_Health' as cause, str:concat('Memory_Health', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"critical_chassis",
+    "description":" chassis critical alerts  ",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[severity_name == \"ERR\" and (regex:find(\"CB color=YELLOW, class=CHASSIS, reason=Check\", msg))] select 'syslogAlert' as name, 'CRITICAL' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Chassis_Health' as cause, str:concat('Chassis_Health', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"warning_disk",
+    "description":"warning temporary directory warning alert msgs",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(severity_name == \"EMERG\" or severity_name == \"ALERT\" or severity_name ==\"CRIT\") and (regex:find(\"System temporary directory usage is unexpectedly high\", msg) or regex: find(\"Re enabling dynamic learning on all interfaces\", msg))] select 'syslogAlert' as name, 'WARNING' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Disk_Status' as cause, str:concat('Disk_Status', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"warning_stmlearning",
+    "description":"warning reenabling learning warning alert msgs",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(severity_name == \"EMERG\" or severity_name == \"ALERT\" or severity_name ==\"CRIT\") and (regex: find(\"Re enabling dynamic learning on all interfaces\", msg))] select 'syslogAlert' as name, 'WARNING' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'STM_Learning' as cause, str:concat('STM_Learning', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"warning_snmpd",
+    "description":"snmp daemon issue minor alert",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(severity_name == \"EMERG\" or severity_name == \"ALERT\" or severity_name ==\"CRIT\") and (regex:find(\"Service \"snmpd\" (PID [0-9]+]) hasn't caught signal 6 (core will be saved)\", msg))] select 'syslogAlert' as name, 'WARNING' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'SNMP_Health' as cause, str:concat('SNMP_Health', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"warning_temperature",
+    "description":"warning temperature  minor alarm msgs",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(severity_name == \"EMERG\" or severity_name == \"ALERT\" or severity_name ==\"CRIT\") and (regex:find(\"System minor temperature alarm on module\", msg))] select 'syslogAlert' as name, 'WARNING' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Temp_Status' as cause, str:concat('Temp_Status', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"fatal_chassis",
+    "description":"fatal chassis alerts msgs",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[severity_name == \"ERR\" and (regex:find(\"Reporting self ping blackhole for\", msg))] select 'syslogAlert' as name, 'FATAL' as severity, namespace, facility_name, facility_code, severity_code,severity_name ,entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Chassis_Health' as cause, str:concat('Chassis_Health', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  },
+  {
+    "name":"fatal_FRUs",
+    "description":"fatal alert on keywords 'FRU Offline', 'FRU power off', and 'FRU power on'",
+    "inputStreams":[
+      "syslogEventStream"
+    ],
+    "outputStreams":[
+      "syslogAlertOutputStream"
+    ],
+    "definition":{
+      "type":"siddhi",
+      "value":"from syslogEventStream[(regex:find(\"Fru Offline\", msg)) or (regex:find(\"FRU power off\", msg)) or (regex:find(\"FRU power on\", msg))] select 'syslogAlert' as name, 'FATAL' as severity, namespace, facility_name, facility_code, severity_code, severity_name , entity, dims_appname, msgid,  msg, timestamp, ip, status, component, componentType, linkedPort, linkedSwitch, localPort, message, pod, bubble,dc,entityType,productionLevel, 'syslogAlert' as name, 'network' as namespace, 'Module_Health' as cause, str:concat('Module_Health', ':', dims_hostname, ':', component) as alertKey, '' as parentKey, UUID() as docId, 'CRITICAL' as severity,  'open' as state,0l as dedupCount, 0l as dedupFirstOccurrence, 'syslogAlertOutputStream' as streamName insert into syslogAlertOutputStream;",
+      "properties":{
+
+      },
+      "inputStreams":[
+
+      ],
+      "outputStreams":[
+
+      ]
+    },
+    "policyStatus":"DISABLED",
+    "partitionSpec":[
+      {
+        "streamId":"syslogEventStream",
+        "type":"GROUPBY",
+        "columns":[
+          "entity"
+        ]
+      }
+    ],
+    "parallelismHint":0
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f348e3d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/publishments.json
new file mode 100644
index 0000000..4e2d48a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/validation/publishments.json
@@ -0,0 +1,113 @@
+[
+  {
+    "name":"network-syslog-publish",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "policyIds":[
+      "syslog_mem_inconsistent",
+      "critical_temperature",
+      "critical_power_supply",
+      "critical_memory",
+      "critical_chassis",
+      "warning_disk",
+      "warning_stmlearning",
+      "warning_snmpd",
+      "warning_temperature",
+      "fatal_chassis",
+      "fatal_FRUs",
+      "test_severity_notice_check"
+    ],
+    "dedupIntervalMin":"PT0M",
+    "overrideDeduplicator":{
+      "className":"org.apache.eagle.alert.engine.extension.dedup.EsDocIdDeduplicator",
+      "properties":{
+        "esDocIdField":"values.docId",
+        "esAlertKeyField":"values.alertKey",
+        "esTimestampField":"timestamp",
+        "esStateField":"values.state",
+        "esStateCloseValue":"CLOSED",
+        "esIndice":"sherlock-alerts-network-*",
+        "esType":"syslogAlert",
+        "eventDocIdField":"docId",
+        "eventAlertKeyField":"alertKey",
+        "eventStateField":"state"
+      }
+    },
+    "properties":{
+      "kafka_broker":"broker:9092",
+      "topic":"process.network.alerts",
+      "value_deserializer":"SignalSetDeserializer",
+      "value_serializer":"SignalSetSerializer",
+      "rawAlertNamespaceLabel":"namespace",
+      "rawAlertNamespaceValue":"syslog"
+    },
+    "serializer":"AlertSignalSerializer"
+  },
+  {
+    "name":"network-nc-sherlock-publish",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "policyIds":[
+      "sherlock_metric_updown"
+    ],
+    "dedupIntervalMin":"PT0M",
+    "dedupFields":[
+      "alertKey"
+    ],
+    "dedupStateField":"severity",
+    "properties":{
+      "kafka_broker":"broker:9092",
+      "topic":"process.network.alerts",
+      "value_deserializer":"SignalSetDeserializer",
+      "value_serializer":"SignalSetSerializer"
+    },
+    "serializer":"AlertSignalSerializer"
+  },
+  {
+    "name":"network-aggregation-publish",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "policyIds":[
+      "network_aggregate"
+    ],
+    "dedupIntervalMin":"PT0M",
+    "properties":{
+      "kafka_broker":"broker:9092",
+      "topic":"process.network.alerts",
+      "value_deserializer":"SignalSetDeserializer",
+      "value_serializer":"SignalSetSerializer"
+    },
+    "serializer":"AlertSignalSerializer"
+  },
+  {
+    "name":"network-correlation-publish",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "policyIds":[
+      "network_correlate"
+    ],
+    "dedupIntervalMin":"PT0M",
+    "properties":{
+      "kafka_broker":"broker:9092",
+      "topic":"process.network.alerts",
+      "value_deserializer":"SignalSetDeserializer",
+      "value_serializer":"SignalSetSerializer"
+    },
+    "serializer":"AlertSignalSerializer"
+  },
+  {
+    "name":"network-nc-nervecenter-publish",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "policyIds":[
+      "nerveCenter_port_down"
+    ],
+    "dedupIntervalMin":"PT0M",
+    "dedupFields":[
+      "alertKey"
+    ],
+    "dedupStateField":"severity",
+    "properties":{
+      "kafka_broker":"broker:9092",
+      "topic":"process.network.alerts",
+      "value_deserializer":"SignalSetDeserializer",
+      "value_serializer":"SignalSetSerializer"
+    },
+    "serializer":"AlertSignalSerializer"
+  }
+]
\ No newline at end of file


Mime
View raw message