eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/2] incubator-eagle git commit: EAGLE-149 Enable hadoop jmx metric cases
Date Wed, 03 Feb 2016 14:26:29 GMT
EAGLE-149 Enable hadoop jmx metric cases

https://issues.apache.org/jira/browse/EAGLE-149

Closes #81


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f213bab0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f213bab0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f213bab0

Branch: refs/heads/master
Commit: f213bab049c9f19e7a1e8ecee25c7cbd76a3c318
Parents: bd3b555
Author: Hao Chen <hao@apache.org>
Authored: Wed Feb 3 22:25:35 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Wed Feb 3 22:25:35 2016 +0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../src/main/bin/eagle-topology-init.sh         |  56 +++---
 .../src/main/bin/hadoop-metric-monitor.sh       |  96 ++++-----
 .../eagle/alert/cep/TestSiddhiEvaluator.java    |   9 +-
 .../alert/dao/TestAlertDefinitionDAOImpl.java   |   6 +-
 .../policy/TestPolicyDistributionUpdater.java   |   5 +-
 .../datastream/core/StreamAlertExpansion.scala  |   8 +-
 .../policy/dao/AlertDefinitionDAOImpl.java      |  86 --------
 eagle-external/hadoop_jmx_collector/.gitignore  |   2 +
 .../hadoop_jmx_collector/config-sample.json     |  19 ++
 eagle-external/hadoop_jmx_collector/config.json |  19 --
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |  30 +--
 .../hadoop_jmx_collector/metric_extensions.py   |  57 +++++-
 .../hadoop_jmx_collector/util_func.py           |  19 +-
 eagle-gc/src/main/resources/alert-gc-policy.sh  |   4 +-
 .../main/resources/alert-metadata-create-gc.sh  |   8 +-
 .../metric/HadoopJmxMetricDeserializer.java     |  56 ------
 .../hadoop/metric/HadoopJmxMetricMonitor.java   |  35 ++++
 .../eagle/hadoop/metric/NameNodeLagMonitor.java |  67 -------
 .../org/apache/eagle/hadoop/metric/Utils.java   |  64 ++++++
 .../src/main/resources/application.conf         |  27 +--
 .../src/main/resources/eagle-env.sh             |  44 +++++
 .../src/main/resources/hadoop-metric-init.sh    | 165 ++++++++++++++++
 .../src/main/resources/hadoopjmx.yaml           |  18 ++
 .../src/main/resources/hastate-policy-import.sh |  51 +++++
 .../lastcheckpointtime-policy-import.sh         |  51 +++++
 .../resources/missingblock-policy-import.sh     |  51 +++++
 .../main/resources/namenodelag-policy-import.sh |  49 +++++
 .../src/main/resources/namenodelag.yaml         |  18 --
 .../src/main/resources/namenodelage-init.sh     | 194 -------------------
 .../main/resources/nodecount-policy-import.sh   |  51 +++++
 .../resources/safemodecheck-policy-import.sh    |  51 +++++
 .../metric/HadoopJmxMetricDeserializerTest.java |  17 +-
 .../hadoop/metric/TestHadoopMetricSiddhiQL.java |  82 +++++++-
 eagle-topology-assembly/pom.xml                 |   5 +
 pom.xml                                         |   2 +
 36 files changed, 923 insertions(+), 600 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 8a40f3a..033e79f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,6 +73,7 @@ logs/
 .DS_Store
 
 *.cache-tests
+application-local.conf
 
 *.orig
 **/*.pyc

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-assembly/src/main/bin/eagle-topology-init.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-topology-init.sh b/eagle-assembly/src/main/bin/eagle-topology-init.sh
index f30749f..03b2a9a 100755
--- a/eagle-assembly/src/main/bin/eagle-topology-init.sh
+++ b/eagle-assembly/src/main/bin/eagle-topology-init.sh
@@ -90,34 +90,34 @@ echo "Importing AlertStreamService for USERPROFILE"
 curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H "Content-Type: application/json"  "http://$EAGLE_SERVICE_HOST:$EAGLE_SERVICE_PORT/eagle-service/rest/entities?serviceName=AlertStreamService" \
      -d '[ { "prefix": "alertStream", "tags": { "streamName": "userActivity", "site":"sandbox", "dataSource":"userProfile" }, "alertExecutorIdList": [ "userProfileAnomalyDetectionExecutor" ] } ]'
 
-#####################################################################
-#            Import stream metadata for HADOOP METRIC
-#####################################################################
-
-## AlertDataSource: data sources bound to sites
-echo "Importing AlertDataSourceService for HADOOP METRIC ... "
-
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]'
-
-
-## AlertStreamService: alert streams generated from data source
-echo ""
-echo "Importing AlertStreamService for HADOOP METRIC ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \
--H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
--d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]'
-
-## AlertExecutorService: what alert streams are consumed by alert executor
-echo ""
-echo "Importing AlertExecutorService for HADOOP METRIC ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]'
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]'
-
-## AlertStreamSchemaService: schema for event from alert stream
-echo ""
-echo "Importing AlertStreamSchemaService for HADOOP METRIC ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"
 service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"
 service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
+######################################################################
+##            Import stream metadata for HADOOP METRIC
+######################################################################
+#
+### AlertDataSource: data sources bound to sites
+#echo "Importing AlertDataSourceService for HADOOP METRIC ... "
+#
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]'
+#
+#
+### AlertStreamService: alert streams generated from data source
+#echo ""
+#echo "Importing AlertStreamService for HADOOP METRIC ... "
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \
+#-H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
+#-d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]'
+#
+### AlertExecutorService: what alert streams are consumed by alert executor
+#echo ""
+#echo "Importing AlertExecutorService for HADOOP METRIC ... "
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]'
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]'
+#
+### AlertStreamSchemaService: schema for event from alert stream
+#echo ""
+#echo "Importing AlertStreamSchemaService for HADOOP METRIC ... "
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":
 "service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":
 "service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
 
 ## Finished
 echo ""

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh
index 706154a..d88f9d9 100644
--- a/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh
+++ b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh
@@ -1,48 +1,48 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-
-#####################################################################
-#            Import stream metadata for HADOOP METRIC
-#####################################################################
-## AlertDataSource: data sources bound to sites
-echo "Importing AlertDataSourceService for HADOOP METRIC ... "
-
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]'
-
-
-## AlertStreamService: alert streams generated from data source
-echo ""
-echo "Importing AlertStreamService for HADOOP METRIC ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \
--H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
--d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]'
-
-## AlertExecutorService: what alert streams are consumed by alert executor
-echo ""
-echo "Importing AlertExecutorService for HADOOP METRIC ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]'
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]'
-
-## AlertStreamSchemaService: schema for event from alert stream
-echo ""
-echo "Importing AlertStreamSchemaService for HADOOP METRIC ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"
 service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"
 service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
-
-$EAGLE_HOME/kafka-stream-monitor.sh hadoopJmxMetric hadoopJmxMetricExecutor $EAGLE_HOME/conf/sandbox-hadoopjmx-topology.conf
\ No newline at end of file
+##!/bin/bash
+#
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##    http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+#
+#source $(dirname $0)/eagle-env.sh
+#
+######################################################################
+##            Import stream metadata for HADOOP METRIC
+######################################################################
+### AlertDataSource: data sources bound to sites
+#echo "Importing AlertDataSourceService for HADOOP METRIC ... "
+#
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]'
+#
+#
+### AlertStreamService: alert streams generated from data source
+#echo ""
+#echo "Importing AlertStreamService for HADOOP METRIC ... "
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \
+#-H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
+#-d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]'
+#
+### AlertExecutorService: what alert streams are consumed by alert executor
+#echo ""
+#echo "Importing AlertExecutorService for HADOOP METRIC ... "
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]'
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]'
+#
+### AlertStreamSchemaService: schema for event from alert stream
+#echo ""
+#echo "Importing AlertStreamSchemaService for HADOOP METRIC ... "
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":
 "service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
+#curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":
 "service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]'
+#
+#$EAGLE_HOME/kafka-stream-monitor.sh hadoopJmxMetric hadoopJmxMetricExecutor $EAGLE_HOME/conf/sandbox-hadoopjmx-topology.conf
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
index b29e4c1..ab086aa 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -28,10 +28,8 @@ import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.dao.*;
 import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
 import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
 import org.apache.eagle.policy.siddhi.StreamMetadataManager;
@@ -95,7 +93,8 @@ public class TestSiddhiEvaluator {
 							"insert into outputStream ;";
         policyDef.setExpression(expression);
 
-		PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, null)) {
+		PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, null),
+				Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
 			@Override
 			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
 				return null;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
index 783abea..79173f9 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
@@ -18,10 +18,11 @@ package org.apache.eagle.alert.dao;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
 import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,7 +55,8 @@ public class TestAlertDefinitionDAOImpl {
 
 		String site = "sandbox";
 		String dataSource = "UnitTest";
-		PolicyDefinitionDAO dao = new AlertDefinitionDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort)) {
+		PolicyDefinitionDAO dao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort),
+				Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
 			@Override
 			public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception {
 				List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
index 7c02db9..7ca0846 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
@@ -27,9 +27,9 @@ import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
 import org.apache.eagle.alert.executor.AlertExecutor;
 import org.apache.eagle.policy.DefaultPolicyPartitioner;
 import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl;
 import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
 import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -45,7 +45,8 @@ public class TestPolicyDistributionUpdater {
 
     @Test
     public void testPolicyDistributionReporter() throws Exception{
-        PolicyDefinitionDAO alertDao = new AlertDefinitionDAOImpl(new EagleServiceConnector(null, 1)) {
+        PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, 1),
+                Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
             @Override
             public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
                 final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index 97ea669..01d3a70 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -20,8 +20,10 @@ package org.apache.eagle.datastream.core
 
 import java.util
 
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity
 import org.apache.eagle.alert.executor.AlertExecutorCreationUtils
-import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl
+import org.apache.eagle.policy.common.Constants
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl
 
 import scala.collection.JavaConversions.asScalaSet
 import scala.collection.mutable.ListBuffer
@@ -89,7 +91,9 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
         /**
          * step 2: partition alert executor by policy partitioner class
          */
-        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)), upStreamNames, alertExecutorId)
+        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config,
+          new PolicyDefinitionEntityDAOImpl[AlertDefinitionAPIEntity](new EagleServiceConnector(config), Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME),
+          upStreamNames, alertExecutorId)
         var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]]
         alertExecutors.foreach(exec => {
           val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
deleted file mode 100644
index 8694f52..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.dao;
-
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Utility methods to load alert definitions for a program
- */
-public class AlertDefinitionDAOImpl implements PolicyDefinitionDAO<AlertDefinitionAPIEntity> {
-
-	private static final long serialVersionUID = 7717408104714443056L;
-	private static final Logger LOG = LoggerFactory.getLogger(AlertDefinitionDAOImpl.class);
-	private final EagleServiceConnector connector;
-
-	public AlertDefinitionDAOImpl(EagleServiceConnector connector){
-		this.connector = connector;
-	}
-
-    @Override
-	public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception {
-		try {
-			IEagleServiceClient client = new EagleServiceClientImpl(connector);
-			String query = Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\" AND @dataSource=\"" + dataSource + "\"]{*}";
-			GenericServiceAPIResponseEntity<AlertDefinitionAPIEntity> response =  client.search()
-																		                .pageSize(Integer.MAX_VALUE)
-																		                .query(query)
-																	                    .send();
-			client.close();
-			if (response.getException() != null) {
-				throw new Exception("Got an exception when query eagle service: " + response.getException()); 
-			}
-			List<AlertDefinitionAPIEntity> list = response.getObj();
-			List<AlertDefinitionAPIEntity> enabledList = new ArrayList<AlertDefinitionAPIEntity>();
-			for (AlertDefinitionAPIEntity entity : list) {
-				if (entity.isEnabled()) enabledList.add(entity);
-			}
-			return enabledList;
-		}
-		catch (Exception ex) {
-			LOG.error("Got an exception when query alert Def service", ex);
-			throw new IllegalStateException(ex);
-		}					   
-	}
-
-    @Override
-	public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
-		List<AlertDefinitionAPIEntity> list = findActivePolicies(site, dataSource);
-		Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-			for (AlertDefinitionAPIEntity entity : list) {
-				String executorID = entity.getTags().containsKey(Constants.EXECUTOR_ID) ? entity.getTags().get(Constants.EXECUTOR_ID)
-						: entity.getTags().get(Constants.ALERT_EXECUTOR_ID);
-				if (map.get(executorID) == null) {
-					map.put(executorID, new HashMap<String, AlertDefinitionAPIEntity>());
-				}
-				map.get(executorID).put(entity.getTags().get("policyId"), entity);
-			}
-		return map;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/.gitignore b/eagle-external/hadoop_jmx_collector/.gitignore
new file mode 100644
index 0000000..adad264
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/.gitignore
@@ -0,0 +1,2 @@
+config.json
+*.pyc
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json
new file mode 100644
index 0000000..1f5cb7e
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/config-sample.json
@@ -0,0 +1,19 @@
+{
+   "env": {
+    "site": "sandbox"
+   },
+   "input": {
+    "component": "namenode",
+    "port": "50070",
+    "https": false
+   },
+   "filter": {
+    "monitoring.group.selected": ["hadoop", "java.lang"]
+   },
+   "output": {
+     "kafka": {
+       "topic": "nn_jmx_metric_sandbox",
+       "brokerList": ["sandbox.hortonworks.com:6667"]
+     }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/config.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config.json b/eagle-external/hadoop_jmx_collector/config.json
deleted file mode 100644
index 1f5cb7e..0000000
--- a/eagle-external/hadoop_jmx_collector/config.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
-   "env": {
-    "site": "sandbox"
-   },
-   "input": {
-    "component": "namenode",
-    "port": "50070",
-    "https": false
-   },
-   "filter": {
-    "monitoring.group.selected": ["hadoop", "java.lang"]
-   },
-   "output": {
-     "kafka": {
-       "topic": "nn_jmx_metric_sandbox",
-       "brokerList": ["sandbox.hortonworks.com:6667"]
-     }
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index e0667af..b342a5e 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -70,8 +70,7 @@ def get_metric_prefix_name(mbean_attribute, context):
         name_index = [i[0] for i in mbean_list].index('name')
         mbean_list[name_index][1] = context
         metric_prefix_name = '.'.join([i[1] for i in mbean_list])
-    return DATA_TYPE + "." + metric_prefix_name
-
+    return (DATA_TYPE + "." + metric_prefix_name).replace(" ","").lower()
 
 def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean):
     selected_group = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
@@ -97,7 +96,7 @@ def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean):
         for key, value in bean.iteritems():
             #print key, value
             key = key.lower()
-            if not isNumber(value) or re.match(r'tag.*', key):
+            if re.match(r'tag.*', key):
                 continue
 
             if mbean_domain == 'hadoop' and re.match(r'^namespace', key):
@@ -111,8 +110,8 @@ def parse_hadoop_jmx(producer, topic, config, beans, dataMap, fat_bean):
                 key = items[1]
 
             metric = metric_prefix_name + '.' + key
-            send_output_message(producer, topic, kafka_dict, metric, value)
 
+            single_metric_callback(producer, topic, kafka_dict, metric, value)
 
 def get_jmx_beans(host, port, https):
     # port = inputConfig.get('port')
@@ -135,12 +134,16 @@ def main():
     kafka = None
     producer = None
     topic = None
+    brokerList = None
 
     try:
         #start = time.clock()
 
         # read the kafka.ini
-        config = load_config('config.json')
+        if (len(sys.argv) > 1):
+            config = load_config(sys.argv[1])
+        else:
+            config = load_config('config.json')
         #print config
 
         site = config[u'env'].get('site').encode('utf-8')
@@ -154,18 +157,21 @@ def main():
         port = config[u'input'].get('port')
         https = config[u'input'].get('https')
         kafkaConfig = config[u'output'].get(u'kafka')
-        brokerList = kafkaConfig.get('brokerList')
-        topic = kafkaConfig.get('topic').encode('utf-8')
+        if kafkaConfig != None :
+            brokerList = kafkaConfig.get('brokerList')
+            topic = kafkaConfig.get('topic').encode('utf-8')
 
         beans = get_jmx_beans(host, port, https)
         #print brokerList
-        kafka, producer = kafka_connect(brokerList)
+        if brokerList != None:
+            kafka, producer = kafka_connect(brokerList)
+
         default_metric = {"site": site, "host": host, "timestamp": '', "component": component, "metric": '', "value": ''}
         fat_bean = dict()
         parse_hadoop_jmx(producer, topic, config, beans, default_metric, fat_bean)
-        extend_jmx_metrics(producer, topic, default_metric, fat_bean)
-    except Exception, e:
-        print 'main except: ', e
+        metrics_bean_callback(producer, topic, default_metric, fat_bean)
+    # except Exception, e:
+    #     print 'main except: ', e
     finally:
         if kafka != None and producer != None:
             kafka_close(kafka, producer)
@@ -174,4 +180,4 @@ def main():
         #print("Time used:",elapsed)
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/metric_extensions.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_extensions.py b/eagle-external/hadoop_jmx_collector/metric_extensions.py
index 4ea89c9..0028204 100644
--- a/eagle-external/hadoop_jmx_collector/metric_extensions.py
+++ b/eagle-external/hadoop_jmx_collector/metric_extensions.py
@@ -14,11 +14,55 @@
 # limitations under the License.
 
 
-#!/usr/bin/python
+# !/usr/bin/python
 
 from util_func import *
 import json
 
+
+# Metric Parsing Callback Entry
+def single_metric_callback(producer, topic, kafka_dict, metric, value):
+    if isNumber(value):
+        numeric_metric_callack(producer, topic, kafka_dict, metric, value)
+    else:
+        nonnumeric_metric_callack(producer, topic, kafka_dict, metric, value)
+
+
+def metrics_bean_callback(producer, topic, metric, bean):
+    cal_mem_usage(producer, topic, bean, metric, "hadoop.namenode.jvm")
+    journal_transaction_info(producer, topic, bean, metric, "hadoop.namenode.journaltransaction")
+    nn_hastate(producer,topic,bean,metric,"hadoop.namenode.fsnamesystem")
+
+#################################################
+# Metric Parsing Extensions
+#################################################
+
+def numeric_metric_callack(producer, topic, kafka_dict, metric, value):
+    # Send out numeric value directly
+    send_output_message(producer, topic, kafka_dict, metric, value)
+
+
+def nonnumeric_metric_callack(producer, topic, kafka_dict, metric, value):
+    nn_safe_mode_metric(producer, topic, kafka_dict, metric, value)
+
+def nn_safe_mode_metric(producer, topic, kafka_dict, metric, value):
+    if metric == "hadoop.namenode.fsnamesystemstate.fsstate":
+        if value == "safeMode":
+            value = 1
+        else:
+            value = 0
+
+        send_output_message(producer, topic, kafka_dict, metric, value)
+
+def nn_hastate(producer, topic, bean, metricMap, metric_prefix_name="hadoop.namenode.fsnamesystem"):
+    kafka_dict = metricMap.copy()
+    if bean[u'tag.HAState'] == "active":
+        value = 0
+    else:
+        value = 1
+
+    send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".hastate", value)
+
 def cal_mem_usage(producer, topic, bean, metricMap, metric_prefix_name):
     kafka_dict = metricMap.copy()
     PercentVal = None
@@ -34,6 +78,7 @@ def cal_mem_usage(producer, topic, bean, metricMap, metric_prefix_name):
     PercentVal = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100, 2)
     send_output_message(producer, topic, kafka_dict, metric_prefix_name + ".MemHeapCommittedUsage", PercentVal)
 
+
 def journal_transaction_info(producer, topic, bean, metric, metric_prefix_name):
     new_metric = metric.copy()
     if bean.has_key("JournalTransactionInfo"):
@@ -42,11 +87,9 @@ def journal_transaction_info(producer, topic, bean, metric, metric_prefix_name):
         LastAppliedOrWrittenTxId = int(JournalTransactionInfo.get("LastAppliedOrWrittenTxId"))
         MostRecentCheckpointTxId = int(JournalTransactionInfo.get("MostRecentCheckpointTxId"))
 
-        send_output_message(producer, topic, new_metric, metric_prefix_name + ".LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId)
-        send_output_message(producer, topic, new_metric, metric_prefix_name + ".MostRecentCheckpointTxId", MostRecentCheckpointTxId)
+        send_output_message(producer, topic, new_metric, metric_prefix_name + ".LastAppliedOrWrittenTxId",
+                            LastAppliedOrWrittenTxId)
+        send_output_message(producer, topic, new_metric, metric_prefix_name + ".MostRecentCheckpointTxId",
+                            MostRecentCheckpointTxId)
     else:
         raise Exception("JournalTransactionInfo not found")
-
-def extend_jmx_metrics(producer, topic, metric, bean):
-    cal_mem_usage(producer, topic, bean, metric, "hadoop.namenode.jvm")
-    journal_transaction_info(producer,topic,bean,metric,"hadoop.namenode.JournalTransaction")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-external/hadoop_jmx_collector/util_func.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/util_func.py b/eagle-external/hadoop_jmx_collector/util_func.py
index c02abbd..a2ca44a 100644
--- a/eagle-external/hadoop_jmx_collector/util_func.py
+++ b/eagle-external/hadoop_jmx_collector/util_func.py
@@ -52,22 +52,21 @@ def send_output_message(producer, topic, kafka_dict, metric, value):
 def load_config(filename):
     # read the self-defined filters
 
-    script_dir = os.path.dirname(__file__)
-    rel_path = "./" + filename
-    abs_file_path = os.path.join(script_dir, rel_path)
-    f = open(abs_file_path, 'r')
-    json_file = f.read()
-    f.close()
-    #print json_file
-
     try:
+        script_dir = os.path.dirname(__file__)
+        rel_path = "./" + filename
+        abs_file_path = os.path.join(script_dir, rel_path)
+        if not os.path.isfile(abs_file_path):
+            print abs_file_path+" doesn't exist, please rename config-sample.json to config.json"
+            exit(1)
+        f = open(abs_file_path, 'r')
+        json_file = f.read()
+        f.close()
         config = json.loads(json_file)
-
     except ValueError:
         print "configuration file load error"
     return config
 
-
 def isNumber(str):
     try:
         if str == None or isinstance(str, (bool)):

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-gc/src/main/resources/alert-gc-policy.sh
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/alert-gc-policy.sh b/eagle-gc/src/main/resources/alert-gc-policy.sh
index ffa6549..7f32c22 100644
--- a/eagle-gc/src/main/resources/alert-gc-policy.sh
+++ b/eagle-gc/src/main/resources/alert-gc-policy.sh
@@ -15,7 +15,7 @@
 
 #!/bin/sh
 #### AlertDefinitionService: alert definition for NNGCLog Pause Time Long
-curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox"","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode pause time long","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from NNGCLogStream#window.externalTime(timestamp,10 min) select sum(pausedGCTimeSec) as sumPausedSec having sumPausedSec >= 30 insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]'
+curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode pause time long","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from NNGCLogStream#window.externalTime(timestamp,10 min) select sum(pausedGCTimeSec) as sumPausedSec having sumPausedSec >= 30 insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]'
 
 #### AlertDefinitionService: alert definition for NNGCLog Full GC
-curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode has full gc","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from NNGCLogStream[(permAreaGCed == true)] select * insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]'
\ No newline at end of file
+curl -X POST -H 'Content-Type:application/json' -H "Authorization:Basic YWRtaW46c2VjcmV0" "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"NNGCLog","policyId":"NNGCPauseTimeLong","alertExecutorId":"NNGCAlert","policyType":"siddhiCEPEngine"},"desc":"alert when namenode has full gc","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from NNGCLogStream[(permAreaGCed == true)] select * insert into outputStream;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":true}]'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-gc/src/main/resources/alert-metadata-create-gc.sh
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/alert-metadata-create-gc.sh b/eagle-gc/src/main/resources/alert-metadata-create-gc.sh
index 1e57c28..2e34490 100644
--- a/eagle-gc/src/main/resources/alert-metadata-create-gc.sh
+++ b/eagle-gc/src/main/resources/alert-metadata-create-gc.sh
@@ -15,13 +15,13 @@
 
 #!/bin/sh
 #### AlertDataSourceService: alert streams generated from data source
-curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix": "alertDataSource", "tags": {"site": "sandbox","dataSource": "NNGCLog"}}]'
+curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix": "alertDataSource", "tags": {"site": "sandbox","dataSource": "NNGCLog"}}]'
 
 #### AlertStreamService: alert streams generated from data source
-curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamService" -d '[{"prefix":"alertStream","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream"},"desc":"alert event stream from namenode gc log"}]'
+curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertStreamService" -d '[{"prefix":"alertStream","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream"},"desc":"alert event stream from namenode gc log"}]'
 
 #### AlertExecutorService: what alert streams are consumed by alert executor
-curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"NNGCLog","alertExecutorId":"NNGCAlert","streamName":"NNGCLogStream"},"desc":"alert executor for namenode gc log"}]'
+curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"NNGCLog","alertExecutorId":"NNGCAlert","streamName":"NNGCLogStream"},"desc":"alert executor for namenode gc log"}]'
 
 #### AlertStreamSchemaService: schema for event from alert stream
-curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"timestamp"}},{"prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"eventType"}},{"prefix":"alertStreamSchema","category":"","attrType":"double","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"pausedGCTimeSec"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","
 streamName":"NNGCLogStream","attrName":"youngUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long
 ","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapUsageAvailable"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"usedTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapK"}},{
 "prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"logLine"}}]'
\ No newline at end of file
+curl -X POST -H 'Content-Type:application/json' -H 'Authorization: Basic YWRtaW46c2VjcmV0' "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"timestamp"}},{"prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"eventType"}},{"prefix":"alertStreamSchema","category":"","attrType":"double","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"pausedGCTimeSec"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","s
 treamName":"NNGCLogStream","attrName":"youngUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"youngTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permAreaGCed"}},{"prefix":"alertStreamSchema","category":"","attrType":"long"
 ,"attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"permTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapUsageAvailable"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"usedTotalHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"bool","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"tenuredUsedHeapK"}},{"prefix":"alertStreamSchema","category":"","attrType":"long","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"totalHeapK"}},{"
 prefix":"alertStreamSchema","category":"","attrType":"string","attrValueResolver":"","tags":{"dataSource":"NNGCLog","streamName":"NNGCLogStream","attrName":"logLine"}}]'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java
deleted file mode 100644
index 27a416d..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.hadoop.metric;
-
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.Properties;
-import java.util.SortedMap;
-
-/**
- * Created on 1/19/16.
- */
-public class HadoopJmxMetricDeserializer implements SpoutKafkaMessageDeserializer {
-    
-    private static final Logger LOG = LoggerFactory.getLogger(HadoopJmxMetricDeserializer.class);
-
-    private Properties props;
-
-    public  HadoopJmxMetricDeserializer(Properties props){
-        this.props = props;
-    }
-
-
-    // convert to a map of <key, map<>>
-    @Override
-    public Object deserialize(byte[] arg0) {
-        try {
-            String content = new String(arg0, Charset.defaultCharset().name());
-            Map<String, Object> metricItem = JsonSerDeserUtils.deserialize(content, SortedMap.class);
-            return metricItem;
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOG.error("unrecognizable content", e);
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
new file mode 100644
index 0000000..9202da4
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.metric;
+
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.core.StreamProducer;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+
+/**
+ * Created on 1/12/16.
+ */
+public class HadoopJmxMetricMonitor {
+
+    public static void main(String[] args) {
+        StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class);
+        String streamName = "hadoopJmxMetricEventStream";
+        StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
+        sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
+        env.execute();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java
deleted file mode 100644
index 7c574f8..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/NameNodeLagMonitor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.hadoop.metric;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created on 1/12/16.
- */
-public class NameNodeLagMonitor {
-
-    public static void main(String[] args) {
-        StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class);
-        String streamName = "hadoopJmxMetricEventStream";
-        StreamProducer sp = env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).parallelism(1).nameAs(streamName);
-        sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
-
-        env.execute();
-    }
-
-    // create a tuple kafka source
-    private static KafkaSourcedSpoutProvider createProvider(Config config) {
-        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
-        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-            @Override
-            public List<Object> deserialize(byte[] ser) {
-                Object tmp = deserializer.deserialize(ser);
-                Map<String, Object> map = (Map<String, Object>)tmp;
-                if(tmp == null) return null;
-                // this is the key to be grouped by
-                return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp);
-            }
-        };
-
-        KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
-            @Override
-            public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-                return new SchemeAsMultiScheme(scheme);
-            }
-        };
-        return provider;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
new file mode 100644
index 0000000..173441c
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.metric;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 1/25/16.
+ */
+public class Utils {
+
+    /**
+     * Creates a spout provider that have host-metric as the first tuple data, so that it's feasible for alert grouping.
+     *
+     * @param config
+     * @return
+     */
+    public static KafkaSourcedSpoutProvider createProvider(Config config) {
+        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+
+            @Override
+            public List<Object> deserialize(byte[] ser) {
+                Object tmp = deserializer.deserialize(ser);
+                Map<String, Object> map = (Map<String, Object>) tmp;
+                if (tmp == null) return null;
+                // this is the key to be grouped by
+                return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp);
+            }
+
+        };
+
+        KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
+
+            @Override
+            public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
+                return new SchemeAsMultiScheme(scheme);
+            }
+
+        };
+        return provider;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/application.conf b/eagle-hadoop-metric/src/main/resources/application.conf
index aa7e340..d72cc26 100644
--- a/eagle-hadoop-metric/src/main/resources/application.conf
+++ b/eagle-hadoop-metric/src/main/resources/application.conf
@@ -17,8 +17,8 @@
   "envContextConfig" : {
     "env" : "storm",
     "mode" : "local",
-    "topologyName" : "nameNodeLagTopology",
-    "stormConfigFile" : "namenodelage.yaml",
+    "topologyName" : "hadoopJmxMetricTopology",
+    "stormConfigFile" : "hadoopjmx.yaml",
     "parallelismConfig" : {
       "kafkaMsgConsumer" : 1,
       "hadoopJmxMetricAlertExecutor*" : 1
@@ -26,12 +26,12 @@
   },
   "dataSourceConfig": {
     "topic" : "nn_jmx_metric_sandbox",
-    "zkConnection" : "localhost:2181",
+    "zkConnection" : "sandbox.hortonworks.com:2181",
     "zkConnectionTimeoutMS" : 15000,
     "consumerGroupId" : "EagleConsumer",
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.hadoop.metric.HadoopJmxMetricDeserializer",
-    "transactionZKServers" : "localhost",
+    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
+    "transactionZKServers" : "sandbox.hortonworks.com",
     "transactionZKPort" : 2181,
     "transactionZKRoot" : "/consumers",
     "transactionStateUpdateMS" : 2000
@@ -43,23 +43,6 @@
        "needValidation" : "true"
      }
   },
-  "persistExecutorConfigs" {
-    "persistExecutor1" : {
-      "kafka": {
-        "bootstrap_servers" : "localhost",
-        "topics" : {
-          "defaultOutput" : "downSampleOutput"
-        }
-      }
-    }
-  },
-  "aggregateExecutorConfigs" : {
-    "aggregateExecutor1" : {
-      "parallelism" : 1,
-      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-      "needValidation" : "true"
-    }
-  },
   "eagleProps" : {
     "site" : "sandbox",
     "dataSource": "hadoopJmxMetricDataSource",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/eagle-env.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/eagle-env.sh b/eagle-hadoop-metric/src/main/resources/eagle-env.sh
new file mode 100755
index 0000000..79ff5fa
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/eagle-env.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# set EAGLE_HOME
+export EAGLE_HOME=$(dirname $0)/..
+
+# The java implementation to use. please use jdk 1.7 or later
+# export JAVA_HOME=${JAVA_HOME}
+# export JAVA_HOME=/usr/java/jdk1.7.0_80/
+
+# nimbus.host, default is localhost
+export EAGLE_NIMBUS_HOST=localhost
+
+# EAGLE_SERVICE_HOST, default is `hostname -f`
+export EAGLE_SERVICE_HOST=localhost
+
+# EAGLE_SERVICE_PORT, default is 9099
+export EAGLE_SERVICE_PORT=9099
+
+# EAGLE_SERVICE_USER
+export EAGLE_SERVICE_USER=admin
+
+# EAGLE_SERVICE_PASSWORD
+export EAGLE_SERVICE_PASSWD=secret
+
+export EAGLE_CLASSPATH=$EAGLE_HOME/conf
+# Add eagle shared library jars
+for file in $EAGLE_HOME/lib/share/*;do
+	EAGLE_CLASSPATH=$EAGLE_CLASSPATH:$file
+done

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh b/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
new file mode 100644
index 0000000..8405c19
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
@@ -0,0 +1,165 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source $(dirname $0)/eagle-env.sh
+
+#####################################################################
+#            Import stream metadata for HDFS
+#####################################################################
+
+## AlertDataSource: data sources bound to sites
+echo "Importing AlertDataSourceService for persist... "
+
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" \
+  -d '
+  [
+     {
+        "prefix":"alertDataSource",
+        "tags":{
+           "site":"sandbox",
+           "dataSource":"hadoopJmxMetricDataSource"
+        },
+        "enabled": true,
+        "config":"",
+        "desc":"hadoop"
+     }
+  ]
+  '
+
+## AlertStreamService: alert streams generated from data source
+echo ""
+echo "Importing AlertStreamService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
+ -d '
+ [
+    {
+       "prefix":"alertStream",
+       "tags":{
+          "dataSource":"hadoopJmxMetricDataSource",
+          "streamName":"hadoopJmxMetricEventStream"
+       },
+       "desc":"hadoop"
+    }
+ ]
+ '
+
+## AlertExecutorService: what alert streams are consumed by alert executor
+echo ""
+echo "Importing AlertExecutorService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
+ -d '
+ [
+    {
+       "prefix":"alertExecutor",
+       "tags":{
+          "dataSource":"hadoopJmxMetricDataSource",
+          "alertExecutorId":"hadoopJmxMetricAlertExecutor",
+          "streamName":"hadoopJmxMetricEventStream"
+       },
+       "desc":"aggregate executor for hadoop jmx metric event stream"
+    }
+ ]
+ '
+
+## AlertStreamSchemaService: schema for event from alert stream
+echo ""
+echo "Importing AlertStreamSchemaService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
+ -d '
+ [
+    {
+       "prefix": "alertStreamSchema",
+       "tags": {
+          "dataSource": "hadoopJmxMetricDataSource",
+          "streamName": "hadoopJmxMetricEventStream",
+          "attrName": "host"
+       },
+       "attrDescription": "the host that current metric comes form",
+       "attrType": "string",
+       "category": "",
+       "attrValueResolver": ""
+    },
+    {
+       "prefix": "alertStreamSchema",
+       "tags": {
+          "dataSource": "hadoopJmxMetricDataSource",
+          "streamName": "hadoopJmxMetricEventStream",
+          "attrName": "timestamp"
+       },
+       "attrDescription": "the metric timestamp",
+       "attrType": "long",
+       "category": "",
+       "attrValueResolver": ""
+    },
+    {
+       "prefix": "alertStreamSchema",
+       "tags": {
+          "dataSource": "hadoopJmxMetricDataSource",
+          "streamName": "hadoopJmxMetricEventStream",
+          "attrName": "metric"
+       },
+       "attrDescription": "the metric name",
+       "attrType": "string",
+       "category": "",
+       "attrValueResolver": ""
+    },
+    {
+       "prefix": "alertStreamSchema",
+       "tags": {
+          "dataSource": "hadoopJmxMetricDataSource",
+          "streamName": "hadoopJmxMetricEventStream",
+          "attrName": "component"
+       },
+       "attrDescription": "the component that the metric comes from",
+       "attrType": "string",
+       "category": "",
+       "attrValueResolver": ""
+    },
+    {
+       "prefix": "alertStreamSchema",
+       "tags": {
+          "dataSource": "hadoopJmxMetricDataSource",
+          "streamName": "hadoopJmxMetricEventStream",
+          "attrName": "site"
+       },
+       "attrDescription": "the site that the metric belongs to",
+       "attrType": "string",
+       "category": "",
+       "attrValueResolver": ""
+    },
+    {
+       "prefix": "alertStreamSchema",
+       "tags": {
+          "dataSource": "hadoopJmxMetricDataSource",
+          "streamName": "hadoopJmxMetricEventStream",
+          "attrName": "value"
+       },
+       "attrDescription": "the metric value in string presentation",
+       "attrType": "double",
+       "category": "",
+       "attrValueResolver": ""
+    }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for eagle topology"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml b/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml
new file mode 100644
index 0000000..a68a323
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+topology.workers: 1
+topology.acker.executors: 1
+topology.tasks: 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f213bab0/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh b/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh
new file mode 100644
index 0000000..a043cd4
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source $(dirname $0)/eagle-env.sh
+source $(dirname $0)/hadoop-metric-init.sh
+
+
+##### add policies ##########
+echo ""
+echo "Importing policy: haStatePolicy "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
+ -d '
+ [
+     {
+       "prefix": "alertdef",
+       "tags": {
+         "site": "sandbox",
+         "dataSource": "hadoopJmxMetricDataSource",
+         "policyId": "haStatePolicy",
+         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
+         "policyType": "siddhiCEPEngine"
+       },
+       "description": "jmx metric ",
+       "policyDef": "{\"expression\":\"from every a = hadoopJmxMetricEventStream[metric==\\\"hadoop.namenode.fsnamesystem.hastate\\\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host == a.host and (convert(a.value, \\\"long\\\") != convert(value, \\\"long\\\"))] within 10 min select a.host, a.value as oldHaState, b.value as newHaState, b.timestamp as timestamp, b.metric as metric, b.component as component, b.site as site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
+       "enabled": true,
+       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
+       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
+     }
+ ]
+ '
+
+ ## Finished
+echo ""
+echo "Finished initialization for eagle topology"
+
+exit 0



Mime
View raw message