eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject [2/3] incubator-eagle git commit: [EAGLE-545] hdfs/bhase/yarn topology health check
Date Mon, 10 Oct 2016 03:26:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
new file mode 100644
index 0000000..f2cad37
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.hdfs;
+
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.*;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+
+public class HdfsTopologyEntityParser implements TopologyEntityParser {
+
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
+
+    // JMX query suffix and the names of the two namenode beans this parser reads.
+    private static final String JMX_URL = "/jmx?anonymous=true";
+    private static final String JMX_FS_NAME_SYSTEM_BEAN_NAME = "Hadoop:service=NameNode,name=FSNamesystem";
+    private static final String JMX_NAMENODE_INFO = "Hadoop:service=NameNode,name=NameNodeInfo";
+
+    // Property keys looked up on the FSNamesystem bean.
+    private static final String HA_STATE = "tag.HAState";
+    private static final String HA_NAME = "tag.Hostname";
+    private static final String CAPACITY_TOTAL_GB = "CapacityTotalGB";
+    private static final String CAPACITY_USED_GB = "CapacityUsedGB";
+    private static final String BLOCKS_TOTAL = "BlocksTotal";
+
+    // Property keys looked up on the NameNodeInfo bean; each value is a JSON document held in a string.
+    private static final String LIVE_NODES = "LiveNodes";
+    private static final String DEAD_NODES = "DeadNodes";
+    private static final String JN_STATUS = "NameJournalStatus";
+    private static final String JN_TRANSACTION_INFO = "JournalTransactionInfo";
+
+    // Key inside the JournalTransactionInfo JSON document.
+    private static final String LAST_TX_ID = "LastAppliedOrWrittenTxId";
+
+    // Keys inside the per-host JSON objects of the LiveNodes document.
+    private static final String DATA_NODE_NUM_BLOCKS = "numBlocks";
+    private static final String DATA_NODE_USED_SPACE = "usedSpace";
+    private static final String DATA_NODE_CAPACITY = "capacity";
+    private static final String DATA_NODE_ADMIN_STATE = "adminState";
+    private static final String DATA_NODE_FAILED_VOLUMN = "volfails";
+
+    // Admin-state value that marks a live node as decommissioned (compared ignoring case).
+    private static final String DATA_NODE_DECOMMISSIONED = "Decommissioned";
+    // Boolean key inside the per-host JSON objects of the DeadNodes document.
+    private static final String DATA_NODE_DECOMMISSIONED_STATE = "decommissioned";
+
+    // "<dotted number>:<port> (<non-digits><number>)" -- group 1 is the address, group 2 the trailing number.
+    private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
+    // "<dotted number>:<port>" -- group 1 is the address.
+    private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
+
+    // Per-instance configuration supplied through the constructor.
+    private String[] namenodeUrls;
+    private String site;
+    private TopologyRackResolver rackResolver;
+
+    /**
+     * Creates a parser for the HDFS services of one site.
+     *
+     * @param site         site name written into the site tag of every entity and passed to every metric
+     * @param hdfsConfig   HDFS settings; only its {@code namenodeUrls} array is read here
+     * @param rackResolver resolver used to fill the rack tag of each entity
+     */
+    public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
+        this.site = site;
+        this.rackResolver = rackResolver;
+        this.namenodeUrls = hdfsConfig.namenodeUrls;
+    }
+
+    /**
+     * Queries every configured namenode and collects the HDFS topology for the given timestamp.
+     *
+     * <p>Each namenode that answers is added to the master nodes. For a namenode whose status equals
+     * {@code NAME_NODE_ACTIVE_STATUS} (ignoring case) the slave/journal node entities are collected as well.
+     * A failure on one URL is logged and does not stop the remaining URLs from being queried.
+     * Finally a namenode metric is added whose value is the fraction of configured URLs that answered
+     * (0 when no URL is configured).
+     *
+     * @param timestamp timestamp stored on every created entity and metric
+     * @return the collected entities and metrics, never {@code null}
+     * @throws IOException declared by the interface; per-URL I/O failures are handled inside the loop
+     */
+    @Override
+    public TopologyEntityParserResult parse(long timestamp) throws IOException {
+        final TopologyEntityParserResult result = new TopologyEntityParserResult();
+        result.setVersion(TopologyConstants.HadoopVersion.V2);
+        int numNamenode = 0;
+        for (String url : namenodeUrls) {
+            try {
+                final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp);
+                result.getMasterNodes().add(namenodeEntity);
+                numNamenode++;
+                // Constant-first comparison: the HA state read from JMX may be absent (null).
+                if (NAME_NODE_ACTIVE_STATUS.equalsIgnoreCase(namenodeEntity.getStatus())) {
+                    createSlaveNodeEntities(url, timestamp, result);
+                }
+            } catch (RuntimeException ex) {
+                // Malformed JMX content; log with the stack trace instead of printing to stderr.
+                LOG.error("Fail to parse namenode JMX with url: {}", url, ex);
+            } catch (IOException e) {
+                LOG.warn("Catch an IOException with url: {}", url, e);
+            }
+        }
+        // Avoid 0/0 (NaN) when no namenode URL is configured.
+        final double value = namenodeUrls.length == 0 ? 0d : numNamenode * 1d / namenodeUrls.length;
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp));
+        return result;
+    }
+
+    /**
+     * Builds the namenode entity from the FSNamesystem JMX bean of one namenode.
+     *
+     * <p>Host name and HA state are copied from the bean; total and used capacity are read in GB and
+     * stored in TB (divided by 1024); the block count is stored as an integral string.
+     *
+     * @param url        base URL of the namenode
+     * @param updateTime timestamp stored on the entity
+     * @return the populated namenode entity
+     * @throws JSONException declared for callers; not thrown directly by this method
+     * @throws IOException   if the JMX query fails, the bean is missing, or a numeric property is
+     *                       missing or not a number
+     */
+    private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException {
+        final String urlString = buildFSNamesystemURL(url);
+        final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
+        final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME);
+        if (bean == null || bean.getPropertyMap() == null) {
+            throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
+        }
+        final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
+        final HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
+        result.setStatus((String) bean.getPropertyMap().get(HA_STATE));
+        // Read through Number: the concrete boxed type delivered by the JMX parser is not guaranteed.
+        result.setConfiguredCapacityTB(Double.toString(readNumber(bean, CAPACITY_TOTAL_GB).doubleValue() / 1024));
+        result.setUsedCapacityTB(Double.toString(readNumber(bean, CAPACITY_USED_GB).doubleValue() / 1024));
+        result.setNumBlocks(Long.toString(readNumber(bean, BLOCKS_TOTAL).longValue()));
+        return result;
+    }
+
+    /**
+     * Reads a numeric property from a JMX bean, rejecting missing or non-numeric values explicitly.
+     *
+     * @param bean bean whose property map is read
+     * @param key  property key
+     * @return the property value as a {@link Number}
+     * @throws IOException if the property is absent or not a number
+     */
+    private static Number readNumber(JMXBean bean, String key) throws IOException {
+        final Object value = bean.getPropertyMap().get(key);
+        if (!(value instanceof Number)) {
+            throw new ServiceNotResponseException("Invalid JMX format, property " + key + " is not a number: " + value);
+        }
+        return (Number) value;
+    }
+
+    /**
+     * Queries the NameNodeInfo JMX bean of the given namenode and adds the data node and journal
+     * node entities derived from it to {@code result}.
+     *
+     * @param url        base URL of the namenode
+     * @param updateTime timestamp stored on the created entities and metrics
+     * @param result     accumulator receiving the entities and metrics
+     * @throws IOException if the JMX query fails or the NameNodeInfo bean is missing
+     */
+    private void createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException {
+        final Map<String, JMXBean> beans = JMXQueryHelper.query(buildNamenodeInfo(url));
+        final JMXBean nameNodeInfo = beans.get(JMX_NAMENODE_INFO);
+        if (nameNodeInfo == null || nameNodeInfo.getPropertyMap() == null) {
+            throw new ServiceNotResponseException("Invalid JMX format, NameNodeInfo bean is null!");
+        }
+        createAllDataNodeEntities(nameNodeInfo, updateTime, result);
+        createAllJournalNodeEntities(nameNodeInfo, updateTime, result);
+    }
+
+    private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException {
+        if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) {
+            return;
+        }
+        String jnInfoString = (String) bean.getPropertyMap().get(JN_TRANSACTION_INFO);
+        JSONObject jsonObject = new JSONObject(jnInfoString);
+        long lastTxId = Long.parseLong(jsonObject.getString(LAST_TX_ID));
+
+        String journalnodeString = (String) bean.getPropertyMap().get(JN_STATUS);
+        JSONArray jsonArray = new JSONArray(journalnodeString);
+        JSONObject jsonMap = (JSONObject) jsonArray.get(0);
+
+        Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
+        String QJM = jsonMap.getString("manager");
+        Pattern qjm = Pattern.compile(QJM_PATTERN);
+        Matcher jpmMatcher = qjm.matcher(QJM);
+        while (jpmMatcher.find()) {
+            String ip = jpmMatcher.group(1);
+            String hostname = EntityBuilderHelper.resolveHostByIp(ip);
+            HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.JOURNAL_NODE_ROLE, hostname, updateTime);
+            entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+            journalNodesMap.put(ip, entity);
+        }
+        if (journalNodesMap.isEmpty()) {
+            LOG.warn("Fail to find journal node info in JMX");
+            return;
+        }
+
+        String stream = jsonMap.getString("stream");
+        Pattern status = Pattern.compile(STATUS_PATTERN);
+        Matcher statusMatcher = status.matcher(stream);
+        long numLiveJournalNodes = 0;
+        while (statusMatcher.find()) {
+            numLiveJournalNodes++;
+            String ip = statusMatcher.group(1);
+            if (journalNodesMap.containsKey(ip)) {
+                long txid = Long.parseLong(statusMatcher.group(2));
+                journalNodesMap.get(ip).setWrittenTxidDiff(lastTxId - txid);
+                journalNodesMap.get(ip).setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+            }
+        }
+        result.getMasterNodes().addAll(journalNodesMap.values());
+
+        double value = numLiveJournalNodes * 1d / journalNodesMap.size();
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime));
+    }
+
+    /**
+     * Derives data node entities and the data node metric from the NameNodeInfo bean.
+     *
+     * <p>The {@code DeadNodes} and {@code LiveNodes} properties are JSON objects keyed by host name.
+     * Dead nodes get a dead or dead-decommissioned status. Live nodes additionally get capacity and
+     * used space (converted from the reported number by dividing by 1024 four times), block count,
+     * failed volume count when present, and a live or live-decommissioned status. The metric value is
+     * the fraction of live nodes among all nodes reported by this bean (0 when none are reported).
+     *
+     * @param bean       NameNodeInfo bean
+     * @param updateTime timestamp stored on the created entities and the metric
+     * @param result     accumulator receiving the entities (as slave nodes) and the metric
+     * @throws JSONException if a node document is malformed
+     * @throws IOException   if the bean lacks the live or dead node document
+     */
+    private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException {
+        final String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
+        final String liveNodesStrings = (String) bean.getPropertyMap().get(LIVE_NODES);
+        if (deadNodesStrings == null || liveNodesStrings == null) {
+            // Fail before touching the result so that no half-filled node list is left behind.
+            throw new ServiceNotResponseException("Invalid JMX format, LiveNodes or DeadNodes is null!");
+        }
+
+        int numLiveNodes = 0;
+        int numLiveDecommNodes = 0;
+        int numDeadNodes = 0;
+        int numDeadDecommNodes = 0;
+
+        JSONObject jsonNodesObject = new JSONObject(new JSONTokener(deadNodesStrings));
+        // names() returns null for an empty object, hence the null check in the loop condition.
+        final JSONArray deadNodes = jsonNodesObject.names();
+        for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {
+            final String hostname = deadNodes.getString(i);
+            final JSONObject deadNode = jsonNodesObject.getJSONObject(hostname);
+            final HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, hostname, updateTime);
+            if (deadNode.getBoolean(DATA_NODE_DECOMMISSIONED_STATE)) {
+                ++numDeadDecommNodes;
+                entity.setStatus(TopologyConstants.DATA_NODE_DEAD_DECOMMISSIONED_STATUS);
+            } else {
+                entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+            }
+            ++numDeadNodes;
+            result.getSlaveNodes().add(entity);
+        }
+        LOG.info("Dead nodes " + numDeadNodes + ", dead but decommissioned nodes: " + numDeadDecommNodes);
+
+        jsonNodesObject = new JSONObject(new JSONTokener(liveNodesStrings));
+        final JSONArray liveNodes = jsonNodesObject.names();
+        for (int i = 0; liveNodes != null && i < liveNodes.length(); ++i) {
+            final String hostname = liveNodes.getString(i);
+            final JSONObject liveNode = jsonNodesObject.getJSONObject(hostname);
+
+            final HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, hostname, updateTime);
+            final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY);
+            entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / 1024.0 / 1024.0 / 1024.0 / 1024.0));
+            final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE);
+            entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / 1024.0 / 1024.0 / 1024.0 / 1024.0));
+            final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS);
+            entity.setNumBlocks(Double.toString(blocksTotal.doubleValue()));
+            if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) {
+                final Number volFails = (Number) liveNode.get(DATA_NODE_FAILED_VOLUMN);
+                entity.setNumFailedVolumes(Double.toString(volFails.doubleValue()));
+            }
+            final String adminState = liveNode.getString(DATA_NODE_ADMIN_STATE);
+            if (DATA_NODE_DECOMMISSIONED.equalsIgnoreCase(adminState)) {
+                ++numLiveDecommNodes;
+                entity.setStatus(TopologyConstants.DATA_NODE_LIVE_DECOMMISSIONED_STATUS);
+            } else {
+                entity.setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+            }
+            numLiveNodes++;
+            result.getSlaveNodes().add(entity);
+        }
+        LOG.info("Live nodes " + numLiveNodes + ", live but decommissioned nodes: " + numLiveDecommNodes);
+
+        // Use the nodes counted here rather than the shared slave list, and avoid 0/0 (NaN).
+        final int totalNodes = numLiveNodes + numDeadNodes;
+        final double value = totalNodes == 0 ? 0d : numLiveNodes * 1.0d / totalNodes;
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.DATA_NODE_ROLE, value, site, updateTime));
+    }
+
+    private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) {
+        HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity();
+        entity.setTimestamp(updateTime);
+        Map<String, String> tags = new HashMap<String, String>();
+        entity.setTags(tags);
+        tags.put(SITE_TAG, site);
+        tags.put(ROLE_TAG, roleType);
+        tags.put(HOSTNAME_TAG, hostname);
+        String rack = rackResolver.resolve(hostname);
+        tags.put(RACK_TAG, rack);
+        return entity;
+    }
+
+    private String buildFSNamesystemURL(String url) {
+        return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_FS_NAME_SYSTEM_BEAN_NAME);
+    }
+
+    private String buildNamenodeInfo(String url) {
+        return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_NAMENODE_INFO);
+    }
+
+    /**
+     * Identifies the topology handled by this parser.
+     *
+     * @return always {@code TopologyType.HDFS}
+     */
+    @Override
+    public TopologyConstants.TopologyType getTopologyType() {
+        return TopologyConstants.TopologyType.HDFS;
+    }
+
+    /**
+     * Identifies the Hadoop version this parser reports.
+     *
+     * @return always {@code HadoopVersion.V2}
+     */
+    @Override
+    public TopologyConstants.HadoopVersion getHadoopVersion() {
+        return TopologyConstants.HadoopVersion.V2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
new file mode 100644
index 0000000..af6bd51
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyCrawler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyCheckMessageId;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.extractor.TopologyCrawler;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Crawler that parses the MR topology and emits the result through a Storm spout collector.
+ */
+public class MRTopologyCrawler implements TopologyCrawler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MRTopologyCrawler.class);
+
+    private final SpoutOutputCollector outputCollector;
+    private final MRTopologyEntityParser parser;
+
+    /**
+     * Creates the crawler together with its MR entity parser.
+     *
+     * @param config       application configuration; the site and the MR settings are read from it
+     * @param rackResolver resolver handed to the parser
+     * @param collector    collector that receives the parsed result
+     */
+    public MRTopologyCrawler(TopologyCheckAppConfig config, TopologyRackResolver rackResolver, SpoutOutputCollector collector) {
+        this.outputCollector = collector;
+        this.parser = new MRTopologyEntityParser(config.dataExtractorConfig.site, config.mrConfig, rackResolver);
+    }
+
+    /**
+     * Parses the topology at the current time and emits it, tagged with a message id built from the
+     * MR topology type and the same timestamp. Nothing is emitted when the parser returns no result
+     * or no master node.
+     */
+    @Override
+    public void extract() {
+        final long now = System.currentTimeMillis();
+        final TopologyEntityParserResult parsed = parser.parse(now);
+
+        final boolean hasMasterNode = parsed != null && !parsed.getMasterNodes().isEmpty();
+        if (!hasMasterNode) {
+            LOG.warn("No data fetched");
+            return;
+        }
+
+        final TopologyCheckMessageId messageId = new TopologyCheckMessageId(TopologyConstants.TopologyType.MR, now);
+        final Values tuple = new Values(TopologyConstants.MR_INSTANCE_SERVICE_NAME, parsed);
+        outputCollector.emit(tuple, messageId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
new file mode 100644
index 0000000..455b1d0
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.apache.eagle.app.utils.AppConstants;
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.app.utils.connection.InputStreamUtils;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.EntityBuilderHelper;
+import org.apache.eagle.topology.utils.ServiceNotResponseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class MRTopologyEntityParser implements TopologyEntityParser {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MRTopologyEntityParser.class);
+
+    // REST paths appended to the resource manager and history server base URLs.
+    private static final String YARN_NODES_URL = "/ws/v1/cluster/nodes?anonymous=true";
+    private static final String YARN_HISTORY_SERVER_URL = "/ws/v1/history/info";
+
+    // Shared JSON mapper, configured once below to accept non-numeric number tokens.
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    // Per-instance configuration supplied through the constructor.
+    private String[] rmUrls;
+    private String historyServerUrl;
+    private String site;
+    private TopologyRackResolver rackResolver;
+
+    /**
+     * Creates a parser for the MR services of one site.
+     *
+     * @param site         site name written into the site tag of every entity and passed to every metric
+     * @param config       MR settings; the resource manager URLs and the history server URL are read
+     * @param rackResolver resolver used to fill the rack tag of each entity
+     */
+    public MRTopologyEntityParser(String site, TopologyCheckAppConfig.MRConfig config, TopologyRackResolver rackResolver) {
+        this.rackResolver = rackResolver;
+        this.site = site;
+        this.historyServerUrl = config.historyServerUrl;
+        this.rmUrls = config.rmUrls;
+    }
+
+    /**
+     * Identifies the Hadoop version this parser reports.
+     *
+     * @return always {@code HadoopVersion.V2}
+     */
+    @Override
+    public TopologyConstants.HadoopVersion getHadoopVersion() {
+        return TopologyConstants.HadoopVersion.V2;
+    }
+
+    /**
+     * Identifies the topology handled by this parser.
+     *
+     * @return always {@code TopologyType.MR}
+     */
+    @Override
+    public TopologyConstants.TopologyType getTopologyType() {
+        return TopologyConstants.TopologyType.MR;
+    }
+
+    /**
+     * Queries every configured resource manager for its node list and checks the history server.
+     *
+     * <p>A resource manager that does not respond is logged and skipped. When no resource manager
+     * produced a master node, a resource manager metric with value 0 is added. The history server
+     * check runs in every case.
+     *
+     * @param timestamp timestamp stored on every created entity and metric
+     * @return the collected entities and metrics, never {@code null}
+     */
+    @Override
+    public TopologyEntityParserResult parse(long timestamp) {
+        final TopologyEntityParserResult result = new TopologyEntityParserResult();
+        result.setVersion(TopologyConstants.HadoopVersion.V2);
+
+        for (String url : rmUrls) {
+            try {
+                doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result);
+            } catch (ServiceNotResponseException ex) {
+                // Keep going with the next URL; pass the exception so the cause is not lost.
+                LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url, ex);
+            }
+        }
+        if (result.getMasterNodes().isEmpty()) {
+            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, 0, site, timestamp));
+        }
+        doCheckHistoryServer(timestamp, result);
+        return result;
+    }
+
+    private void doCheckHistoryServer(long updateTime, TopologyEntityParserResult result) {
+        if (historyServerUrl == null || historyServerUrl.isEmpty()) {
+            return;
+        }
+        String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
+        double liveCount = 1;
+        try {
+            InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE);
+        } catch (ConnectException e) {
+            liveCount = 0;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return;
+        }
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime));
+    }
+
+    /**
+     * Opens the given URL, translating a refused connection into a {@code ServiceNotResponseException}.
+     *
+     * @param url  URL to open
+     * @param type compression type passed through to the stream helper
+     * @return the opened stream, or {@code null} when opening failed for a reason other than a
+     *         refused connection (the failure is logged)
+     * @throws ServiceNotResponseException if the connection is refused
+     */
+    private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException {
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(url, null, type);
+        } catch (ConnectException e) {
+            throw new ServiceNotResponseException(e);
+        } catch (Exception e) {
+            // Best effort: callers receive null, but the cause goes to the log rather than stderr.
+            LOGGER.error("Fail to open stream of url: {}", url, e);
+        }
+        return is;
+    }
+
+    /**
+     * Reads the node list from one resource manager URL and adds the resulting entities and the
+     * node manager metric to {@code result}.
+     *
+     * <p>Each listed node becomes a node manager entity (slave node) with its lower-cased state and,
+     * when present, its health report. The queried resource manager is added as an active master
+     * node. The metric value is the fraction of nodes in the running state (0 for an empty list).
+     * Non-I/O failures are logged and leave {@code result} with whatever was added so far.
+     *
+     * @param url       full node list URL of one resource manager
+     * @param timestamp timestamp stored on the created entities and the metric
+     * @param result    accumulator receiving the entities and the metric
+     * @throws ServiceNotResponseException if the URL cannot be read or returns no node list
+     */
+    private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException {
+
+        InputStream is = null;
+        try {
+            LOGGER.info("Going to query URL: " + url);
+            is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE);
+            YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
+            if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) {
+                throw new ServiceNotResponseException("Invalid result of URL: " + url);
+            }
+            int runningNodeCount = 0;
+            int lostNodeCount = 0;
+            int unhealthyNodeCount = 0;
+            final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
+            for (YarnNodeInfo info : list) {
+                final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp);
+                if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) {
+                    nodeManagerEntity.setHealthReport(info.getHealthReport());
+                }
+                // TODO: Need to remove the manually mapping RUNNING -> running, LOST - > lost, UNHEALTHY -> unhealthy
+                if (info.getState() != null) {
+                    // Locale.ROOT keeps the mapping stable regardless of the JVM default locale.
+                    final String state = info.getState().toLowerCase(java.util.Locale.ROOT);
+                    nodeManagerEntity.setStatus(state);
+                    if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
+                        ++runningNodeCount;
+                    } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
+                        ++lostNodeCount;
+                    } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
+                        ++unhealthyNodeCount;
+                    }
+                }
+                result.getSlaveNodes().add(nodeManagerEntity);
+            }
+            LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", unhealthy NMs: " + unhealthyNodeCount);
+            final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp);
+            resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS);
+            result.getMasterNodes().add(resourceManagerEntity);
+            // Avoid 0/0 (NaN) when the resource manager reports an empty node list.
+            final double value = list.isEmpty() ? 0d : runningNodeCount * 1d / list.size();
+            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
+        } catch (IOException e) {
+            throw new ServiceNotResponseException(e);
+        } catch (Exception e) {
+            // Covers runtime failures as well; log with the stack trace instead of printing to stderr.
+            LOGGER.error("Fail to parse node managers from URL: {}", url, e);
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    // A failed close must not mask the parse outcome.
+                    LOGGER.warn("Fail to close stream of URL: {}", url, e);
+                }
+            }
+        }
+    }
+
+    private String extractMasterHost(String url) {
+        Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
+        if (matcher.find()) {
+            return matcher.group(1);
+        }
+        return url;
+    }
+
+    private String extractRack(YarnNodeInfo info) {
+        if (info.getRack() == null) {
+            return null;
+        }
+        String value = info.getRack();
+        value = value.substring(value.lastIndexOf('/') + 1);
+        return value;
+    }
+
+    private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) {
+        MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity();
+        entity.setLastUpdateTime(updateTime);
+        Map<String, String> tags = new HashMap<String, String>();
+        entity.setTags(tags);
+        tags.put(SITE_TAG, site);
+        tags.put(ROLE_TAG, roleType);
+        tags.put(HOSTNAME_TAG, hostname);
+        String rack = rackResolver.resolve(hostname);
+        tags.put(RACK_TAG, rack);
+        return entity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
new file mode 100644
index 0000000..9315f78
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java
@@ -0,0 +1,101 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Bean describing a single YARN node. Every value is kept as a string.
+ *
+ * <p>Per the annotations below, null properties are left out on serialization and unknown JSON
+ * properties are ignored on deserialization.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class YarnNodeInfo {
+
+    private String rack;
+    private String state;
+    private String id;
+    private String nodeHostName;
+    private String nodeHTTPAddress;
+    private String lastHealthUpdate;
+    private String healthReport;
+    private String numContainers;
+    private String usedMemoryMB;
+    private String availMemoryMB;
+
+    /** Returns the rack string of the node, or {@code null} when unset. */
+    public String getRack() {
+        return rack;
+    }
+
+    public void setRack(String rack) {
+        this.rack = rack;
+    }
+
+    /** Returns the node state, or {@code null} when unset. */
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    /** Returns the node id, or {@code null} when unset. */
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** Returns the host name of the node, or {@code null} when unset. */
+    public String getNodeHostName() {
+        return nodeHostName;
+    }
+
+    public void setNodeHostName(String nodeHostName) {
+        this.nodeHostName = nodeHostName;
+    }
+
+    /** Returns the HTTP address of the node, or {@code null} when unset. */
+    public String getNodeHTTPAddress() {
+        return nodeHTTPAddress;
+    }
+
+    public void setNodeHTTPAddress(String nodeHTTPAddress) {
+        this.nodeHTTPAddress = nodeHTTPAddress;
+    }
+
+    /** Returns the last health update value, or {@code null} when unset. */
+    public String getLastHealthUpdate() {
+        return lastHealthUpdate;
+    }
+
+    public void setLastHealthUpdate(String lastHealthUpdate) {
+        this.lastHealthUpdate = lastHealthUpdate;
+    }
+
+    /** Returns the health report text, or {@code null} when unset. */
+    public String getHealthReport() {
+        return healthReport;
+    }
+
+    public void setHealthReport(String healthReport) {
+        this.healthReport = healthReport;
+    }
+
+    /** Returns the container count as a string, or {@code null} when unset. */
+    public String getNumContainers() {
+        return numContainers;
+    }
+
+    public void setNumContainers(String numContainers) {
+        this.numContainers = numContainers;
+    }
+
+    /** Returns the used memory value as a string, or {@code null} when unset. */
+    public String getUsedMemoryMB() {
+        return usedMemoryMB;
+    }
+
+    public void setUsedMemoryMB(String usedMemoryMB) {
+        this.usedMemoryMB = usedMemoryMB;
+    }
+
+    /** Returns the available memory value as a string, or {@code null} when unset. */
+    public String getAvailMemoryMB() {
+        return availMemoryMB;
+    }
+
+    public void setAvailMemoryMB(String availMemoryMB) {
+        this.availMemoryMB = availMemoryMB;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
new file mode 100644
index 0000000..83d8d7f
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * JSON binding for an object whose {@code nodes} property holds a {@link YarnNodeInfos}.
+ *
+ * <p>Unknown JSON properties are ignored when reading and {@code null} properties are left out
+ * when writing. NOTE(review): presumably the top-level object of the YARN node listing - confirm
+ * against the caller.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class YarnNodeInfoWrapper {
+
+    /** Backing value of the {@code nodes} bean property. */
+    private YarnNodeInfos infos;
+
+    /**
+     * @return the wrapped node collection, or {@code null} when none has been set
+     */
+    public YarnNodeInfos getNodes() {
+        return this.infos;
+    }
+
+    /**
+     * @param infos node collection to wrap
+     */
+    public void setNodes(YarnNodeInfos infos) {
+        this.infos = infos;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
new file mode 100644
index 0000000..d715a1e
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+
+/**
+ * JSON binding for an object whose {@code node} property holds a list of {@link YarnNodeInfo}.
+ *
+ * <p>Unknown JSON properties are ignored when reading and {@code null} properties are left out
+ * when writing.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class YarnNodeInfos {
+
+    /** Backing value of the {@code node} bean property. */
+    private List<YarnNodeInfo> node;
+
+    /**
+     * @return the node entries, or {@code null} when none have been set
+     */
+    public List<YarnNodeInfo> getNode() {
+        return this.node;
+    }
+
+    /**
+     * @param node node entries to hold; the list is stored by reference, not copied
+     */
+    public void setNode(List<YarnNodeInfo> node) {
+        this.node = node;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
new file mode 100644
index 0000000..ab7b3cc
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver;
+
+public interface TopologyRackResolver {
+
+    /**
+     * Resolves the rack a host belongs to.
+     *
+     * @param hostname name of the host to look up
+     * @return rack name
+     */
+    String resolve(String hostname);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java
new file mode 100644
index 0000000..8f0aff8
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/DefaultTopologyRackResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver.impl;
+
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+
+/**
+ * {@link TopologyRackResolver} that answers with one fixed rack name for every host.
+ */
+public class DefaultTopologyRackResolver implements TopologyRackResolver {
+
+    private static final String DEFAULT_RACK_NAME = "default-rack";
+
+    /** Rack name handed back by every {@link #resolve(String)} call. */
+    private final String rack;
+
+    /**
+     * Creates a resolver that always answers {@code default-rack}.
+     */
+    public DefaultTopologyRackResolver() {
+        this(DEFAULT_RACK_NAME);
+    }
+
+    /**
+     * Creates a resolver that always answers the given rack name.
+     *
+     * @param rack rack name to report for every host
+     */
+    public DefaultTopologyRackResolver(String rack) {
+        this.rack = rack;
+    }
+
+    /**
+     * Returns the configured rack regardless of the host. This mirrors the case where
+     * topology.script.file.name is unset, in which the rack name for all hostnames is default-rack.
+     *
+     * @param hostname host to look up; not consulted by this implementation
+     * @return the rack name this resolver was created with
+     */
+    @Override
+    public String resolve(String hostname) {
+        return this.rack;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
new file mode 100644
index 0000000..df0f863
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver.impl;
+
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Resolves a rack name from a single byte of the host's IP address.
+ *
+ * <p>The host name is resolved to an address and the unsigned value of the address byte at the
+ * configured position is appended to the prefix {@code rack}; for example {@code 10.1.2.3} with
+ * position 2 yields {@code rack2}.
+ */
+public class IPMaskTopologyRackResolver implements TopologyRackResolver {
+
+    // Fully qualified because this source file does not import slf4j.
+    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(IPMaskTopologyRackResolver.class);
+
+    /** Address byte used when no position, or an out-of-range one, is supplied. */
+    private static final int DEFAULT_RACK_POS = 2;
+    /** Highest accepted byte index. */
+    private static final int MAX_RACK_POS = 3;
+
+    private final int rackPos;
+
+    /**
+     * Creates a resolver that reads the address byte at the default position.
+     */
+    public IPMaskTopologyRackResolver() {
+        this(DEFAULT_RACK_POS);
+    }
+
+    /**
+     * Creates a resolver that reads the address byte at the given position.
+     *
+     * @param rackPos zero-based index of the address byte to use; values outside {@code 0..3}
+     *                are replaced by the default position
+     */
+    public IPMaskTopologyRackResolver(int rackPos) {
+        this.rackPos = (rackPos < 0 || rackPos > MAX_RACK_POS) ? DEFAULT_RACK_POS : rackPos;
+    }
+
+    /**
+     * Resolves the rack of the given host from its IP address.
+     *
+     * @param hostname host name or textual IP address to look up
+     * @return {@code rack} followed by the unsigned value of the selected address byte, or
+     *         {@code null} when the host cannot be resolved to an address
+     */
+    @Override
+    public String resolve(String hostname) {
+        try {
+            byte[] rawAddress = InetAddress.getByName(hostname).getAddress();
+            // Mask to read the signed byte as an unsigned 0..255 value.
+            return "rack" + (rawAddress[rackPos] & 0xff);
+        } catch (UnknownHostException e) {
+            // Callers have always received null for unresolvable hosts; keep that contract but make it visible.
+            LOG.warn("Cannot resolve host {} to an IP address, no rack assigned", hostname, e);
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java
new file mode 100644
index 0000000..f2f3d41
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyCheckAppSpout.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.Map;
+
+/**
+ * Storm spout that triggers a topology crawl at most once per configured fetch interval.
+ *
+ * <p>The spout does not emit tuples itself; it hands its collector to a
+ * {@link TopologyDataExtractor} created in {@link #open} and calls its {@code crawl()} method.
+ */
+public class TopologyCheckAppSpout extends BaseRichSpout {
+
+    private TopologyDataExtractor extractor;
+    private TopologyCheckAppConfig topologyCheckAppConfig;
+
+    /** Wall-clock time (epoch millis) at which the last crawl was started; 0 before the first crawl. */
+    private long lastFetchTime;
+    /** Minimum gap between two crawls, in the unit produced by multiplying seconds with {@code DateTimeUtil.ONESECOND}. */
+    private long fetchInterval;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckAppSpout.class);
+
+    /**
+     * @param topologyCheckAppConfig application configuration; its data extractor section supplies the fetch interval
+     */
+    public TopologyCheckAppSpout(TopologyCheckAppConfig topologyCheckAppConfig) {
+        this.topologyCheckAppConfig = topologyCheckAppConfig;
+        this.lastFetchTime = 0;
+        this.fetchInterval = topologyCheckAppConfig.dataExtractorConfig.fetchDataIntervalInSecs * DateTimeUtil.ONESECOND;
+    }
+
+    /**
+     * Declares the two output fields: service name and topology data.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(TopologyConstants.SERVICE_NAME_FIELD, TopologyConstants.TOPOLOGY_DATA_FIELD));
+    }
+
+    /**
+     * Creates the extractor bound to this spout's collector.
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.extractor = new TopologyDataExtractor(topologyCheckAppConfig, collector);
+    }
+
+    /**
+     * Runs a crawl when more than the fetch interval has passed since the previous one;
+     * otherwise returns immediately.
+     */
+    @Override
+    public void nextTuple() {
+        long currentTime = System.currentTimeMillis();
+        if (currentTime <= lastFetchTime + fetchInterval) {
+            // Not due yet; this method is invoked repeatedly, so avoid any allocation on this path.
+            return;
+        }
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(this.lastFetchTime);
+        LOG.info("Last fetch time = {}", calendar.getTime());
+        this.extractor.crawl();
+        lastFetchTime = currentTime;
+    }
+
+    /**
+     * Logs the id of a failed tuple; no replay is attempted.
+     */
+    @Override
+    public void fail(Object msgId) {
+        // Pass the object itself so that a null id is logged instead of throwing.
+        LOG.warn("fail {}", msgId);
+    }
+
+    /**
+     * Logs the id of an acknowledged tuple.
+     */
+    @Override
+    public void ack(Object msgId) {
+        LOG.info("ack {}", msgId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
new file mode 100644
index 0000000..32eae9b
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
@@ -0,0 +1,109 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.extractor.TopologyCrawler;
+import org.apache.eagle.topology.extractor.TopologyExtractorFactory;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+/**
+ * Runs one {@link TopologyCrawler} per configured topology type on a fixed-size thread pool and
+ * waits for each of them to finish.
+ */
+public class TopologyDataExtractor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TopologyDataExtractor.class);
+    /** Lower bound for the per-fetch wait time, in seconds. */
+    private static final int MIN_WAIT_TIME_SECS = 60;
+    /** Fraction of the fetch interval a single fetch is allowed to take. */
+    private static final double FETCH_TIMEOUT_FACTOR = 0.8;
+
+    private final TopologyCheckAppConfig config;
+    private final List<TopologyCrawler> extractors;
+    private final ExecutorService executorService;
+
+    /**
+     * @param topologyCheckAppConfig application configuration; supplies topology types, rack resolver class,
+     *                               fetch interval and thread pool size
+     * @param collector              spout collector handed to every crawler that is created
+     */
+    public TopologyDataExtractor(TopologyCheckAppConfig topologyCheckAppConfig, SpoutOutputCollector collector) {
+        this.config = topologyCheckAppConfig;
+        extractors = getExtractors(collector);
+        executorService = Executors.newFixedThreadPool(topologyCheckAppConfig.dataExtractorConfig.parseThreadPoolSize);
+    }
+
+    /**
+     * Submits every crawler to the pool and waits for each one in turn.
+     *
+     * <p>Each wait is bounded by 80% of the fetch interval, but never less than 60 seconds. A fetch
+     * that exceeds the bound is cancelled; a failed fetch is logged and does not affect the others.
+     * If the calling thread is interrupted, waiting stops and the interrupt flag is restored.
+     */
+    public void crawl() {
+        List<Future<?>> futures = new ArrayList<>(extractors.size());
+        for (TopologyCrawler topologyExtractor : extractors) {
+            futures.add(executorService.submit(new DataFetchRunnableWrapper(topologyExtractor)));
+        }
+        long fetchTimeoutSecs = (long) Math.max(config.dataExtractorConfig.fetchDataIntervalInSecs * FETCH_TIMEOUT_FACTOR, MIN_WAIT_TIME_SECS);
+        for (Future<?> future : futures) {
+            try {
+                future.get(fetchTimeoutSecs, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Interrupted while waiting for topology data fetch", e);
+                // Restore the flag for the caller and stop waiting; further get() calls would fail the same way.
+                Thread.currentThread().interrupt();
+                return;
+            } catch (ExecutionException e) {
+                LOGGER.error("Topology data fetch failed", e);
+            } catch (TimeoutException e) {
+                LOGGER.warn("Caught an overtime exception: topology data fetch did not finish within {} seconds", fetchTimeoutSecs, e);
+                // Do not let an overdue fetch keep occupying a pool thread into the next crawl.
+                future.cancel(true);
+            }
+        }
+    }
+
+    /**
+     * Builds one crawler per configured topology type.
+     *
+     * <p>The rack resolver is instantiated from the configured class when one is set; if that fails,
+     * the default resolver is used. A crawler that cannot be created is logged and skipped.
+     *
+     * @param collector spout collector passed to every crawler
+     * @return the crawlers that were created successfully; possibly empty
+     */
+    private List<TopologyCrawler> getExtractors(SpoutOutputCollector collector) {
+        List<TopologyCrawler> extractors = new ArrayList<>();
+        TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
+        if (config.dataExtractorConfig.resolverCls != null) {
+            try {
+                rackResolver = config.dataExtractorConfig.resolverCls.newInstance();
+            } catch (InstantiationException | IllegalAccessException e) {
+                LOGGER.error("Cannot instantiate rack resolver {}, falling back to the default resolver",
+                        config.dataExtractorConfig.resolverCls, e);
+            }
+        }
+        for (TopologyType type : config.topologyTypes) {
+            try {
+                extractors.add(TopologyExtractorFactory.create(type, config, rackResolver, collector));
+            } catch (Exception e) {
+                LOGGER.error("Cannot create topology crawler for type {}, skipping it", type, e);
+            }
+        }
+        return extractors;
+    }
+
+    /**
+     * Adapts a {@link TopologyCrawler} to {@link Runnable} so it can be submitted to the pool.
+     */
+    private static class DataFetchRunnableWrapper implements Runnable {
+
+        private final TopologyCrawler topologyExtractor;
+
+        public DataFetchRunnableWrapper(TopologyCrawler topologyExtractor) {
+            this.topologyExtractor = topologyExtractor;
+        }
+
+        @Override
+        public void run() {
+            topologyExtractor.extract();
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
new file mode 100644
index 0000000..490f427
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -0,0 +1,142 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Storm bolt that synchronises the topology entities stored in the Eagle service with the
+ * entities reported in each incoming tuple.
+ *
+ * <p>For every tuple it writes the reported master and slave node entities and the metrics, and
+ * deletes stored entities of the same service and site whose key (site, rack, hostname, role) is
+ * not among the reported ones.
+ */
+public class TopologyDataPersistBolt extends BaseRichBolt {
+
+    private TopologyCheckAppConfig config;
+    private IEagleServiceClient client;
+    private OutputCollector collector;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyDataPersistBolt.class);
+
+    /**
+     * @param config application configuration; supplies the service connection settings and the site name
+     */
+    public TopologyDataPersistBolt(TopologyCheckAppConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Creates the Eagle service client and remembers the collector used for ack/fail.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.config));
+        this.collector = collector;
+    }
+
+    /**
+     * Persists the topology data carried by the tuple.
+     *
+     * <p>The tuple is acked once the query, delete and write steps have run; failures inside the
+     * delete and write steps are logged there and do not fail the tuple. Any other exception fails
+     * the tuple. A tuple without a parser result is acked and dropped.
+     *
+     * @param input tuple with the service name and the parser result; {@code null} is ignored
+     */
+    @Override
+    public void execute(Tuple input) {
+        if (input == null) {
+            return;
+        }
+        String serviceName = input.getStringByField(TopologyConstants.SERVICE_NAME_FIELD);
+        TopologyEntityParserResult result = (TopologyEntityParserResult) input.getValueByField(TopologyConstants.TOPOLOGY_DATA_FIELD);
+        if (result == null) {
+            // Nothing to persist and a retry cannot help, so drop the tuple instead of crashing on it.
+            LOG.warn("Received tuple without topology data for {}, dropping it", serviceName);
+            this.collector.ack(input);
+            return;
+        }
+        Set<String> availableHostnames = new HashSet<String>();
+        List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>();
+        List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>();
+
+        try {
+            filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes());
+            filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes());
+
+            String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site);
+            GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send();
+            if (response.isSuccess() && response.getObj() != null) {
+                for (TopologyBaseAPIEntity entity : response.getObj()) {
+                    // Stored entity is no longer reported by the crawler: schedule it for removal.
+                    if (!availableHostnames.contains(generateKey(entity))) {
+                        entitiesForDeletion.add(entity);
+                    }
+                }
+            }
+            deleteEntities(entitiesForDeletion, serviceName);
+            writeEntities(entitiesToWrite, serviceName);
+            writeEntities(result.getMetrics(), serviceName);
+            this.collector.ack(input);
+        } catch (Exception e) {
+            LOG.error("Failed to persist topology data for {}", serviceName, e);
+            this.collector.fail(input);
+        }
+    }
+
+    /**
+     * Appends the given entities to the write list and records their keys as currently available.
+     *
+     * @param entitiesToWrite    list that collects entities to be written
+     * @param availableHostnames set that collects the keys of the reported entities
+     * @param entities           reported entities; {@code null} is treated as empty
+     */
+    private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) {
+        if (entities == null) {
+            return;
+        }
+        for (TopologyBaseAPIEntity entity : entities) {
+            availableHostnames.add(generateKey(entity));
+            entitiesToWrite.add(entity);
+        }
+    }
+
+    /**
+     * This bolt emits nothing, so no output fields are declared.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    /**
+     * Deletes the given entities through the Eagle service and clears the list afterwards.
+     * Failures are logged and not propagated. No request is sent for an empty list.
+     *
+     * @param entities    entities to delete; emptied by this call
+     * @param serviceName service name, used for logging only
+     */
+    private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
+        if (entities == null || entities.isEmpty()) {
+            return;
+        }
+        try {
+            GenericServiceAPIResponseEntity<?> response = client.delete(entities);
+            if (!response.isSuccess()) {
+                LOG.error("Got exception from eagle service: " + response.getException());
+            } else {
+                LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
+            }
+        } catch (EagleServiceClientException e) {
+            LOG.error("cannot delete entities successfully", e);
+        } catch (IOException e) {
+            LOG.error("cannot delete entities successfully", e);
+        }
+        entities.clear();
+    }
+
+    /**
+     * Creates the given entities through the Eagle service and clears the list afterwards.
+     * Failures are logged and not propagated. No request is sent for an empty list.
+     *
+     * @param entities    entities to write; emptied by this call
+     * @param serviceName service name, used for logging only
+     */
+    private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) {
+        if (entities == null || entities.isEmpty()) {
+            return;
+        }
+        try {
+            GenericServiceAPIResponseEntity<?> response = client.create(entities);
+            if (!response.isSuccess()) {
+                LOG.error("Got exception from eagle service: " + response.getException());
+            } else {
+                LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName);
+            }
+        } catch (Exception e) {
+            LOG.error("cannot create entities successfully", e);
+        }
+        entities.clear();
+    }
+
+    /**
+     * Builds the identity key of an entity from its site, rack, hostname and role tags.
+     *
+     * @param entity entity whose tags are read
+     * @return the four tag values joined with {@code -}
+     */
+    private String generateKey(TopologyBaseAPIEntity entity) {
+        return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
+                entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
+                entity.getTags().get(TopologyConstants.ROLE_TAG));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
new file mode 100644
index 0000000..55c6183
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+/**
+ * Static helpers for host name lookup and for building {@link GenericMetricEntity} instances.
+ */
+public class EntityBuilderHelper {
+
+    // Fully qualified because this source file does not import slf4j.
+    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(EntityBuilderHelper.class);
+
+    /**
+     * Looks up the host name for an IP address.
+     *
+     * @param ip textual IP address (or host name) to look up
+     * @return the host name reported for the address, or {@code ip} itself when the lookup fails
+     */
+    public static String resolveHostByIp(String ip) {
+        try {
+            return InetAddress.getByName(ip).getHostName();
+        } catch (UnknownHostException e) {
+            // Previously this path dereferenced a null address; fall back to the input instead.
+            LOG.warn("Cannot resolve host name for {}, using the given address as-is", ip, e);
+            return ip;
+        }
+    }
+
+    /**
+     * Wraps a single value into a metric entity.
+     *
+     * @param timestamp  metric timestamp; must not be {@code null}
+     * @param metricName name stored as the entity prefix
+     * @param value      metric value, stored as a one-element array
+     * @param tags       tags attached to the entity
+     * @return the populated metric entity
+     */
+    public static GenericMetricEntity metricWrapper(Long timestamp, String metricName, double value, Map<String, String> tags) {
+        GenericMetricEntity metricEntity = new GenericMetricEntity();
+        metricEntity.setTimestamp(timestamp);
+        metricEntity.setTags(tags);
+        metricEntity.setPrefix(metricName);
+        metricEntity.setValue(new double[]{value});
+        return metricEntity;
+    }
+
+    /**
+     * Builds the live-ratio metric of a role, tagged with site and role.
+     *
+     * @param role      role the metric belongs to; also inserted into the metric name format
+     * @param value     metric value
+     * @param site      site tag value
+     * @param timestamp metric timestamp
+     * @return the populated metric entity
+     */
+    public static GenericMetricEntity generateMetric(String role, double value, String site, long timestamp) {
+        Map<String, String> tags = new HashMap<>();
+        tags.put(TopologyConstants.SITE_TAG, site);
+        tags.put(TopologyConstants.ROLE_TAG, role);
+        String metricName = String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, role);
+        return EntityBuilderHelper.metricWrapper(timestamp, metricName, value, tags);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java
new file mode 100644
index 0000000..d9ac3e0
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXBean.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import java.util.Map;
+
+/**
+ * Holder for the properties of one JMX bean, keyed by property name.
+ */
+public class JMXBean {
+
+    /** Property values by name; {@code null} until a map has been set. */
+    private Map<String, Object> propertyMap;
+
+    /**
+     * @return the property map as last set; the map is returned by reference, not copied
+     */
+    public Map<String, Object> getPropertyMap() {
+        return this.propertyMap;
+    }
+
+    /**
+     * @param propertyMap property map to hold; stored by reference, not copied
+     */
+    public void setPropertyMap(Map<String, Object> propertyMap) {
+        this.propertyMap = propertyMap;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
new file mode 100644
index 0000000..fa4c71f
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import org.apache.eagle.app.utils.connection.URLConnectionUtils;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URLConnection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Helper class to query Hadoop JMX servlets
+ */
+public final class JMXQueryHelper {
+
+    private static final int DEFAULT_QUERY_TIMEOUT = 30 * 60 * 1000;
+    private static final Logger LOG = LoggerFactory.getLogger(JMXQueryHelper.class);
+
+    /** Queries a JMX url and returns its beans keyed by "name", or null when the query fails (logged). */
+    public static Map<String, JMXBean> query(String jmxQueryUrl) throws JSONException, IOException {
+        LOG.info("Going to query JMX url: " + jmxQueryUrl);
+        InputStream is = null;
+        try {
+            final URLConnection connection = URLConnectionUtils.getConnection(jmxQueryUrl);
+            connection.setReadTimeout(DEFAULT_QUERY_TIMEOUT);
+            is = connection.getInputStream();
+            return parseStream(is);
+        } catch (Exception e) {
+            // Best effort: callers still get null, but the failure goes to the logger instead of stderr.
+            LOG.error("Failed to query JMX url: " + jmxQueryUrl, e);
+            return null;
+        } finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+    }
+
+    /** Parses the "beans" array of a JMX JSON document into a map keyed by each bean's "name" value. */
+    public static Map<String, JMXBean> parseStream(InputStream is) {
+        final Map<String, JMXBean> resultMap = new HashMap<String, JMXBean>();
+        final JSONArray jsonArray = new JSONObject(new JSONTokener(is)).getJSONArray("beans");
+        for (int i = 0; i < jsonArray.length(); ++i) {
+            final JSONObject obj = (JSONObject) jsonArray.get(i);
+            final Map<String, Object> map = new HashMap<String, Object>();
+            final JMXBean bean = new JMXBean();
+            bean.setPropertyMap(map);
+            // names() returns null for an object without keys; treat that as a bean with no properties.
+            final JSONArray names = obj.names();
+            final int jsonSize = names == null ? 0 : names.length();
+            for (int j = 0; j < jsonSize; ++j) {
+                final String key = names.getString(j);
+                map.put(key, obj.get(key));
+            }
+            final String nameString = (String) map.get("name");
+            resultMap.put(nameString, bean);
+        }
+        return resultMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
new file mode 100644
index 0000000..48c9133
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.utils;
+
+import java.io.IOException;
+
+public class ServiceNotResponseException extends IOException {
+
+    private static final long serialVersionUID = -2425311876734366496L;
+
+    /**
+     * Creates a ServiceNotResponseException with no detail message and no cause.
+     */
+    public ServiceNotResponseException() {
+        super();
+    }
+
+    /**
+     * Creates a ServiceNotResponseException with the given detail message.
+     *
+     * @param message error message
+     */
+    public ServiceNotResponseException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a ServiceNotResponseException with the given detail message and cause.
+     *
+     * @param message error message
+     * @param cause the cause of the exception
+     *
+     */
+    public ServiceNotResponseException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates a ServiceNotResponseException that wraps the given cause.
+     *
+     * @param cause the cause of the exception
+     */
+    public ServiceNotResponseException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
new file mode 100644
index 0000000..0a8f1d1
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<application>
+    <type>TOPOLOGY_HEALTH_CHECK_APP</type>
+    <name>Topology Health Check</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.topology.TopologyCheckApp</appClass>
+    <viewPath>/apps/jpm</viewPath>
+    <configuration>
+        <!-- org.apache.eagle.topology.TopologyCheckApp -->
+        <property>
+            <name>dataExtractorConfig.site</name>
+            <displayName>site</displayName>
+            <description>Site</description>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>dataExtractorConfig.fetchDataIntervalInSecs</name>
+            <displayName>FetchDataIntervalInSecs</displayName>
+            <description>Fetch Data Interval in Secs</description>
+            <value>300</value>
+        </property>
+        <property>
+            <name>dataExtractorConfig.parseThreadPoolSize</name>
+            <displayName>parseThreadPoolSize</displayName>
+            <description>Parser Thread Pool Size</description>
+            <value>5</value>
+        </property>
+        <property>
+            <name>dataExtractorConfig.numDataFetcherSpout</name>
+            <displayName>numDataFetcherSpout</displayName>
+            <description>Spout Task Number</description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>dataExtractorConfig.numEntityPersistBolt</name>
+            <displayName>numEntityPersistBolt</displayName>
+            <description>Bolt Task Number</description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hbase.zkQuorum</name>
+            <displayName>zkQuorum</displayName>
+            <description>Zookeeper Quorum</description>
+            <value>sandbox.hortonworks.com:2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hbase.zkZnodeParent</name>
+            <displayName>zkZnodeParent</displayName>
+            <description>Hbase Zookeeper Znode Parent Root</description>
+            <value>/hbase-unsecure</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hbase.zkPropertyClientPort</name>
+            <displayName>zkPropertyClientPort</displayName>
+            <description>Hbase Zookeeper Client Port</description>
+            <value>2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hbase.kerberos.master.principal</name>
+            <displayName>hbaseMasterPrincipal</displayName>
+            <description>Hbase Master Principal</description>
+            <value>hadoop/_HOST@EXAMPLE.COM</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hbase.kerberos.eagle.keytab</name>
+            <displayName>eagleKeytab</displayName>
+            <description>Eagle keytab</description>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hbase.kerberos.eagle.principal</name>
+            <displayName>eaglePrincipal</displayName>
+            <description>Eagle Kerberos Principal (leave empty if not needed)</description>
+            <value></value>
+        </property>
+
+        <property>
+            <name>dataSourceConfig.hdfs.namenodeUrl</name>
+            <displayName>hdfsNamenodeUrl</displayName>
+            <description>Hdfs Namenode Web URL</description>
+            <value>http://sandbox.hortonworks.com:50070</value>
+        </property>
+
+        <property>
+            <name>dataSourceConfig.mr.rmUrl</name>
+            <displayName>resourceManagerUrl</displayName>
+            <description>Resource Manager URL</description>
+            <value>http://sandbox.hortonworks.com:8088</value>
+        </property>
+
+        <property>
+            <name>dataSourceConfig.mr.historyServerUrl</name>
+            <displayName>historyServerUrl</displayName>
+            <description>History Server URL</description>
+            <value></value>
+        </property>
+
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <description>eagleProps.eagleService.host</description>
+            <value>localhost</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <description>eagleProps.eagleService.port</description>
+            <value>9090</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <description>eagleProps.eagleService.username</description>
+            <value>admin</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <description>eagleProps.eagleService.password</description>
+            <value>secret</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.basePath</name>
+            <description>eagleProps.eagleService.basePath</description>
+            <value>/rest</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.readTimeOutSeconds</name>
+            <displayName>eagleProps.eagleService.readTimeOutSeconds</displayName>
+            <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description>
+            <value>2</value>
+        </property>
+
+    </configuration>
+    <docs>
+        <install>
+        </install>
+        <uninstall>
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..4e08313
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+#
+
+org.apache.eagle.topology.TopologyCheckAppProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
new file mode 100644
index 0000000..cbc7ac1
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  appId : "topologyCheckApp",
+  mode : "LOCAL",
+  workers : 1,
+
+  dataExtractorConfig : {
+    "site": "sandbox",
+    "fetchDataIntervalInSecs": 300,
+    "parseThreadPoolSize": 5,
+    "numDataFetcherSpout" : 1,
+    "numEntityPersistBolt" : 1,
+    "rackResolverCls" : "org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver"
+  }
+
+  dataSourceConfig : {
+    hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070",
+    hbase: {
+      zkQuorum: "sandbox.hortonworks.com",
+      zkPropertyClientPort : "2181",
+      zkZnodeParent: "/hbase-unsecure",
+      zkRetryTimes : "5",
+      kerberos : {
+        master.principal : "hadoop/_HOST@EXAMPLE.COM"
+        eagle.principal: "", # leave empty if not needed
+        eagle.keytab: ""
+      }
+    },
+    mr: {
+      rmUrl: "http://sandbox.hortonworks.com:8088",
+      historyServerUrl : "http://sandbox.hortonworks.com:19888"  # leave empty if not needed
+    }
+  }
+
+  eagleProps : {
+    "mailHost" : "abc.com",
+    "mailDebug" : "true",
+    eagleService.host:"localhost",
+    eagleService.port: 9090,
+    eagleService.username: "admin",
+    eagleService.password : "secret",
+    eagleService.basePath : "/rest",
+    eagleService.readTimeOutSeconds : 20,
+    eagleService.maxFlushNum : 500
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties b/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
new file mode 100644
index 0000000..6956ef1
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHbaseTopologyCrawler.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.topology.extractor.hbase.HbaseTopologyCrawler;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestHbaseTopologyCrawler {
+    // NOTE(review): this test is ignored, presumably because extract() needs a reachable HBase cluster - confirm.
+    @Test @Ignore
+    public void test() {
+        Config config = ConfigFactory.load();
+        // Build the crawler from the loaded config and the default rack resolver; the third argument is passed as null.
+        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+        TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
+        HbaseTopologyCrawler crawler = new HbaseTopologyCrawler(topologyCheckAppConfig, rackResolver, null);
+        crawler.extract();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9e873770/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
new file mode 100644
index 0000000..5069a3b
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestHdfsTopologyCrawler.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.topology.extractor.hdfs.HdfsTopologyCrawler;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestHdfsTopologyCrawler {
+    // NOTE(review): this test is ignored, presumably because extract() needs a reachable HDFS namenode - confirm.
+    @Test @Ignore
+    public void test() {
+        Config config = ConfigFactory.load();
+        // Build the crawler from the loaded config and the default rack resolver; the third argument is passed as null.
+        TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.getInstance(config);
+        TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
+        HdfsTopologyCrawler crawler = new HdfsTopologyCrawler(topologyCheckAppConfig, rackResolver, null);
+        crawler.extract();
+    }
+}



Mime
View raw message