eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject eagle git commit: [EAGLE-860] TopologyDataExtractor can't extract right rack info
Date Fri, 13 Jan 2017 09:15:39 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 0a4299766 -> 71c1f95a4


[EAGLE-860] TopologyDataExtractor can't extract right rack info

- Use clusternode API to get rack info.

https://issues.apache.org/jira/browse/EAGLE-860

Author: r7raul1984 <tangjijun@yhd.com>

Closes #772 from r7raul1984/EAGLE-860.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/71c1f95a
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/71c1f95a
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/71c1f95a

Branch: refs/heads/master
Commit: 71c1f95a46fef70bff42cf0b1db56cfc3f6b1d3c
Parents: 0a42997
Author: r7raul1984 <tangjijun@yhd.com>
Authored: Fri Jan 13 17:15:17 2017 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Fri Jan 13 17:15:17 2017 +0800

----------------------------------------------------------------------
 eagle-topology-check/eagle-topology-app/pom.xml | 13 +++-
 .../eagle/topology/TopologyCheckAppConfig.java  |  2 +
 .../ClusterNodeAPITopologyRackResolver.java     | 72 ++++++++++++++++++++
 .../eagle/topology/resolver/model/Node.java     | 36 ++++++++++
 .../eagle/topology/resolver/model/NodeInfo.java | 57 ++++++++++++++++
 .../topology/storm/TopologyDataExtractor.java   | 14 ++--
 ....eagle.topology.TopologyCheckAppProvider.xml |  7 +-
 .../TestClusterNodeAPITopologyRackResolver.java | 72 ++++++++++++++++++++
 .../src/test/resources/nodeinfo.json            | 17 +++++
 9 files changed, 284 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/pom.xml b/eagle-topology-check/eagle-topology-app/pom.xml
index 913003d..dc74b9f 100644
--- a/eagle-topology-check/eagle-topology-app/pom.xml
+++ b/eagle-topology-check/eagle-topology-app/pom.xml
@@ -50,7 +50,18 @@
             <groupId>org.json</groupId>
             <artifactId>json</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index 90a3773..a75a3b3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -77,6 +77,7 @@ public class TopologyCheckAppConfig implements Serializable {
         this.dataExtractorConfig.numDataFetcherSpout = config.getInt("topology.numDataFetcherSpout");
         this.dataExtractorConfig.numEntityPersistBolt = config.getInt("topology.numEntityPersistBolt");
         this.dataExtractorConfig.numKafkaSinkBolt = config.getInt("topology.numOfKafkaSinkBolt");
+        this.dataExtractorConfig.resolverAPIUrl = config.getString("topology.resolverAPIUrl");
         String resolveCls = config.getString("topology.rackResolverCls");
         try {
             this.dataExtractorConfig.resolverCls = (Class<? extends TopologyRackResolver>) Class.forName(resolveCls);
@@ -120,6 +121,7 @@ public class TopologyCheckAppConfig implements Serializable {
         public int numKafkaSinkBolt;
         public long fetchDataIntervalInSecs;
         public int parseThreadPoolSize;
+        public String resolverAPIUrl;
         public Class<? extends TopologyRackResolver> resolverCls;
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java
new file mode 100644
index 0000000..8e153f0
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/ClusterNodeAPITopologyRackResolver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.eagle.app.utils.AppConstants;
+import org.apache.eagle.app.utils.connection.InputStreamUtils;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.model.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+/**
+ * resolve rack by ClusterNode API.
+ * https://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html
+ */
+public class ClusterNodeAPITopologyRackResolver implements TopologyRackResolver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterNodeAPITopologyRackResolver.class);
+    private static final String DEFAULT_RACK_NAME = "/default-rack";
+    private String activeApiUrl = "";
+    private String hostPort = "8041";//TODO configurable
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    public ClusterNodeAPITopologyRackResolver(String activeApiUrl) {
+        this.activeApiUrl = activeApiUrl;
+    }
+
+    @Override
+    public String resolve(String hostname) {
+        String nodeid = hostname + ":" + hostPort;
+        String requestUrl = activeApiUrl + "/" + nodeid;
+        String rack = DEFAULT_RACK_NAME;
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(requestUrl, null, AppConstants.CompressionType.NONE);
+            LOG.info("resolve rack by api url {}", requestUrl);
+            Node node = OBJ_MAPPER.readValue(is, Node.class);
+            rack = node.getNode().getRack();
+        } catch (Exception e) {
+            LOG.warn("resolve rack by api url {} failed, {}", requestUrl, e);
+            return rack;
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (Exception e) {
+                    LOG.warn("{}", e);
+                }
+            }
+        }
+        return rack;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java
new file mode 100644
index 0000000..ac9460a
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/Node.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Node {
+    private NodeInfo node;
+
+    public NodeInfo getNode() {
+        return node;
+    }
+
+    public void setNode(NodeInfo node) {
+        this.node = node;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java
new file mode 100644
index 0000000..4a47b8a
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/model/NodeInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.topology.resolver.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class NodeInfo {
+
+    private String id;
+    private String rack;
+    private String nodeHostName;
+
+    public String getRack() {
+        return rack;
+    }
+
+    public void setRack(String rack) {
+        this.rack = rack;
+    }
+
+    public String getNodeHostName() {
+        return nodeHostName;
+    }
+
+    public void setNodeHostName(String nodeHostName) {
+        this.nodeHostName = nodeHostName;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
index 32eae9b..492cf3f 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataExtractor.java
@@ -23,10 +23,13 @@ import org.apache.eagle.topology.TopologyCheckAppConfig;
 import org.apache.eagle.topology.extractor.TopologyCrawler;
 import org.apache.eagle.topology.extractor.TopologyExtractorFactory;
 import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.ClusterNodeAPITopologyRackResolver;
 import org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
@@ -74,10 +77,13 @@ public class TopologyDataExtractor {
         TopologyRackResolver rackResolver = new DefaultTopologyRackResolver();
         if (config.dataExtractorConfig.resolverCls != null) {
             try {
-                rackResolver = config.dataExtractorConfig.resolverCls.newInstance();
-            } catch (InstantiationException e) {
-                e.printStackTrace();
-            } catch (IllegalAccessException e) {
+                if (config.dataExtractorConfig.resolverCls == ClusterNodeAPITopologyRackResolver.class) {
+                    Constructor ctor = config.dataExtractorConfig.resolverCls.getConstructor(String.class);
+                    rackResolver = (ClusterNodeAPITopologyRackResolver) ctor.newInstance(config.dataExtractorConfig.resolverAPIUrl);
+                } else {
+                    rackResolver = config.dataExtractorConfig.resolverCls.newInstance();
+                }
+            } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                 e.printStackTrace();
             }
         }

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index bfe43ed..87d3202 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -58,7 +58,12 @@
             <value>2</value>
             <description>number of sinks connected to alert engine</description>
         </property>
-
+        <property>
+            <name>topology.resolverAPIUrl</name>
+            <displayName>Rack Resolver APIUrl</displayName>
+            <description>Use the URL to obtain a Node Object, from a node identified by the nodeid value.</description>
+            <value>http://sandbox.hortonworks.com:8088/ws/v1/cluster/nodes</value>
+        </property>
         <property>
             <name>topology.rackResolverCls</name>
             <displayName>Rack Resolver Class</displayName>

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java
new file mode 100644
index 0000000..cb53c1a
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/test/java/org/apache/eagle/topology/TestClusterNodeAPITopologyRackResolver.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.topology;
+
+import org.apache.eagle.app.utils.connection.InputStreamUtils;
+import org.apache.eagle.topology.resolver.TopologyRackResolver;
+import org.apache.eagle.topology.resolver.impl.ClusterNodeAPITopologyRackResolver;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.InputStream;
+import java.lang.reflect.Constructor;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(InputStreamUtils.class)
+public class TestClusterNodeAPITopologyRackResolver {
+    @Test
+    public void testClusterNodeAPITopologyRackResolver() throws Exception {
+        mockStatic(InputStreamUtils.class);
+        String apiUrl = "http://yhd-jqhadoop168.int.yihaodian.com:8088/ws/v1/cluster/nodes";
+        String hostname = "hostname";
+        mockInputSteam("/nodeinfo.json", apiUrl + "/" + hostname + ":8041");
+
+        Class<? extends TopologyRackResolver> resolverCls = (Class<? extends TopologyRackResolver>) Class.forName("org.apache.eagle.topology.resolver.impl.ClusterNodeAPITopologyRackResolver");
+        Assert.assertTrue(resolverCls == ClusterNodeAPITopologyRackResolver.class);
+        Constructor ctor = resolverCls.getConstructor(String.class);
+        TopologyRackResolver topologyRackResolver = (TopologyRackResolver) ctor.newInstance(apiUrl);
+        Assert.assertEquals("/rowb/rack12", topologyRackResolver.resolve(hostname));
+    }
+
+    @Test
+    public void testClusterNodeAPITopologyRackResolver1() throws Exception {
+        mockStatic(InputStreamUtils.class);
+        String apiUrl = "http://yhd-jqhadoop168.int.yihaodian.com:8088/ws/v1/cluster/nodes";
+        String hostname = "hostname";
+        mockInputSteamWithException(apiUrl + "/" + hostname + ":8041");
+        TopologyRackResolver topologyRackResolver = new ClusterNodeAPITopologyRackResolver(apiUrl);
+        Assert.assertEquals("/default-rack", topologyRackResolver.resolve(hostname));
+    }
+
+    private void mockInputSteam(String mockDataFilePath, String url) throws Exception {
+        InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath);
+        when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenReturn(jsonstream);
+    }
+
+    private void mockInputSteamWithException(String url) throws Exception {
+        when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenThrow(new Exception());
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/71c1f95a/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json b/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json
new file mode 100644
index 0000000..ac87391
--- /dev/null
+++ b/eagle-topology-check/eagle-topology-app/src/test/resources/nodeinfo.json
@@ -0,0 +1,17 @@
+{
+  "node": {
+    "rack": "/rowb/rack12",
+    "state": "RUNNING",
+    "id": "hostname:8041",
+    "nodeHostName": "hostname",
+    "nodeHTTPAddress": "hostname:8042",
+    "lastHealthUpdate": 1484196671092,
+    "version": "2.6.0",
+    "healthReport": "",
+    "numContainers": 2,
+    "usedMemoryMB": 6144,
+    "availMemoryMB": 43008,
+    "usedVirtualCores": 2,
+    "availableVirtualCores": 22
+  }
+}
\ No newline at end of file


Mime
View raw message