eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/5] incubator-eagle git commit: [EAGLE-806] Integrate Metric Process and Persistence with Application Framework
Date Wed, 30 Nov 2016 01:38:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
index f3eac23..94b3727 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hbase/HbaseTopologyEntityParser.java
@@ -1,165 +1,165 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.topology.extractor.hbase;
-
-import org.apache.eagle.app.utils.HadoopSecurityUtil;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
-import org.apache.eagle.topology.extractor.TopologyEntityParser;
-import org.apache.eagle.topology.resolver.TopologyRackResolver;
-import org.apache.eagle.topology.utils.EntityBuilderHelper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.eagle.topology.TopologyConstants.*;
-
-public class HbaseTopologyEntityParser implements TopologyEntityParser {
-
-    private Configuration hBaseConfiguration;
-    private String site;
-    private Boolean kerberosEnable = false;
-    private TopologyRackResolver rackResolver;
-
-    public HbaseTopologyEntityParser(String site, TopologyCheckAppConfig.HBaseConfig hBaseConfig, TopologyRackResolver resolver) {
-        this.site = site;
-        this.rackResolver = resolver;
-        this.hBaseConfiguration = HBaseConfiguration.create();
-        this.hBaseConfiguration.set("hbase.zookeeper.quorum", hBaseConfig.zkQuorum);
-        this.hBaseConfiguration.set("hbase.zookeeper.property.clientPort", hBaseConfig.zkClientPort);
-        this.hBaseConfiguration.set("zookeeper.znode.parent", hBaseConfig.zkRoot);
-        this.hBaseConfiguration.set("hbase.client.retries.number", hBaseConfig.zkRetryTimes);
-        // kerberos authentication
-        if (hBaseConfig.eaglePrincipal != null && hBaseConfig.eagleKeytab != null
-            && !hBaseConfig.eaglePrincipal.isEmpty() && !hBaseConfig.eagleKeytab.isEmpty()) {
-            this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_PRINCIPAL_KEY, hBaseConfig.eaglePrincipal);
-            this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_KEYTAB_FILE_KEY, hBaseConfig.eagleKeytab);
-            this.kerberosEnable = true;
-            this.hBaseConfiguration.set("hbase.security.authentication", "kerberos");
-            this.hBaseConfiguration.set("hbase.master.kerberos.principal", hBaseConfig.hbaseMasterPrincipal);
-        }
-    }
-
-    private HBaseAdmin getHBaseAdmin() throws IOException {
-        if (this.kerberosEnable) {
-            HadoopSecurityUtil.login(hBaseConfiguration);
-        }
-        return new HBaseAdmin(this.hBaseConfiguration);
-    }
-
-    @Override
-    public TopologyEntityParserResult parse(long timestamp) throws IOException {
-        long deadServers = 0;
-        long liveServers = 0;
-        TopologyEntityParserResult result = new TopologyEntityParserResult();
-        HBaseAdmin admin = null;
-        try {
-            admin = getHBaseAdmin();
-            ClusterStatus status = admin.getClusterStatus();
-            deadServers = status.getDeadServers();
-            liveServers = status.getServersSize();
-            result.setVersion(HadoopVersion.V2);
-            for (ServerName liveServer : status.getServers()) {
-                ServerLoad load = status.getLoad(liveServer);
-                result.getSlaveNodes().add(parseServer(liveServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_LIVE_STATUS, timestamp));
-            }
-            for (ServerName deadServer : status.getDeadServerNames()) {
-                ServerLoad load = status.getLoad(deadServer);
-                result.getSlaveNodes().add(parseServer(deadServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_DEAD_STATUS, timestamp));
-            }
-            ServerName master = status.getMaster();
-            if (master != null) {
-                ServerLoad load = status.getLoad(master);
-                result.getMasterNodes().add(parseServer(master, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_ACTIVE_STATUS, timestamp));
-            }
-            for (ServerName backupMaster : status.getBackupMasters()) {
-                ServerLoad load = status.getLoad(backupMaster);
-                result.getMasterNodes().add(parseServer(backupMaster, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_STANDBY_STATUS, timestamp));
-            }
-            double liveRatio = liveServers * 1d / (liveServers + deadServers);
-            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.REGIONSERVER_ROLE, liveRatio, site, timestamp));
-            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 1d, site, timestamp));
-            return result;
-        } catch (RuntimeException e) {
-            e.printStackTrace();
-        } finally {
-            if (admin != null) {
-                try {
-                    admin.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-        return result;
-    }
-
-    private HBaseServiceTopologyAPIEntity parseServer(ServerName serverName, ServerLoad serverLoad, String role, String status, long timestamp) {
-        if (serverName == null) {
-            return null;
-        }
-        HBaseServiceTopologyAPIEntity entity = createEntity(role, serverName.getHostname(), timestamp);
-        parseServerLoad(entity, serverLoad);
-        entity.setStatus(status);
-        return entity;
-    }
-
-    private void parseServerLoad(HBaseServiceTopologyAPIEntity entity, ServerLoad load) {
-        if (load == null) {
-            return;
-        }
-        entity.setMaxHeapMB(load.getMaxHeapMB());
-        entity.setUsedHeapMB(load.getUsedHeapMB());
-        entity.setNumRegions(load.getNumberOfRegions());
-        entity.setNumRequests(load.getNumberOfRequests());
-    }
-
-    private HBaseServiceTopologyAPIEntity createEntity(String roleType, String hostname, long timestamp) {
-        HBaseServiceTopologyAPIEntity entity = new HBaseServiceTopologyAPIEntity();
-        Map<String, String> tags = new HashMap<String, String>();
-        entity.setTags(tags);
-        tags.put(SITE_TAG, site);
-        tags.put(ROLE_TAG, roleType);
-        tags.put(HOSTNAME_TAG, hostname);
-        String rack = rackResolver.resolve(hostname);
-        tags.put(RACK_TAG, rack);
-        entity.setLastUpdateTime(timestamp);
-        entity.setTimestamp(timestamp);
-        return entity;
-    }
-
-    @Override
-    public TopologyConstants.TopologyType getTopologyType() {
-        return TopologyType.HBASE;
-    }
-
-    @Override
-    public TopologyConstants.HadoopVersion getHadoopVersion() {
-        return HadoopVersion.V2;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.hbase;
+
+import org.apache.eagle.app.utils.HadoopSecurityUtil;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.EntityBuilderHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class HbaseTopologyEntityParser implements TopologyEntityParser {
+
+    private Configuration hBaseConfiguration;
+    private String site;
+    private Boolean kerberosEnable = false;
+    private TopologyRackResolver rackResolver;
+
+    public HbaseTopologyEntityParser(String site, TopologyCheckAppConfig.HBaseConfig hBaseConfig, TopologyRackResolver resolver) {
+        this.site = site;
+        this.rackResolver = resolver;
+        this.hBaseConfiguration = HBaseConfiguration.create();
+        this.hBaseConfiguration.set("hbase.zookeeper.quorum", hBaseConfig.zkQuorum);
+        this.hBaseConfiguration.set("hbase.zookeeper.property.clientPort", hBaseConfig.zkClientPort);
+        this.hBaseConfiguration.set("zookeeper.znode.parent", hBaseConfig.zkRoot);
+        this.hBaseConfiguration.set("hbase.client.retries.number", hBaseConfig.zkRetryTimes);
+        // kerberos authentication
+        if (hBaseConfig.eaglePrincipal != null && hBaseConfig.eagleKeytab != null
+            && !hBaseConfig.eaglePrincipal.isEmpty() && !hBaseConfig.eagleKeytab.isEmpty()) {
+            this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_PRINCIPAL_KEY, hBaseConfig.eaglePrincipal);
+            this.hBaseConfiguration.set(HadoopSecurityUtil.EAGLE_KEYTAB_FILE_KEY, hBaseConfig.eagleKeytab);
+            this.kerberosEnable = true;
+            this.hBaseConfiguration.set("hbase.security.authentication", "kerberos");
+            this.hBaseConfiguration.set("hbase.master.kerberos.principal", hBaseConfig.hbaseMasterPrincipal);
+        }
+    }
+
+    private HBaseAdmin getHBaseAdmin() throws IOException {
+        if (this.kerberosEnable) {
+            HadoopSecurityUtil.login(hBaseConfiguration);
+        }
+        return new HBaseAdmin(this.hBaseConfiguration);
+    }
+
+    @Override
+    public TopologyEntityParserResult parse(long timestamp) throws IOException {
+        long deadServers = 0;
+        long liveServers = 0;
+        TopologyEntityParserResult result = new TopologyEntityParserResult();
+        HBaseAdmin admin = null;
+        try {
+            admin = getHBaseAdmin();
+            ClusterStatus status = admin.getClusterStatus();
+            deadServers = status.getDeadServers();
+            liveServers = status.getServersSize();
+            result.setVersion(HadoopVersion.V2);
+            for (ServerName liveServer : status.getServers()) {
+                ServerLoad load = status.getLoad(liveServer);
+                result.getSlaveNodes().add(parseServer(liveServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_LIVE_STATUS, timestamp));
+            }
+            for (ServerName deadServer : status.getDeadServerNames()) {
+                ServerLoad load = status.getLoad(deadServer);
+                result.getSlaveNodes().add(parseServer(deadServer, load, TopologyConstants.REGIONSERVER_ROLE, TopologyConstants.REGIONSERVER_DEAD_STATUS, timestamp));
+            }
+            ServerName master = status.getMaster();
+            if (master != null) {
+                ServerLoad load = status.getLoad(master);
+                result.getMasterNodes().add(parseServer(master, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_ACTIVE_STATUS, timestamp));
+            }
+            for (ServerName backupMaster : status.getBackupMasters()) {
+                ServerLoad load = status.getLoad(backupMaster);
+                result.getMasterNodes().add(parseServer(backupMaster, load, TopologyConstants.HMASTER_ROLE, TopologyConstants.HMASTER_STANDBY_STATUS, timestamp));
+            }
+            double liveRatio = liveServers * 1d / (liveServers + deadServers);
+            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.REGIONSERVER_ROLE, liveRatio, site, timestamp));
+            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HMASTER_ROLE, 1d, site, timestamp));
+            return result;
+        } catch (RuntimeException e) {
+            e.printStackTrace();
+        } finally {
+            if (admin != null) {
+                try {
+                    admin.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        return result;
+    }
+
+    private HBaseServiceTopologyAPIEntity parseServer(ServerName serverName, ServerLoad serverLoad, String role, String status, long timestamp) {
+        if (serverName == null) {
+            return null;
+        }
+        HBaseServiceTopologyAPIEntity entity = createEntity(role, serverName.getHostname(), timestamp);
+        parseServerLoad(entity, serverLoad);
+        entity.setStatus(status);
+        return entity;
+    }
+
+    private void parseServerLoad(HBaseServiceTopologyAPIEntity entity, ServerLoad load) {
+        if (load == null) {
+            return;
+        }
+        entity.setMaxHeapMB(load.getMaxHeapMB());
+        entity.setUsedHeapMB(load.getUsedHeapMB());
+        entity.setNumRegions(load.getNumberOfRegions());
+        entity.setNumRequests(load.getNumberOfRequests());
+    }
+
+    private HBaseServiceTopologyAPIEntity createEntity(String roleType, String hostname, long timestamp) {
+        HBaseServiceTopologyAPIEntity entity = new HBaseServiceTopologyAPIEntity();
+        Map<String, String> tags = new HashMap<String, String>();
+        entity.setTags(tags);
+        tags.put(SITE_TAG, site);
+        tags.put(ROLE_TAG, roleType);
+        tags.put(HOSTNAME_TAG, hostname);
+        String rack = rackResolver.resolve(hostname);
+        tags.put(RACK_TAG, rack);
+        entity.setLastUpdateTime(timestamp);
+        entity.setTimestamp(timestamp);
+        return entity;
+    }
+
+    @Override
+    public TopologyConstants.TopologyType getTopologyType() {
+        return TopologyType.HBASE;
+    }
+
+    @Override
+    public TopologyConstants.HadoopVersion getHadoopVersion() {
+        return HadoopVersion.V2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
index 3cb55e9..79277a4 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java
@@ -1,290 +1,289 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.topology.extractor.hdfs;
-
-import org.apache.eagle.app.utils.PathResolverHelper;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
-import org.apache.eagle.topology.extractor.TopologyEntityParser;
-import org.apache.eagle.topology.resolver.TopologyRackResolver;
-import org.apache.eagle.topology.utils.*;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.eagle.topology.TopologyConstants.*;
-import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
-
-public class HdfsTopologyEntityParser implements TopologyEntityParser {
-
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
-    private String[] namenodeUrls;
-    private String site;
-    private TopologyRackResolver rackResolver;
-
-    private static final String JMX_URL = "/jmx?anonymous=true";
-    private static final String JMX_FS_NAME_SYSTEM_BEAN_NAME = "Hadoop:service=NameNode,name=FSNamesystem";
-    private static final String JMX_NAMENODE_INFO = "Hadoop:service=NameNode,name=NameNodeInfo";
-
-    private static final String HA_STATE = "tag.HAState";
-    private static final String HA_NAME = "tag.Hostname";
-    private static final String CAPACITY_TOTAL_GB = "CapacityTotalGB";
-    private static final String CAPACITY_USED_GB = "CapacityUsedGB";
-    private static final String BLOCKS_TOTAL = "BlocksTotal";
-    private static final String LIVE_NODES = "LiveNodes";
-    private static final String DEAD_NODES = "DeadNodes";
-
-    private static final String JN_STATUS = "NameJournalStatus";
-    private static final String JN_TRANSACTION_INFO = "JournalTransactionInfo";
-    private static final String LAST_TX_ID = "LastAppliedOrWrittenTxId";
-
-    private static final String DATA_NODE_NUM_BLOCKS = "numBlocks";
-    private static final String DATA_NODE_USED_SPACE = "usedSpace";
-    private static final String DATA_NODE_CAPACITY = "capacity";
-    private static final String DATA_NODE_ADMIN_STATE = "adminState";
-    private static final String DATA_NODE_FAILED_VOLUMN = "volfails";
-    private static final String DATA_NODE_VERSION = "version";
-    private static final String NAME_NODE_VERSION = "Version";
-
-    private static final String DATA_NODE_DECOMMISSIONED = "Decommissioned";
-    private static final String DATA_NODE_DECOMMISSIONED_STATE = "decommissioned";
-
-    private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
-    private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
-
-    private static final double TB = 1024 * 1024 * 1024 * 1024;
-
-    public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
-        this.namenodeUrls = hdfsConfig.namenodeUrls;
-        this.site = site;
-        this.rackResolver = rackResolver;
-    }
-
-    @Override
-    public TopologyEntityParserResult parse(long timestamp) throws IOException {
-        final TopologyEntityParserResult result = new TopologyEntityParserResult();
-        result.setVersion(TopologyConstants.HadoopVersion.V2);
-        int numNamenode = 0;
-        for (String url : namenodeUrls) {
-            try {
-                final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp);
-                result.getMasterNodes().add(namenodeEntity);
-                numNamenode++;
-                if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) {
-                    String namenodeVersion = createSlaveNodeEntities(url, timestamp, result);
-                    namenodeEntity.setVersion(namenodeVersion);
-                }
-            } catch (RuntimeException ex) {
-                ex.printStackTrace();
-            } catch (IOException e) {
-                LOG.warn("Catch an IOException with url: {}", url);
-            }
-        }
-        double value = numNamenode * 1d / namenodeUrls.length;
-        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp));
-        return result;
-    }
-
-    private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException {
-        final String urlString = buildFSNamesystemURL(url);
-        final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
-        final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME);
-
-        if (bean == null || bean.getPropertyMap() == null) {
-            throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
-        }
-
-        final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
-        HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
-        final String state = (String) bean.getPropertyMap().get(HA_STATE);
-        result.setStatus(state);
-        final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
-        result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024));
-        final Double capacityUsedGB = (Double) bean.getPropertyMap().get(CAPACITY_USED_GB);
-        result.setUsedCapacityTB(Double.toString(capacityUsedGB / 1024));
-        final Integer blocksTotal = (Integer) bean.getPropertyMap().get(BLOCKS_TOTAL);
-        result.setNumBlocks(Integer.toString(blocksTotal));
-        return result;
-    }
-
-    private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException {
-        final String urlString = buildNamenodeInfo(url);
-        final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
-        final JMXBean bean = jmxBeanMap.get(JMX_NAMENODE_INFO);
-        if (bean == null || bean.getPropertyMap() == null) {
-            throw new ServiceNotResponseException("Invalid JMX format, NameNodeInfo bean is null!");
-        }
-        createAllDataNodeEntities(bean, updateTime, result);
-        createAllJournalNodeEntities(bean, updateTime, result);
-        return (String) bean.getPropertyMap().get(NAME_NODE_VERSION);
-    }
-
-    private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException {
-        if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) {
-            return;
-        }
-        String jnInfoString = (String) bean.getPropertyMap().get(JN_TRANSACTION_INFO);
-        JSONObject jsonObject = new JSONObject(jnInfoString);
-        long lastTxId = Long.parseLong(jsonObject.getString(LAST_TX_ID));
-
-        String journalnodeString = (String) bean.getPropertyMap().get(JN_STATUS);
-        JSONArray jsonArray = new JSONArray(journalnodeString);
-        JSONObject jsonMap = (JSONObject) jsonArray.get(0);
-
-        Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
-        String manager = jsonMap.getString("manager");
-        Pattern qjm = Pattern.compile(QJM_PATTERN);
-        Matcher jpmMatcher = qjm.matcher(manager);
-        while (jpmMatcher.find()) {
-            String ip = jpmMatcher.group(1);
-            String hostname = EntityBuilderHelper.resolveHostByIp(ip);
-            HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.JOURNAL_NODE_ROLE, hostname, updateTime);
-            entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
-            journalNodesMap.put(ip, entity);
-        }
-        if (journalNodesMap.isEmpty()) {
-            LOG.warn("Fail to find journal node info in JMX");
-            return;
-        }
-
-        String stream = jsonMap.getString("stream");
-        Pattern status = Pattern.compile(STATUS_PATTERN);
-        Matcher statusMatcher = status.matcher(stream);
-        long numLiveJournalNodes = 0;
-        while (statusMatcher.find()) {
-            numLiveJournalNodes++;
-            String ip = statusMatcher.group(1);
-            if (journalNodesMap.containsKey(ip)) {
-                long txid = Long.parseLong(statusMatcher.group(2));
-                journalNodesMap.get(ip).setWrittenTxidDiff(lastTxId - txid);
-                journalNodesMap.get(ip).setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
-            }
-        }
-        result.getMasterNodes().addAll(journalNodesMap.values());
-
-        double value = numLiveJournalNodes * 1d / journalNodesMap.size();
-        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime));
-    }
-
-    private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException {
-        int numLiveNodes = 0;
-        int numLiveDecommNodes = 0;
-        int numDeadNodes = 0;
-        int numDeadDecommNodes = 0;
-
-        String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
-        JSONTokener tokener = new JSONTokener(deadNodesStrings);
-        JSONObject jsonNodesObject = new JSONObject(tokener);
-        final JSONArray deadNodes = jsonNodesObject.names();
-        for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {
-            final String hostname = deadNodes.getString(i);
-            final JSONObject deadNode = jsonNodesObject.getJSONObject(hostname);
-            HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
-            if (deadNode.getBoolean(DATA_NODE_DECOMMISSIONED_STATE)) {
-                ++numDeadDecommNodes;
-                entity.setStatus(TopologyConstants.DATA_NODE_DEAD_DECOMMISSIONED_STATUS);
-            } else {
-                entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
-            }
-            ++numDeadNodes;
-            result.getSlaveNodes().add(entity);
-        }
-        LOG.info("Dead nodes " + numDeadNodes + ", dead but decommissioned nodes: " + numDeadDecommNodes);
-
-        String liveNodesStrings = (String) bean.getPropertyMap().get(LIVE_NODES);
-        tokener = new JSONTokener(liveNodesStrings);
-        jsonNodesObject = new JSONObject(tokener);
-        final JSONArray liveNodes = jsonNodesObject.names();
-        for (int i = 0; liveNodes != null && i < liveNodes.length(); ++i) {
-            final String hostname = liveNodes.getString(i);
-            final JSONObject liveNode = jsonNodesObject.getJSONObject(hostname);
-
-            HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
-            final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY);
-            entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / TB));
-            final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE);
-            entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / TB));
-            final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS);
-            entity.setNumBlocks(Double.toString(blocksTotal.doubleValue()));
-            if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) {
-                final Number volFails = (Number) liveNode.get(DATA_NODE_FAILED_VOLUMN);
-                entity.setNumFailedVolumes(Double.toString(volFails.doubleValue()));
-            }
-            final String adminState = liveNode.getString(DATA_NODE_ADMIN_STATE);
-            if (DATA_NODE_DECOMMISSIONED.equalsIgnoreCase(adminState)) {
-                ++numLiveDecommNodes;
-                entity.setStatus(TopologyConstants.DATA_NODE_LIVE_DECOMMISSIONED_STATUS);
-            } else {
-                entity.setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
-            }
-            entity.setVersion(String.valueOf(liveNode.get(DATA_NODE_VERSION)));
-            numLiveNodes++;
-            result.getSlaveNodes().add(entity);
-        }
-        LOG.info("Live nodes " + numLiveNodes + ", live but decommissioned nodes: " + numLiveDecommNodes);
-
-        double value = numLiveNodes * 1.0d / result.getSlaveNodes().size();
-        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.DATA_NODE_ROLE, value, site, updateTime));
-    }
-
-    private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) {
-        HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity();
-        entity.setTimestamp(updateTime);
-        entity.setLastUpdateTime(updateTime);
-        Map<String, String> tags = new HashMap<String, String>();
-        entity.setTags(tags);
-        tags.put(SITE_TAG, site);
-        tags.put(ROLE_TAG, roleType);
-        tags.put(HOSTNAME_TAG, hostname);
-        String rack = rackResolver.resolve(hostname);
-        tags.put(RACK_TAG, rack);
-        return entity;
-    }
-
-    private String buildFSNamesystemURL(String url) {
-        return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_FS_NAME_SYSTEM_BEAN_NAME);
-    }
-
-    private String buildNamenodeInfo(String url) {
-        return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_NAMENODE_INFO);
-    }
-
-    @Override
-    public TopologyConstants.TopologyType getTopologyType() {
-        return TopologyConstants.TopologyType.HDFS;
-    }
-
-    @Override
-    public TopologyConstants.HadoopVersion getHadoopVersion() {
-        return TopologyConstants.HadoopVersion.V2;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.hdfs;
+
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.*;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+
+public class HdfsTopologyEntityParser implements TopologyEntityParser {
+
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class);
+    private String[] namenodeUrls;
+    private String site;
+    private TopologyRackResolver rackResolver;
+
+    private static final String JMX_URL = "/jmx?anonymous=true";
+    private static final String JMX_FS_NAME_SYSTEM_BEAN_NAME = "Hadoop:service=NameNode,name=FSNamesystem";
+    private static final String JMX_NAMENODE_INFO = "Hadoop:service=NameNode,name=NameNodeInfo";
+
+    private static final String HA_STATE = "tag.HAState";
+    private static final String HA_NAME = "tag.Hostname";
+    private static final String CAPACITY_TOTAL_GB = "CapacityTotalGB";
+    private static final String CAPACITY_USED_GB = "CapacityUsedGB";
+    private static final String BLOCKS_TOTAL = "BlocksTotal";
+    private static final String LIVE_NODES = "LiveNodes";
+    private static final String DEAD_NODES = "DeadNodes";
+
+    private static final String JN_STATUS = "NameJournalStatus";
+    private static final String JN_TRANSACTION_INFO = "JournalTransactionInfo";
+    private static final String LAST_TX_ID = "LastAppliedOrWrittenTxId";
+
+    private static final String DATA_NODE_NUM_BLOCKS = "numBlocks";
+    private static final String DATA_NODE_USED_SPACE = "usedSpace";
+    private static final String DATA_NODE_CAPACITY = "capacity";
+    private static final String DATA_NODE_ADMIN_STATE = "adminState";
+    private static final String DATA_NODE_FAILED_VOLUMN = "volfails";
+    private static final String DATA_NODE_VERSION = "version";
+    private static final String NAME_NODE_VERSION = "Version";
+
+    private static final String DATA_NODE_DECOMMISSIONED = "Decommissioned";
+    private static final String DATA_NODE_DECOMMISSIONED_STATE = "decommissioned";
+
+    private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)";
+    private static final String QJM_PATTERN = "([\\d\\.]+):\\d+";
+
+    // Number of bytes in one TB (1024^4). The first factor is a double literal so the whole product
+    // is evaluated in floating point; with four int factors the multiplication overflows int and
+    // the constant silently becomes 0.0.
+    private static final double TB = 1024d * 1024 * 1024 * 1024;
+
+    /**
+     * Creates a parser for the HDFS topology of one site.
+     *
+     * @param site         site name written into the site tag of every entity built by this parser
+     * @param hdfsConfig   HDFS configuration; its {@code namenodeUrls} are the NameNode URLs to query
+     * @param rackResolver resolver used to look up the rack of each host
+     */
+    public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) {
+        this.site = site;
+        this.rackResolver = rackResolver;
+        this.namenodeUrls = hdfsConfig.namenodeUrls;
+    }
+
+    @Override
+    public TopologyEntityParserResult parse(long timestamp) throws IOException {
+        final TopologyEntityParserResult result = new TopologyEntityParserResult();
+        result.setVersion(TopologyConstants.HadoopVersion.V2);
+        int numNamenode = 0;
+        for (String url : namenodeUrls) {
+            try {
+                final HdfsServiceTopologyAPIEntity namenodeEntity = createNamenodeEntity(url, timestamp);
+                result.getMasterNodes().add(namenodeEntity);
+                numNamenode++;
+                if (namenodeEntity.getStatus().equalsIgnoreCase(NAME_NODE_ACTIVE_STATUS)) {
+                    String namenodeVersion = createSlaveNodeEntities(url, timestamp, result);
+                    namenodeEntity.setVersion(namenodeVersion);
+                }
+            } catch (RuntimeException ex) {
+                ex.printStackTrace();
+            } catch (IOException e) {
+                LOG.warn("Catch an IOException with url: {}", url);
+            }
+        }
+        double value = numNamenode * 1d / namenodeUrls.length;
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NAME_NODE_ROLE, value, site, timestamp));
+        return result;
+    }
+
+    /**
+     * Builds the NameNode entity for one NameNode by reading its FSNamesystem JMX bean.
+     *
+     * <p>The numeric properties are read through {@link Number} rather than a concrete box type, so
+     * the conversion does not fail when the JMX reader produced an Integer, Long or Double.
+     *
+     * @param url        URL of the NameNode to query
+     * @param updateTime timestamp written into the entity
+     * @return entity carrying the HA state, configured and used capacity (GB value divided by 1024)
+     *         and the total block count
+     * @throws JSONException declared for the parsing of the JMX response
+     * @throws IOException   declared for the JMX query
+     * @throws ServiceNotResponseException if the FSNamesystem bean or its property map is missing
+     */
+    private HdfsServiceTopologyAPIEntity createNamenodeEntity(String url, long updateTime) throws JSONException, IOException {
+        final String urlString = buildFSNamesystemURL(url);
+        final Map<String, JMXBean> jmxBeanMap = JMXQueryHelper.query(urlString);
+        final JMXBean bean = jmxBeanMap.get(JMX_FS_NAME_SYSTEM_BEAN_NAME);
+
+        if (bean == null || bean.getPropertyMap() == null) {
+            throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!");
+        }
+
+        final String hostname = (String) bean.getPropertyMap().get(HA_NAME);
+        HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime);
+        final String state = (String) bean.getPropertyMap().get(HA_STATE);
+        result.setStatus(state);
+        final Number configuredCapacityGB = (Number) bean.getPropertyMap().get(CAPACITY_TOTAL_GB);
+        result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB.doubleValue() / 1024));
+        final Number capacityUsedGB = (Number) bean.getPropertyMap().get(CAPACITY_USED_GB);
+        result.setUsedCapacityTB(Double.toString(capacityUsedGB.doubleValue() / 1024));
+        // longValue() keeps the integral formatting and also covers counts above Integer.MAX_VALUE.
+        final Number blocksTotal = (Number) bean.getPropertyMap().get(BLOCKS_TOTAL);
+        result.setNumBlocks(Long.toString(blocksTotal.longValue()));
+        return result;
+    }
+
+    /**
+     * Reads the NameNodeInfo JMX bean of the given NameNode and adds the DataNode and JournalNode
+     * entities derived from it to {@code result}.
+     *
+     * @param url        URL of the NameNode to query
+     * @param updateTime timestamp written into every entity and metric produced
+     * @param result     accumulator receiving the entities and metrics
+     * @return the value of the bean's {@code Version} property
+     * @throws IOException declared for the JMX query and the entity creation
+     */
+    private String createSlaveNodeEntities(String url, long updateTime, TopologyEntityParserResult result) throws IOException {
+        final Map<String, JMXBean> beans = JMXQueryHelper.query(buildNamenodeInfo(url));
+        final JMXBean nameNodeInfo = beans.get(JMX_NAMENODE_INFO);
+        if (nameNodeInfo == null || nameNodeInfo.getPropertyMap() == null) {
+            throw new ServiceNotResponseException("Invalid JMX format, NameNodeInfo bean is null!");
+        }
+        // DataNodes first, then JournalNodes.
+        createAllDataNodeEntities(nameNodeInfo, updateTime, result);
+        createAllJournalNodeEntities(nameNodeInfo, updateTime, result);
+        final Object version = nameNodeInfo.getPropertyMap().get(NAME_NODE_VERSION);
+        return (String) version;
+    }
+
+    private void createAllJournalNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws UnknownHostException {
+        if (bean.getPropertyMap().get(JN_TRANSACTION_INFO) == null || bean.getPropertyMap().get(JN_STATUS) == null) {
+            return;
+        }
+        String jnInfoString = (String) bean.getPropertyMap().get(JN_TRANSACTION_INFO);
+        JSONObject jsonObject = new JSONObject(jnInfoString);
+        long lastTxId = Long.parseLong(jsonObject.getString(LAST_TX_ID));
+
+        String journalnodeString = (String) bean.getPropertyMap().get(JN_STATUS);
+        JSONArray jsonArray = new JSONArray(journalnodeString);
+        JSONObject jsonMap = (JSONObject) jsonArray.get(0);
+
+        Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>();
+        String manager = jsonMap.getString("manager");
+        Pattern qjm = Pattern.compile(QJM_PATTERN);
+        Matcher jpmMatcher = qjm.matcher(manager);
+        while (jpmMatcher.find()) {
+            String ip = jpmMatcher.group(1);
+            String hostname = EntityBuilderHelper.resolveHostByIp(ip);
+            HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.JOURNAL_NODE_ROLE, hostname, updateTime);
+            entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+            journalNodesMap.put(ip, entity);
+        }
+        if (journalNodesMap.isEmpty()) {
+            LOG.warn("Fail to find journal node info in JMX");
+            return;
+        }
+
+        String stream = jsonMap.getString("stream");
+        Pattern status = Pattern.compile(STATUS_PATTERN);
+        Matcher statusMatcher = status.matcher(stream);
+        long numLiveJournalNodes = 0;
+        while (statusMatcher.find()) {
+            numLiveJournalNodes++;
+            String ip = statusMatcher.group(1);
+            if (journalNodesMap.containsKey(ip)) {
+                long txid = Long.parseLong(statusMatcher.group(2));
+                journalNodesMap.get(ip).setWrittenTxidDiff(lastTxId - txid);
+                journalNodesMap.get(ip).setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+            }
+        }
+        result.getMasterNodes().addAll(journalNodesMap.values());
+
+        double value = numLiveJournalNodes * 1d / journalNodesMap.size();
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.JOURNAL_NODE_ROLE, value, site, updateTime));
+    }
+
+    /**
+     * Derives the DataNode entities from the {@code DeadNodes} and {@code LiveNodes} JSON properties
+     * of the NameNodeInfo bean and adds them, together with a {@code DATA_NODE_ROLE} metric, to
+     * {@code result}.
+     *
+     * <p>Dead nodes only receive a status. Live nodes additionally receive capacity, used space,
+     * block count, failed-volume count (when reported) and version. The metric is the number of
+     * live nodes divided by the number of slave nodes held by {@code result}; it is 0 when there
+     * are no slave nodes at all, so that the metric never becomes NaN.
+     *
+     * @param bean       NameNodeInfo JMX bean
+     * @param updateTime timestamp written into every entity and metric produced
+     * @param result     accumulator receiving the entities and the metric
+     * @throws JSONException declared for the parsing of the node lists
+     * @throws IOException   part of the signature
+     */
+    private void createAllDataNodeEntities(JMXBean bean, long updateTime, TopologyEntityParserResult result) throws JSONException, IOException {
+        int numLiveNodes = 0;
+        int numLiveDecommNodes = 0;
+        int numDeadNodes = 0;
+        int numDeadDecommNodes = 0;
+
+        String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES);
+        JSONTokener tokener = new JSONTokener(deadNodesStrings);
+        JSONObject jsonNodesObject = new JSONObject(tokener);
+        // names() returns null for an empty object, hence the null check in the loop condition.
+        final JSONArray deadNodes = jsonNodesObject.names();
+        for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) {
+            final String hostname = deadNodes.getString(i);
+            final JSONObject deadNode = jsonNodesObject.getJSONObject(hostname);
+            HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
+            if (deadNode.getBoolean(DATA_NODE_DECOMMISSIONED_STATE)) {
+                ++numDeadDecommNodes;
+                entity.setStatus(TopologyConstants.DATA_NODE_DEAD_DECOMMISSIONED_STATUS);
+            } else {
+                entity.setStatus(TopologyConstants.DATA_NODE_DEAD_STATUS);
+            }
+            ++numDeadNodes;
+            result.getSlaveNodes().add(entity);
+        }
+        LOG.info("Dead nodes {}, dead but decommissioned nodes: {}", numDeadNodes, numDeadDecommNodes);
+
+        String liveNodesStrings = (String) bean.getPropertyMap().get(LIVE_NODES);
+        tokener = new JSONTokener(liveNodesStrings);
+        jsonNodesObject = new JSONObject(tokener);
+        final JSONArray liveNodes = jsonNodesObject.names();
+        for (int i = 0; liveNodes != null && i < liveNodes.length(); ++i) {
+            final String hostname = liveNodes.getString(i);
+            final JSONObject liveNode = jsonNodesObject.getJSONObject(hostname);
+
+            HdfsServiceTopologyAPIEntity entity = createHdfsServiceEntity(TopologyConstants.DATA_NODE_ROLE, EntityBuilderHelper.getValidHostName(hostname), updateTime);
+            final Number configuredCapacity = (Number) liveNode.get(DATA_NODE_CAPACITY);
+            entity.setConfiguredCapacityTB(Double.toString(configuredCapacity.doubleValue() / TB));
+            final Number capacityUsed = (Number) liveNode.get(DATA_NODE_USED_SPACE);
+            entity.setUsedCapacityTB(Double.toString(capacityUsed.doubleValue() / TB));
+            final Number blocksTotal = (Number) liveNode.get(DATA_NODE_NUM_BLOCKS);
+            entity.setNumBlocks(Double.toString(blocksTotal.doubleValue()));
+            // The failed-volume count is optional in the node description.
+            if (liveNode.has(DATA_NODE_FAILED_VOLUMN)) {
+                final Number volFails = (Number) liveNode.get(DATA_NODE_FAILED_VOLUMN);
+                entity.setNumFailedVolumes(Double.toString(volFails.doubleValue()));
+            }
+            final String adminState = liveNode.getString(DATA_NODE_ADMIN_STATE);
+            if (DATA_NODE_DECOMMISSIONED.equalsIgnoreCase(adminState)) {
+                ++numLiveDecommNodes;
+                entity.setStatus(TopologyConstants.DATA_NODE_LIVE_DECOMMISSIONED_STATUS);
+            } else {
+                entity.setStatus(TopologyConstants.DATA_NODE_LIVE_STATUS);
+            }
+            entity.setVersion(String.valueOf(liveNode.get(DATA_NODE_VERSION)));
+            numLiveNodes++;
+            result.getSlaveNodes().add(entity);
+        }
+        LOG.info("Live nodes {}, live but decommissioned nodes: {}", numLiveNodes, numLiveDecommNodes);
+
+        // Guard against 0/0 (NaN) when neither live nor dead DataNodes were reported.
+        final int totalNodes = result.getSlaveNodes().size();
+        double value = totalNodes == 0 ? 0 : numLiveNodes * 1.0d / totalNodes;
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.DATA_NODE_ROLE, value, site, updateTime));
+    }
+
+    private HdfsServiceTopologyAPIEntity createHdfsServiceEntity(String roleType, String hostname, long updateTime) {
+        HdfsServiceTopologyAPIEntity entity = new HdfsServiceTopologyAPIEntity();
+        entity.setTimestamp(updateTime);
+        entity.setLastUpdateTime(updateTime);
+        Map<String, String> tags = new HashMap<String, String>();
+        entity.setTags(tags);
+        tags.put(SITE_TAG, site);
+        tags.put(ROLE_TAG, roleType);
+        tags.put(HOSTNAME_TAG, hostname);
+        String rack = rackResolver.resolve(hostname);
+        tags.put(RACK_TAG, rack);
+        return entity;
+    }
+
+    private String buildFSNamesystemURL(String url) {
+        return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_FS_NAME_SYSTEM_BEAN_NAME);
+    }
+
+    private String buildNamenodeInfo(String url) {
+        return PathResolverHelper.buildUrlPath(url, JMX_URL + "&qry=" + JMX_NAMENODE_INFO);
+    }
+
+    /**
+     * Identifies the kind of topology handled by this parser.
+     *
+     * @return always {@code TopologyConstants.TopologyType.HDFS}
+     */
+    @Override
+    public TopologyConstants.TopologyType getTopologyType() {
+        return TopologyConstants.TopologyType.HDFS;
+    }
+
+    /**
+     * Reports the Hadoop version this parser is written for.
+     *
+     * @return always {@code TopologyConstants.HadoopVersion.V2}
+     */
+    @Override
+    public TopologyConstants.HadoopVersion getHadoopVersion() {
+        return TopologyConstants.HadoopVersion.V2;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
index 52148b7..447686c 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/MRTopologyEntityParser.java
@@ -1,219 +1,219 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.topology.extractor.mr;
-
-import org.apache.eagle.app.utils.AppConstants;
-import org.apache.eagle.app.utils.PathResolverHelper;
-import org.apache.eagle.app.utils.connection.InputStreamUtils;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
-import org.apache.eagle.topology.extractor.TopologyEntityParser;
-import org.apache.eagle.topology.resolver.TopologyRackResolver;
-import org.apache.eagle.topology.utils.EntityBuilderHelper;
-import org.apache.eagle.topology.utils.ServiceNotResponseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ConnectException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-
-import static org.apache.eagle.topology.TopologyConstants.*;
-
-public class MRTopologyEntityParser implements TopologyEntityParser {
-
-    private String[] rmUrls;
-    private String historyServerUrl;
-    private String site;
-    private TopologyRackResolver rackResolver;
-
-    private static final String YARN_NODES_URL = "/ws/v1/cluster/nodes?anonymous=true";
-    private static final String YARN_HISTORY_SERVER_URL = "/ws/v1/history/info";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(MRTopologyEntityParser.class);
-    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
-
-    static {
-        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
-    }
-
-    public MRTopologyEntityParser(String site, TopologyCheckAppConfig.MRConfig config, TopologyRackResolver rackResolver) {
-        this.site = site;
-        this.rmUrls = config.rmUrls;
-        this.historyServerUrl = config.historyServerUrl;
-        this.rackResolver = rackResolver;
-    }
-
-    @Override
-    public TopologyConstants.HadoopVersion getHadoopVersion() {
-        return TopologyConstants.HadoopVersion.V2;
-    }
-
-    @Override
-    public TopologyConstants.TopologyType getTopologyType() {
-        return TopologyConstants.TopologyType.MR;
-    }
-
-    @Override
-    public TopologyEntityParserResult parse(long timestamp) {
-        final TopologyEntityParserResult result = new TopologyEntityParserResult();
-        result.setVersion(TopologyConstants.HadoopVersion.V2);
-
-        for (String url : rmUrls) {
-            try {
-                doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result);
-            } catch (ServiceNotResponseException ex) {
-                LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url);
-                // reSelect url
-            }
-        }
-
-        double value = result.getMasterNodes().isEmpty() ? 0 : result.getMasterNodes().size() * 1d / rmUrls.length;
-        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, value, site, timestamp));
-
-        doCheckHistoryServer(timestamp, result);
-        return result;
-    }
-
-    private void doCheckHistoryServer(long updateTime, TopologyEntityParserResult result) {
-        if (historyServerUrl == null || historyServerUrl.isEmpty()) {
-            return;
-        }
-        String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
-        double liveCount = 1;
-        try {
-            InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE);
-        } catch (ConnectException e) {
-            liveCount = 0;
-        } catch (Exception e) {
-            e.printStackTrace();
-            return;
-        }
-        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime));
-    }
-
-    private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException {
-        InputStream is = null;
-        try {
-            is = InputStreamUtils.getInputStream(url, null, type);
-        } catch (ConnectException e) {
-            throw new ServiceNotResponseException(e);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        return is;
-    }
-
-    private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException {
-
-        InputStream is = null;
-        try {
-            LOGGER.info("Going to query URL: " + url);
-            is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE);
-            YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
-            if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) {
-                throw new ServiceNotResponseException("Invalid result of URL: " + url);
-            }
-            int runningNodeCount = 0;
-            int lostNodeCount = 0;
-            int unhealthyNodeCount = 0;
-            final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
-            for (YarnNodeInfo info : list) {
-                final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp);
-                if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) {
-                    nodeManagerEntity.setHealthReport(info.getHealthReport());
-                }
-                // TODO: Need to remove the manually mapping RUNNING -> running, LOST - > lost, UNHEALTHY -> unhealthy
-                if (info.getState() != null) {
-                    final String state = info.getState().toLowerCase();
-                    nodeManagerEntity.setStatus(state);
-                    if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
-                        ++runningNodeCount;
-                    } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
-                        ++lostNodeCount;
-                    } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
-                        ++unhealthyNodeCount;
-                    }
-                }
-                result.getSlaveNodes().add(nodeManagerEntity);
-            }
-            LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", unhealthy NMs: " + unhealthyNodeCount);
-            final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp);
-            resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS);
-            result.getMasterNodes().add(resourceManagerEntity);
-            double value = runningNodeCount * 1d / list.size();
-            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
-        } catch (RuntimeException e) {
-            e.printStackTrace();
-        } catch (IOException e) {
-            throw new ServiceNotResponseException(e);
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            if (is != null) {
-                try {
-                    is.close();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    // Do nothing
-                }
-            }
-        }
-    }
-
-    private String extractMasterHost(String url) {
-        Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
-        if (matcher.find()) {
-            return matcher.group(1);
-        }
-        return url;
-    }
-
-    private String extractRack(YarnNodeInfo info) {
-        if (info.getRack() == null) {
-            return null;
-        }
-        String value = info.getRack();
-        value = value.substring(value.lastIndexOf('/') + 1);
-        return value;
-    }
-
-    private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) {
-        MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity();
-        entity.setTimestamp(updateTime);
-        entity.setLastUpdateTime(updateTime);
-        Map<String, String> tags = new HashMap<String, String>();
-        entity.setTags(tags);
-        tags.put(SITE_TAG, site);
-        tags.put(ROLE_TAG, roleType);
-        tags.put(HOSTNAME_TAG, hostname);
-        String rack = rackResolver.resolve(hostname);
-        tags.put(RACK_TAG, rack);
-        return entity;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.extractor.mr;
+
+import org.apache.eagle.app.utils.AppConstants;
+import org.apache.eagle.app.utils.PathResolverHelper;
+import org.apache.eagle.app.utils.connection.InputStreamUtils;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.extractor.TopologyEntityParser;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.utils.EntityBuilderHelper;
+import org.apache.eagle.topology.utils.ServiceNotResponseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.topology.TopologyConstants.*;
+
+public class MRTopologyEntityParser implements TopologyEntityParser {
+
+    private String[] rmUrls;
+    private String historyServerUrl;
+    private String site;
+    private TopologyRackResolver rackResolver;
+
+    private static final String YARN_NODES_URL = "/ws/v1/cluster/nodes?anonymous=true";
+    private static final String YARN_HISTORY_SERVER_URL = "/ws/v1/history/info";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MRTopologyEntityParser.class);
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    // Enable Jackson's ALLOW_NON_NUMERIC_NUMBERS parser feature on the shared mapper
+    // (presumably so NaN/Infinity values in the YARN response do not fail parsing; confirm).
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    /**
+     * Creates a parser for the MapReduce/YARN topology of one site.
+     *
+     * @param site         site name written into the site tag of every entity built by this parser
+     * @param config       MR configuration supplying {@code rmUrls} and {@code historyServerUrl}
+     * @param rackResolver resolver used to look up the rack of each host
+     */
+    public MRTopologyEntityParser(String site, TopologyCheckAppConfig.MRConfig config, TopologyRackResolver rackResolver) {
+        this.site = site;
+        this.rackResolver = rackResolver;
+        this.rmUrls = config.rmUrls;
+        this.historyServerUrl = config.historyServerUrl;
+    }
+
+    /**
+     * Reports the Hadoop version this parser is written for.
+     *
+     * @return always {@code TopologyConstants.HadoopVersion.V2}
+     */
+    @Override
+    public TopologyConstants.HadoopVersion getHadoopVersion() {
+        return TopologyConstants.HadoopVersion.V2;
+    }
+
+    /**
+     * Identifies the kind of topology handled by this parser.
+     *
+     * @return always {@code TopologyConstants.TopologyType.MR}
+     */
+    @Override
+    public TopologyConstants.TopologyType getTopologyType() {
+        return TopologyConstants.TopologyType.MR;
+    }
+
+    /**
+     * Queries every configured ResourceManager URL and assembles the MR topology snapshot.
+     *
+     * <p>A URL that raises {@code ServiceNotResponseException} is logged, including the cause, and
+     * skipped. Afterwards a {@code RESOURCE_MANAGER_ROLE} metric is added whose value is the number
+     * of master nodes collected divided by the number of configured URLs (0 when none was
+     * collected), and the history server is checked.
+     *
+     * @param timestamp timestamp written into every entity and metric produced
+     * @return the collected entities and metrics
+     */
+    @Override
+    public TopologyEntityParserResult parse(long timestamp) {
+        final TopologyEntityParserResult result = new TopologyEntityParserResult();
+        result.setVersion(TopologyConstants.HadoopVersion.V2);
+
+        for (String url : rmUrls) {
+            try {
+                doParse(PathResolverHelper.buildUrlPath(url, YARN_NODES_URL), timestamp, result);
+            } catch (ServiceNotResponseException ex) {
+                // Keep the cause in the log so an unreachable ResourceManager can be diagnosed.
+                LOGGER.warn("Catch a ServiceNotResponseException with url: {}", url, ex);
+                // reSelect url
+            }
+        }
+
+        double value = result.getMasterNodes().isEmpty() ? 0 : result.getMasterNodes().size() * 1d / rmUrls.length;
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.RESOURCE_MANAGER_ROLE, value, site, timestamp));
+
+        doCheckHistoryServer(timestamp, result);
+        return result;
+    }
+
+    /**
+     * Probes the history server and records a {@code HISTORY_SERVER_ROLE} metric.
+     *
+     * <p>The metric value is 1 when the history server URL could be opened and 0 when the attempt
+     * failed with a {@link ConnectException}. No metric is added when no history server URL is
+     * configured or when the probe fails with any other exception. The stream opened for the probe
+     * is closed before returning.
+     *
+     * @param updateTime timestamp written into the metric
+     * @param result     accumulator receiving the metric
+     */
+    private void doCheckHistoryServer(long updateTime, TopologyEntityParserResult result) {
+        if (historyServerUrl == null || historyServerUrl.isEmpty()) {
+            return;
+        }
+        String hsUrl = PathResolverHelper.buildUrlPath(historyServerUrl, YARN_HISTORY_SERVER_URL);
+        double liveCount = 1;
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(hsUrl, null, AppConstants.CompressionType.NONE);
+        } catch (ConnectException e) {
+            liveCount = 0;
+        } catch (Exception e) {
+            LOGGER.error("Fail to check history server with url: {}", hsUrl, e);
+            return;
+        } finally {
+            // The stream is only needed as a liveness probe; release it right away.
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    LOGGER.debug("Fail to close the stream of url: {}", hsUrl, e);
+                }
+            }
+        }
+        result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.HISTORY_SERVER_ROLE, liveCount, site, updateTime));
+    }
+
+    /**
+     * Opens the given URL and returns its input stream.
+     *
+     * @param url  URL to open
+     * @param type compression type passed on to {@code InputStreamUtils.getInputStream}
+     * @return the opened stream, or {@code null} when opening failed with an exception other than
+     *         {@link ConnectException}
+     * @throws ServiceNotResponseException if opening the URL fails with a {@link ConnectException}
+     */
+    private InputStream getInputStream(String url, AppConstants.CompressionType type) throws ServiceNotResponseException {
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(url, null, type);
+        } catch (ConnectException e) {
+            throw new ServiceNotResponseException(e);
+        } catch (Exception e) {
+            // Best effort: the caller receives null, but the failure is logged with its context.
+            LOGGER.error("Fail to open input stream of url: {}", url, e);
+        }
+        return is;
+    }
+
+    /**
+     * Queries the given URL, deserializes the response into a {@code YarnNodeInfoWrapper} and
+     * appends what it reports to {@code result}: one slave entity per listed node, one master
+     * entity (marked active) for the host extracted from {@code url}, and a node manager metric
+     * holding the fraction of listed nodes whose state is "running".
+     *
+     * @param url       URL to query
+     * @param timestamp timestamp applied to every generated entity and metric
+     * @param result    parser result that is appended to
+     * @throws ServiceNotResponseException if reading or deserializing the response fails with an
+     *                                     {@link IOException}
+     */
+    private void doParse(String url, long timestamp, TopologyEntityParserResult result) throws ServiceNotResponseException {
+
+        InputStream is = null;
+        try {
+            LOGGER.info("Going to query URL: " + url);
+            is = InputStreamUtils.getInputStream(url, null, AppConstants.CompressionType.NONE);
+            YarnNodeInfoWrapper nodeWrapper = OBJ_MAPPER.readValue(is, YarnNodeInfoWrapper.class);
+            if (nodeWrapper.getNodes() == null || nodeWrapper.getNodes().getNode() == null) {
+                // NOTE(review): depending on ServiceNotResponseException's superclass this is either
+                // re-wrapped by the IOException handler or swallowed by the generic handler below -
+                // confirm which is intended.
+                throw new ServiceNotResponseException("Invalid result of URL: " + url);
+            }
+            int runningNodeCount = 0;
+            int lostNodeCount = 0;
+            int unhealthyNodeCount = 0;
+            final List<YarnNodeInfo> list = nodeWrapper.getNodes().getNode();
+            for (YarnNodeInfo info : list) {
+                final MRServiceTopologyAPIEntity nodeManagerEntity = createEntity(NODE_MANAGER_ROLE, info.getNodeHostName(), timestamp);
+                if (info.getHealthReport() != null && (!info.getHealthReport().isEmpty())) {
+                    nodeManagerEntity.setHealthReport(info.getHealthReport());
+                }
+                // TODO: Remove the manual mapping RUNNING -> running, LOST -> lost, UNHEALTHY -> unhealthy
+                if (info.getState() != null) {
+                    // Locale.ROOT keeps the mapping stable regardless of the JVM default locale
+                    // (e.g. the Turkish dotless i would break the comparison with the constants).
+                    final String state = info.getState().toLowerCase(java.util.Locale.ROOT);
+                    nodeManagerEntity.setStatus(state);
+                    if (state.equals(TopologyConstants.NODE_MANAGER_RUNNING_STATUS)) {
+                        ++runningNodeCount;
+                    } else if (state.equals(TopologyConstants.NODE_MANAGER_LOST_STATUS)) {
+                        ++lostNodeCount;
+                    } else if (state.equals(TopologyConstants.NODE_MANAGER_UNHEALTHY_STATUS)) {
+                        ++unhealthyNodeCount;
+                    }
+                }
+                result.getSlaveNodes().add(nodeManagerEntity);
+            }
+            LOGGER.info("Running NMs: " + runningNodeCount + ", lost NMs: " + lostNodeCount + ", unhealthy NMs: " + unhealthyNodeCount);
+            final MRServiceTopologyAPIEntity resourceManagerEntity = createEntity(TopologyConstants.RESOURCE_MANAGER_ROLE, extractMasterHost(url), timestamp);
+            resourceManagerEntity.setStatus(TopologyConstants.RESOURCE_MANAGER_ACTIVE_STATUS);
+            result.getMasterNodes().add(resourceManagerEntity);
+            // An empty node list would otherwise produce 0.0 / 0 = NaN; report 0 instead.
+            double value = list.isEmpty() ? 0 : runningNodeCount * 1d / list.size();
+            result.getMetrics().add(EntityBuilderHelper.generateMetric(TopologyConstants.NODE_MANAGER_ROLE, value, site, timestamp));
+        } catch (IOException e) {
+            throw new ServiceNotResponseException(e);
+        } catch (Exception e) {
+            // Covers runtime failures as well; logged and not propagated, as before.
+            LOGGER.error("Failed to parse result of URL: " + url, e);
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    // A failed close does not affect the parsed result; just record it.
+                    LOGGER.warn("Failed to close stream for URL: " + url, e);
+                }
+            }
+        }
+    }
+
+    private String extractMasterHost(String url) {
+        Matcher matcher = TopologyConstants.HTTP_HOST_MATCH_PATTERN.matcher(url);
+        if (matcher.find()) {
+            return matcher.group(1);
+        }
+        return url;
+    }
+
+    private String extractRack(YarnNodeInfo info) {
+        if (info.getRack() == null) {
+            return null;
+        }
+        String value = info.getRack();
+        value = value.substring(value.lastIndexOf('/') + 1);
+        return value;
+    }
+
+    private MRServiceTopologyAPIEntity createEntity(String roleType, String hostname, long updateTime) {
+        MRServiceTopologyAPIEntity entity = new MRServiceTopologyAPIEntity();
+        entity.setTimestamp(updateTime);
+        entity.setLastUpdateTime(updateTime);
+        Map<String, String> tags = new HashMap<String, String>();
+        entity.setTags(tags);
+        tags.put(SITE_TAG, site);
+        tags.put(ROLE_TAG, roleType);
+        tags.put(HOSTNAME_TAG, hostname);
+        String rack = rackResolver.resolve(hostname);
+        tags.put(RACK_TAG, rack);
+        return entity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
index afd3518..11d907b 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java
@@ -1,200 +1,200 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.eagle.topology.storm;
-
-import static org.apache.eagle.topology.TopologyConstants.HOSTNAME_TAG;
-import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
-import static org.apache.eagle.topology.TopologyConstants.ROLE_TAG;
-import static org.apache.eagle.topology.TopologyConstants.SITE_TAG;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.apache.eagle.topology.TopologyCheckAppConfig;
-import org.apache.eagle.topology.TopologyConstants;
-import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
-import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
-import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
-import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class TopologyDataPersistBolt extends BaseRichBolt {
-
-    private TopologyCheckAppConfig config;
-    private IEagleServiceClient client;
-    private OutputCollector collector;
-
-    private static final Logger LOG = LoggerFactory.getLogger(TopologyDataPersistBolt.class);
-
-    public TopologyDataPersistBolt(TopologyCheckAppConfig config) {
-        this.config = config;
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"),
-            this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password")));
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        if (input == null) {
-            return;
-        }
-        String serviceName = input.getStringByField(TopologyConstants.SERVICE_NAME_FIELD);
-        TopologyEntityParserResult result = (TopologyEntityParserResult) input.getValueByField(TopologyConstants.TOPOLOGY_DATA_FIELD);
-        Set<String> availableHostnames = new HashSet<String>();
-        List<TopologyBaseAPIEntity> entitiesForDeletion = new ArrayList<>();
-        List<TopologyBaseAPIEntity> entitiesToWrite = new ArrayList<>();
-
-        filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getMasterNodes());
-        filterEntitiesToWrite(entitiesToWrite, availableHostnames, result.getSlaveNodes());
-
-        String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site);
-        try {
-            GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send();
-            if (response.isSuccess() && response.getObj() != null) {
-                for (TopologyBaseAPIEntity entity : response.getObj()) {
-                    if (availableHostnames != null && availableHostnames.size() > 0 && !availableHostnames.contains(generateKey(entity))) {
-                        entitiesForDeletion.add(entity);
-                    }
-                }
-            }
-            deleteEntities(entitiesForDeletion, serviceName);
-            writeEntities(entitiesToWrite, serviceName);
-            writeEntities(result.getMetrics(), serviceName);
-            emitToKafkaBolt(result);
-            this.collector.ack(input);
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            this.collector.fail(input);
-        }
-    }
-
-    private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) {
-        for (TopologyBaseAPIEntity entity : entities) {
-            availableHostnames.add(generateKey(entity));
-            entitiesToWrite.add(entity);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("f1"));
-    }
-
-    private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
-        try {
-            GenericServiceAPIResponseEntity response = client.delete(entities);
-            if (!response.isSuccess()) {
-                LOG.error("Got exception from eagle service: " + response.getException());
-            } else {
-                LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
-            }
-        } catch (EagleServiceClientException e) {
-            LOG.error(e.getMessage(), e);
-        } catch (IOException e) {
-            LOG.error(e.getMessage(), e);
-        }
-        entities.clear();
-    }
-
-    private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) {
-        try {
-            GenericServiceAPIResponseEntity response = client.create(entities);
-            if (!response.isSuccess()) {
-                LOG.error("Got exception from eagle service: " + response.getException());
-            } else {
-                LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName);
-            }
-        } catch (Exception e) {
-            LOG.error("cannot create entities successfully", e);
-        }
-        entities.clear();
-    }
-
-    private String generateKey(TopologyBaseAPIEntity entity) {
-        return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG),
-            entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG),
-            entity.getTags().get(TopologyConstants.ROLE_TAG));
-    }
-
-    private void emitToKafkaBolt(TopologyEntityParserResult result) {
-
-        List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>();
-
-        setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList);
-
-        setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList);
-
-        for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) {
-            this.collector.emit(new Values(healthCheckAPIEntity));
-        }
-
-    }
-
-    private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) {
-        HealthCheckParseAPIEntity healthCheckAPIEntity = null;
-        for (Iterator<TopologyBaseAPIEntity> iterator = topologyBaseAPIList.iterator(); iterator.hasNext(); ) {
-
-            healthCheckAPIEntity = new HealthCheckParseAPIEntity();
-            TopologyBaseAPIEntity topologyBaseAPIEntity = iterator.next();
-
-            if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) {
-
-                healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
-
-            }
-            if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) {
-
-                healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
-            }
-
-            if (topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) {
-
-                healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus());
-            }
-
-            healthCheckAPIEntity.setTimeStamp(topologyBaseAPIEntity.getTimestamp());
-            healthCheckAPIEntity.setHost(topologyBaseAPIEntity.getTags().get(HOSTNAME_TAG));
-            healthCheckAPIEntity.setRole(topologyBaseAPIEntity.getTags().get(ROLE_TAG));
-            healthCheckAPIEntity.setSite(topologyBaseAPIEntity.getTags().get(SITE_TAG));
-            healthCheckParseAPIList.add(healthCheckAPIEntity);
-        }
-    }
-}
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.topology.storm;
+
+import static org.apache.eagle.topology.TopologyConstants.HOSTNAME_TAG;
+import static org.apache.eagle.topology.TopologyConstants.RACK_TAG;
+import static org.apache.eagle.topology.TopologyConstants.ROLE_TAG;
+import static org.apache.eagle.topology.TopologyConstants.SITE_TAG;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.eagle.topology.TopologyCheckAppConfig;
+import org.apache.eagle.topology.TopologyConstants;
+import org.apache.eagle.topology.extractor.TopologyEntityParserResult;
+import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity;
+import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity;
+import org.apache.eagle.topology.entity.TopologyBaseAPIEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Storm bolt that takes a {@link TopologyEntityParserResult} from each tuple, synchronizes the
+ * stored topology entities with it through the Eagle service client (deleting stored entities
+ * that are no longer reported, writing the reported entities and metrics), and emits one
+ * {@link HealthCheckParseAPIEntity} per reported master/slave node.
+ */
+public class TopologyDataPersistBolt extends BaseRichBolt {
+
+    private TopologyCheckAppConfig config;
+    private IEagleServiceClient client;
+    private OutputCollector collector;
+
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyDataPersistBolt.class);
+
+    /**
+     * Creates the bolt.
+     *
+     * @param config application configuration; supplies the service connection settings and the site
+     */
+    public TopologyDataPersistBolt(TopologyCheckAppConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Creates the Eagle service client from the {@code service.*} settings and keeps the collector.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        final String serviceHost = this.config.getConfig().getString("service.host");
+        final int servicePort = this.config.getConfig().getInt("service.port");
+        final String serviceUser = this.config.getConfig().getString("service.username");
+        final String servicePassword = this.config.getConfig().getString("service.password");
+        this.client = new EagleServiceClientImpl(new EagleServiceConnector(serviceHost, servicePort, serviceUser, servicePassword));
+        this.collector = collector;
+    }
+
+    /**
+     * Persists the parse result carried by the tuple and emits health-check entities.
+     *
+     * <p>The tuple is acked when the whole sequence completes and failed when any step throws.
+     * A {@code null} tuple is ignored.
+     */
+    @Override
+    public void execute(Tuple input) {
+        if (input == null) {
+            return;
+        }
+        final String serviceName = input.getStringByField(TopologyConstants.SERVICE_NAME_FIELD);
+        final TopologyEntityParserResult result = (TopologyEntityParserResult) input.getValueByField(TopologyConstants.TOPOLOGY_DATA_FIELD);
+        final Set<String> reportedKeys = new HashSet<>();
+        final List<TopologyBaseAPIEntity> staleEntities = new ArrayList<>();
+        final List<TopologyBaseAPIEntity> reportedEntities = new ArrayList<>();
+
+        filterEntitiesToWrite(reportedEntities, reportedKeys, result.getMasterNodes());
+        filterEntitiesToWrite(reportedEntities, reportedKeys, result.getSlaveNodes());
+
+        final String query = String.format("%s[@site=\"%s\"]{*}", serviceName, this.config.dataExtractorConfig.site);
+        try {
+            final GenericServiceAPIResponseEntity<TopologyBaseAPIEntity> response = client.search().query(query).pageSize(Integer.MAX_VALUE).send();
+            // Stored entities are only considered stale when this round reported at least one node.
+            if (response.isSuccess() && response.getObj() != null && !reportedKeys.isEmpty()) {
+                for (TopologyBaseAPIEntity stored : response.getObj()) {
+                    if (!reportedKeys.contains(generateKey(stored))) {
+                        staleEntities.add(stored);
+                    }
+                }
+            }
+            deleteEntities(staleEntities, serviceName);
+            writeEntities(reportedEntities, serviceName);
+            writeEntities(result.getMetrics(), serviceName);
+            emitToKafkaBolt(result);
+            this.collector.ack(input);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            this.collector.fail(input);
+        }
+    }
+
+    /**
+     * Appends every entity to {@code entitiesToWrite} and records its key in {@code availableHostnames}.
+     */
+    private void filterEntitiesToWrite(List<TopologyBaseAPIEntity> entitiesToWrite, Set<String> availableHostnames, List<TopologyBaseAPIEntity> entities) {
+        for (TopologyBaseAPIEntity reported : entities) {
+            availableHostnames.add(generateKey(reported));
+            entitiesToWrite.add(reported);
+        }
+    }
+
+    /**
+     * Declares the single output field {@code f1}.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        final Fields outputFields = new Fields("f1");
+        declarer.declare(outputFields);
+    }
+
+    /**
+     * Deletes the given entities through the service client, logs the outcome, and clears the list.
+     * Client failures are logged and not propagated.
+     */
+    private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) {
+        try {
+            final GenericServiceAPIResponseEntity response = client.delete(entities);
+            if (response.isSuccess()) {
+                LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName);
+            } else {
+                LOG.error("Got exception from eagle service: " + response.getException());
+            }
+        } catch (EagleServiceClientException e) {
+            LOG.error(e.getMessage(), e);
+        } catch (IOException e) {
+            LOG.error(e.getMessage(), e);
+        }
+        entities.clear();
+    }
+
+    /**
+     * Creates the given entities through the service client, logs the outcome, and clears the list.
+     * Any failure is logged and not propagated.
+     */
+    private void writeEntities(List<? extends TaggedLogAPIEntity> entities, String serviceName) {
+        try {
+            final GenericServiceAPIResponseEntity response = client.create(entities);
+            if (response.isSuccess()) {
+                LOG.info("Successfully wrote {} entities for {}", entities.size(), serviceName);
+            } else {
+                LOG.error("Got exception from eagle service: " + response.getException());
+            }
+        } catch (Exception e) {
+            LOG.error("cannot create entities successfully", e);
+        }
+        entities.clear();
+    }
+
+    /**
+     * Builds the comparison key {@code site-rack-hostname-role} from the entity's tags.
+     */
+    private String generateKey(TopologyBaseAPIEntity entity) {
+        final Map<String, String> tags = entity.getTags();
+        return String.format("%s-%s-%s-%s", tags.get(SITE_TAG), tags.get(RACK_TAG), tags.get(HOSTNAME_TAG), tags.get(ROLE_TAG));
+    }
+
+    /**
+     * Emits one tuple per master and slave node, each carrying a health-check entity.
+     */
+    private void emitToKafkaBolt(TopologyEntityParserResult result) {
+        final List<HealthCheckParseAPIEntity> healthChecks = new ArrayList<>();
+        setNodeInfo(result.getMasterNodes(), healthChecks);
+        setNodeInfo(result.getSlaveNodes(), healthChecks);
+
+        for (HealthCheckParseAPIEntity healthCheck : healthChecks) {
+            this.collector.emit(new Values(healthCheck));
+        }
+    }
+
+    /**
+     * Converts each topology entity into a health-check entity (status, timestamp, host, role,
+     * site) and appends it to {@code healthCheckParseAPIList}. The status is only set for the
+     * HBase, HDFS and MR entity types.
+     */
+    private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) {
+        for (TopologyBaseAPIEntity node : topologyBaseAPIList) {
+            final HealthCheckParseAPIEntity healthCheck = new HealthCheckParseAPIEntity();
+
+            // Kept as independent checks (not else-if) so evaluation matches the original exactly.
+            if (node instanceof HBaseServiceTopologyAPIEntity) {
+                healthCheck.setStatus(((HBaseServiceTopologyAPIEntity) node).getStatus());
+            }
+            if (node instanceof HdfsServiceTopologyAPIEntity) {
+                healthCheck.setStatus(((HdfsServiceTopologyAPIEntity) node).getStatus());
+            }
+            if (node instanceof MRServiceTopologyAPIEntity) {
+                healthCheck.setStatus(((MRServiceTopologyAPIEntity) node).getStatus());
+            }
+
+            healthCheck.setTimeStamp(node.getTimestamp());
+            healthCheck.setHost(node.getTags().get(HOSTNAME_TAG));
+            healthCheck.setRole(node.getTags().get(ROLE_TAG));
+            healthCheck.setSite(node.getTags().get(SITE_TAG));
+            healthCheckParseAPIList.add(healthCheck);
+        }
+    }
+}


Mime
View raw message