eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject [1/2] eagle git commit: [EAGLE-946] Refactor MRRunningJobApp & HadoopQueueApp
Date Wed, 15 Mar 2017 08:34:19 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 93f83f4a9 -> 3fe637eb5


http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
index 225ede9..e1991b0 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
@@ -19,6 +19,8 @@
  */
 package org.apache.eagle.jpm.util.resourcefetch;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
 import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector;
@@ -27,25 +29,24 @@ import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 import org.apache.eagle.jpm.util.resourcefetch.model.AppsWrapper;
 import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfo;
 import org.apache.eagle.jpm.util.resourcefetch.model.ClusterInfoWrapper;
-import org.apache.eagle.jpm.util.resourcefetch.url.JobListServiceURLBuilderImpl;
-import org.apache.eagle.jpm.util.resourcefetch.url.ServiceURLBuilder;
-import org.apache.eagle.jpm.util.resourcefetch.url.SparkCompleteJobServiceURLBuilderImpl;
-import org.apache.eagle.jpm.util.resourcefetch.url.URLUtil;
+import org.apache.eagle.jpm.util.resourcefetch.url.*;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.InputStream;
+import java.security.InvalidParameterException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
 
     private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
     private final HAURLSelector selector;
-    private final ServiceURLBuilder jobListServiceURLBuilder;
+    //private final ServiceURLBuilder jobListServiceURLBuilder;
     private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
     private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
 
@@ -54,31 +55,32 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo>
{
     }
 
     public RMResourceFetcher(String[] rmBasePaths) {
-        this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
+        //this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
         this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl();
-
-        this.selector = new HAURLSelectorImpl(rmBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP);
+        this.selector = new HAURLSelectorImpl(
+                rmBasePaths,
+                new RmActiveTestURLBuilderImpl(),
+                Constants.CompressionType.NONE, null);
     }
 
-    private void checkUrl() throws IOException {
-        if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(),
Constants.JobState.RUNNING.name()))) {
-            selector.reSelectUrl();
-        }
+    public HAURLSelector getSelector() {
+        return selector;
     }
 
-    private List<AppInfo> doFetchFinishApplicationsList(String urlString, Constants.CompressionType
compressionType) throws Exception {
-        List<AppInfo> result = new ArrayList<>(0);
+    private List<AppInfo> doFetchApplicationsList(String urlString, Constants.CompressionType
compressionType) {
+        List<AppInfo> result = new ArrayList<>();
         InputStream is = null;
         try {
-            checkUrl();
-            LOG.info("Going to call yarn api to fetch finished application list: " + urlString);
+            LOG.info("Going to query cluster applications list: " + urlString);
             is = InputStreamUtils.getInputStream(urlString, null, compressionType);
             final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
             if (appWrapper != null && appWrapper.getApps() != null
                 && appWrapper.getApps().getApp() != null) {
                 result = appWrapper.getApps().getApp();
             }
-            return result;
+            LOG.info("Successfully fetched {} AppInfos from url {}", result.size(), urlString);
+        } catch (Exception e) {
+            LOG.error("Fail to query {} due to {}", urlString, e.getMessage());
         } finally {
             if (is != null) {
                 try {
@@ -88,21 +90,25 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo>
{
                 }
             }
         }
+        return result;
     }
 
-    private String getSparkRunningJobURL() {
-        return selector.getSelectedUrl()
-                + "/"
-                + Constants.V2_APPS_URL
-                + "?applicationTypes=SPARK&state=RUNNING&"
-                + Constants.ANONYMOUS_PARAMETER;
-    }
-
-    private String getMRRunningJobURL() {
-        return String.format("%s/%s?applicationTypes=MAPREDUCE&state=RUNNING&%s",
-            selector.getSelectedUrl(),
-            Constants.V2_APPS_URL,
-            Constants.ANONYMOUS_PARAMETER);
+    public String getRunningJobURL(Constants.JobType jobType, String startTime, String endTime,
String limit) {
+        String condition = "";
+        limit = ((limit == null || limit.isEmpty()) ? "" : "&limit=" + limit);
+        if (startTime == null && endTime == null) {
+            condition = String.format("applicationTypes=%s%s&", jobType, limit);
+        } else if (startTime == null) {
+            condition = String.format("applicationTypes=%s&startedTimeEnd=%s%s&",
jobType, endTime, limit);
+        } else if (endTime == null) {
+            condition = String.format("applicationTypes=%s&startedTimeBegin=%s%s&",
jobType, startTime, limit);
+        } else {
+            condition = String.format("applicationTypes=%s&startedTimeBegin=%s&startedTimeEnd=%s%s&",
+                    jobType, startTime, endTime, limit);
+        }
+        String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
+        return String.format("%s/%s?%sstate=RUNNING&%s", url, Constants.V2_APPS_URL,
condition,
+                Constants.ANONYMOUS_PARAMETER);
     }
 
     private String getMRFinishedJobURL(String lastFinishedTime) {
@@ -112,40 +118,101 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo>
{
                 + lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER;
     }
 
-    private List<AppInfo> doFetchRunningApplicationsList(String urlString, Constants.CompressionType
compressionType) throws Exception {
-        List<AppInfo> result = new ArrayList<>(0);
-        InputStream is = null;
+    private String getAcceptedAppURL() {
+        String baseUrl = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
+        return String.format("%s/%s?state=ACCEPTED&%s", baseUrl, Constants.V2_APPS_URL,
Constants.ANONYMOUS_PARAMETER);
+    }
+
+    private List<AppInfo> doFetchRunningApplicationsList(Constants.JobType jobType,
+                                                         Constants.CompressionType compressionType,
+                                                         Object... parameter) throws Exception
{
+        Map<String, AppInfo> result = new HashMap();
+        List<AppInfo> apps = new ArrayList<>();
         try {
-            checkUrl();
-            LOG.info("Going to call yarn api to fetch running application list: " + urlString);
-            is = InputStreamUtils.getInputStream(urlString, null, compressionType);
-            final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
-            if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp()
!= null) {
-                result = appWrapper.getApps().getApp();
+            selector.checkUrl();
+
+            String limit = "";
+            int requests = 1;
+            int timeRangePerRequestInMin = 60;
+
+            switch (parameter.length) {
+                case 0 :
+                    String urlString = getRunningJobURL(jobType, null, null, null);
+                    return doFetchApplicationsList(urlString, compressionType);
+                case 1 :
+                    limit = String.valueOf(parameter[0]);
+                    break;
+                case 2 :
+                    limit = String.valueOf(parameter[0]);
+                    requests = (int) parameter[1];
+                    break;
+                case 3 :
+                    limit = String.valueOf(parameter[0]);
+                    requests = (int) parameter[1];
+                    timeRangePerRequestInMin = (int) parameter[2];
+                    break;
+                default :
+                    throw new InvalidParameterException("parameter list: limit, requests,
requestTimeRange");
             }
-            return result;
-        } finally {
-            if (is != null) {
-                try {
-                    is.close();
-                } catch (Exception e) {
-                    LOG.warn("{}", e);
-                }
+
+            if (requests <= 1) {
+                String urlString = getRunningJobURL(jobType, null, null, limit);
+                return doFetchApplicationsList(urlString, compressionType);
             }
+
+            long interval =  timeRangePerRequestInMin * DateTimeUtil.ONEMINUTE;
+            long currentTime = System.currentTimeMillis() - interval;
+
+            List<String> requestUrls = new ArrayList<>();
+            requestUrls.add(getRunningJobURL(jobType, String.valueOf(currentTime), null,
limit));
+
+            for (int cnt = 2; cnt < requests; cnt++) {
+                long start = currentTime - interval;
+                requestUrls.add(getRunningJobURL(jobType, String.valueOf(start), String.valueOf(currentTime),
limit));
+                currentTime -= interval;
+            }
+
+            requestUrls.add(getRunningJobURL(jobType, null, String.valueOf(currentTime),
limit));
+            LOG.info("{} requests to fetch running MapReduce applications: \n{}", requestUrls.size(),
+                    StringUtils.join(requestUrls, "\n"));
+
+            requestUrls.forEach(query ->
+                doFetchApplicationsList(query, compressionType).forEach(app -> result.put(app.getId(),
app))
+            );
+        } catch (Exception e) {
+            LOG.error("Catch an exception when query url{} : {}", selector.getSelectedUrl(),
e.getMessage(), e);
+            return apps;
+        }
+        apps.addAll(result.values());
+        return apps;
+    }
+
+    private List<AppInfo> doFetchAcceptedApplicationList(Constants.CompressionType
compressionType,
+                                                         Object... parameter) throws Exception
{
+        List<AppInfo> apps = new ArrayList<>();
+        try {
+            selector.checkUrl();
+            String url = getAcceptedAppURL();
+            return doFetchApplicationsList(url, compressionType);
+        } catch (Exception e) {
+            LOG.error("Catch an exception when query {} : {}", selector.getSelectedUrl(),
e.getMessage(), e);
         }
+        return apps;
     }
 
     private List<AppInfo> getResource(Constants.ResourceType resourceType, Constants.CompressionType
compressionType, Object... parameter) throws Exception {
         switch (resourceType) {
             case COMPLETE_SPARK_JOB:
                 final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(),
(String) parameter[0]);
-                return doFetchFinishApplicationsList(urlString, compressionType);
+                return doFetchApplicationsList(urlString, compressionType);
             case RUNNING_SPARK_JOB:
-                return doFetchRunningApplicationsList(getSparkRunningJobURL(), compressionType);
+                return doFetchRunningApplicationsList(Constants.JobType.SPARK, compressionType,
parameter);
             case RUNNING_MR_JOB:
-                return doFetchRunningApplicationsList(getMRRunningJobURL(), compressionType);
+                return doFetchRunningApplicationsList(Constants.JobType.MAPREDUCE, compressionType,
parameter);
             case COMPLETE_MR_JOB:
-                return doFetchFinishApplicationsList(getMRFinishedJobURL((String) parameter[0]),
compressionType);
+                return doFetchApplicationsList(getMRFinishedJobURL((String) parameter[0]),
compressionType);
+            case ACCEPTED_JOB:
+                return doFetchAcceptedApplicationList(compressionType, parameter);
             default:
                 throw new Exception("Not support resourceType :" + resourceType);
         }
@@ -166,7 +233,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo>
{
     public ClusterInfo getClusterInfo() throws Exception {
         InputStream is = null;
         try {
-            checkUrl();
+            selector.checkUrl();
             final String urlString = getClusterInfoURL();
             LOG.info("Calling yarn api to fetch cluster info: " + urlString);
             is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
index 6b0f454..9e776be 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
@@ -53,7 +53,6 @@ public class SparkHistoryServerResourceFetcher implements ResourceFetcher<SparkA
         this.historyServerURL = historyServerURL;
         this.sparkDetailJobServiceURLBuilder = new SparkJobServiceURLBuilderImpl();
         this.auth = "Basic " + new String(new Base64().encode(String.format("%s:%s", userName,
pwd).getBytes()));
-        ;
     }
 
     private List<SparkApplication> doFetchSparkApplicationDetail(String appId) throws
Exception {

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
deleted file mode 100644
index 2a99d26..0000000
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.util.resourcefetch.ha;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-
-public abstract class AbstractURLSelector implements HAURLSelector {
-    private final String[] urls;
-    private volatile String selectedUrl;
-
-    private volatile boolean reselectInProgress;
-    private final Constants.CompressionType compressionType;
-
-    private static final long MAX_RETRY_TIME = 3;
-    private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
-
-    public AbstractURLSelector(String[] urls, Constants.CompressionType compressionType)
{
-        this.urls = urls;
-        this.compressionType = compressionType;
-    }
-
-    public boolean checkUrl(String urlString) {
-        InputStream is = null;
-        try {
-            is = InputStreamUtils.getInputStream(urlString, null, compressionType);
-        } catch (Exception ex) {
-            LOG.info("get input stream from url: " + urlString + " failed. ");
-            return false;
-        } finally {
-            if (is != null) {
-                try {
-                    is.close();
-                } catch (IOException e) {
-                    LOG.warn("{}", e);
-                }
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public String getSelectedUrl() {
-        if (selectedUrl == null) {
-            selectedUrl = urls[0];
-        }
-        return selectedUrl;
-    }
-
-    @Override
-    public void reSelectUrl() throws IOException {
-        if (reselectInProgress) {
-            return;
-        }
-        synchronized (this) {
-            if (reselectInProgress) {
-                return;
-            }
-            reselectInProgress = true;
-            try {
-                LOG.info("Going to reselect url");
-                for (int i = 0; i < urls.length; i++) {
-                    String urlToCheck = urls[i];
-                    LOG.info("Going to try url :" + urlToCheck);
-                    for (int time = 0; time < MAX_RETRY_TIME; time++) {
-                        if (checkUrl(buildTestURL(urlToCheck))) {
-                            selectedUrl = urls[i];
-                            LOG.info("Successfully switch to new url : " + selectedUrl);
-                            return;
-                        }
-                        LOG.info("try url " + urlToCheck + " failed for " + (time + 1) +
" times, sleep 5 seconds before try again. ");
-                        try {
-                            Thread.sleep(5 * 1000);
-                        } catch (InterruptedException ex) {
-                            LOG.warn("{}", ex);
-                        }
-                    }
-                }
-                throw new IOException("No alive url found: " + StringUtils.join(";", Arrays.asList(this.urls)));
-            } finally {
-                reselectInProgress = false;
-            }
-        }
-    }
-
-    protected abstract String buildTestURL(String urlToCheck);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java
index fa9b52b..6539263 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelector.java
@@ -20,9 +20,9 @@ import java.io.IOException;
 
 public interface HAURLSelector {
 
-    boolean checkUrl(String url);
+    void checkUrl() throws IOException;
 
-    void reSelectUrl() throws IOException;
+    boolean checkUrl(String urlString);
 
     String getSelectedUrl();
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
index a083ef2..ca1df25 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImpl.java
@@ -32,16 +32,38 @@ public class HAURLSelectorImpl implements HAURLSelector {
     private final String[] urls;
     private volatile String selectedUrl;
     private final ServiceURLBuilder builder;
+    private final Constants.JobState jobState;
 
     private volatile boolean reselectInProgress;
     private final Constants.CompressionType compressionType;
     private static final long MAX_RETRY_TIME = 2;
     private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
 
-    public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType
compressionType) {
+    public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType
compressionType, Constants.JobState jobState) {
         this.urls = urls;
         this.compressionType = compressionType;
         this.builder = builder;
+        this.jobState = jobState;
+    }
+
+    private String convertRestApi(Constants.JobState jobState) {
+        if (jobState == null) {
+            return null;
+        }
+        switch (jobState) {
+            case RUNNING : return Constants.V2_APPS_RUNNING_URL;
+            case FINISHED : return Constants.V2_APPS_COMPLETED_URL;
+            case ALL : return Constants.V2_APPS_URL;
+            default :
+                LOG.error("Unsupported JobState={}", jobState);
+                return null;
+        }
+    }
+
+    public void checkUrl() throws IOException {
+        if (!checkUrl(builder.build(getSelectedUrl(), convertRestApi(jobState)))) {
+            reSelectUrl();
+        }
     }
 
     public boolean checkUrl(String urlString) {
@@ -49,7 +71,7 @@ public class HAURLSelectorImpl implements HAURLSelector {
         try {
             is = InputStreamUtils.getInputStream(urlString, null, compressionType);
         } catch (Exception ex) {
-            LOG.info("get inputstream from url: " + urlString + " failed. ");
+            LOG.info("get inputStream from url: " + urlString + " failed. ");
             return false;
         } finally {
             if (is != null) {
@@ -71,8 +93,7 @@ public class HAURLSelectorImpl implements HAURLSelector {
         return selectedUrl;
     }
 
-    @Override
-    public void reSelectUrl() throws IOException {
+    private void reSelectUrl() throws IOException {
         if (reselectInProgress) {
             return;
         }
@@ -87,7 +108,7 @@ public class HAURLSelectorImpl implements HAURLSelector {
                     String urlToCheck = urls[i];
                     LOG.info("Going to try url :" + urlToCheck);
                     for (int time = 0; time < MAX_RETRY_TIME; time++) {
-                        if (checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name())))
{
+                        if (checkUrl(builder.build(urlToCheck, convertRestApi(jobState))))
{
                             selectedUrl = urls[i];
                             LOG.info("Successfully switch to new url : " + selectedUrl);
                             return;

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java
index 90709c9..5b0269d 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/model/AppInfo.java
@@ -43,9 +43,12 @@ public class AppInfo implements Serializable {
     private long elapsedTime;
     private String amContainerLogs;
     private String amHostHttpAddress;
-    private long allocatedMB;
+    private int allocatedMB;
     private int allocatedVCores;
     private int runningContainers;
+    // for HDP 2.7
+    private double queueUsagePercentage;
+    private double clusterUsagePercentage;
 
     public String getId() {
         return id;
@@ -183,11 +186,11 @@ public class AppInfo implements Serializable {
         this.amHostHttpAddress = amHostHttpAddress;
     }
 
-    public long getAllocatedMB() {
+    public int getAllocatedMB() {
         return allocatedMB;
     }
 
-    public void setAllocatedMB(long allocatedMB) {
+    public void setAllocatedMB(int allocatedMB) {
         this.allocatedMB = allocatedMB;
     }
 
@@ -207,6 +210,22 @@ public class AppInfo implements Serializable {
         this.runningContainers = runningContainers;
     }
 
+    public double getQueueUsagePercentage() {
+        return queueUsagePercentage;
+    }
+
+    public void setQueueUsagePercentage(double queueUsagePercentage) {
+        this.queueUsagePercentage = queueUsagePercentage;
+    }
+
+    public double getClusterUsagePercentage() {
+        return clusterUsagePercentage;
+    }
+
+    public void setClusterUsagePercentage(double clusterUsagePercentage) {
+        this.clusterUsagePercentage = clusterUsagePercentage;
+    }
+
     @Override
     public String toString() {
         return "AppInfo{"

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java
index 5513771..e5994aa 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImpl.java
@@ -20,25 +20,17 @@ import org.apache.eagle.jpm.util.Constants;
 
 public class JobListServiceURLBuilderImpl implements ServiceURLBuilder {
 
-    public String build(String... parameters) {
+    public String build(String url, String... parameters) {
         /**
          * {rmUrl}/ws/v1/cluster/apps?state=RUNNING.
          * We need to remove tailing slashes to avoid "url//ws/v1"
          * because it would not be found and would be redirected to
          * history server ui.
          */
-        String rmUrl = URLUtil.removeTrailingSlash(parameters[0]);
+        String rmUrl = URLUtil.removeTrailingSlash(url);
 
-        String restApi = null;
-        String jobState = parameters[1];
+        String restApi = parameters[0];
 
-        if (jobState.equals(Constants.JobState.RUNNING.name())) {
-            restApi = Constants.V2_APPS_RUNNING_URL;
-        } else if (jobState.equals(Constants.JobState.FINISHED.name())) {
-            restApi = Constants.V2_APPS_COMPLETED_URL;
-        } else if (jobState.equals(Constants.JobState.ALL.name())) {
-            restApi = Constants.V2_APPS_URL;
-        }
         if (restApi == null) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java
new file mode 100644
index 0000000..f0b963b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/RmActiveTestURLBuilderImpl.java
@@ -0,0 +1,28 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util.resourcefetch.url;
+
+import org.apache.eagle.jpm.util.Constants;
+
+public class RmActiveTestURLBuilderImpl implements ServiceURLBuilder {
+    @Override
+    public String build(String url, String... parameters) {
+        String rmUrl = URLUtil.removeTrailingSlash(url);
+        return String.format("%s/%s&limit=1&%s", rmUrl, Constants.V2_APPS_COMPLETED_URL,
Constants.ANONYMOUS_PARAMETER);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java
index 09fea2f..1fc234f 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/ServiceURLBuilder.java
@@ -17,5 +17,5 @@
 package org.apache.eagle.jpm.util.resourcefetch.url;
 
 public interface ServiceURLBuilder {
-    String build(String... parameters);
+    String build(String url, String... parameters);
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java
index ca6e938..063ac5f 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkCompleteJobServiceURLBuilderImpl.java
@@ -23,11 +23,11 @@ import org.apache.eagle.jpm.util.Constants;
 
 public class SparkCompleteJobServiceURLBuilderImpl implements ServiceURLBuilder {
 
-    public String build(String... parameters) {
-        String url = URLUtil.removeTrailingSlash(parameters[0]);
+    public String build(String url, String... parameters) {
+        String newUrl = URLUtil.removeTrailingSlash(url);
 
-        return url + "/" + Constants.V2_APPS_URL
+        return newUrl + "/" + Constants.V2_APPS_URL
                 + "?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin="
-                + parameters[1] + "&" + Constants.ANONYMOUS_PARAMETER;
+                + parameters[0] + "&" + Constants.ANONYMOUS_PARAMETER;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java
index c5ec67a..20006f5 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/url/SparkJobServiceURLBuilderImpl.java
@@ -23,9 +23,9 @@ import org.apache.eagle.jpm.util.Constants;
 
 public class SparkJobServiceURLBuilderImpl implements ServiceURLBuilder {
 
-    public String build(String... parameters) {
-        String serverAddress = URLUtil.removeTrailingSlash(parameters[0]);
+    public String build(String url, String... parameters) {
+        String serverAddress = URLUtil.removeTrailingSlash(url);
 
-        return serverAddress + Constants.SPARK_APPS_URL + parameters[1];
+        return serverAddress + Constants.SPARK_APPS_URL + parameters[0];
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java
b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java
index 8c958a3..9d1f73c 100644
--- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java
+++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/ha/HAURLSelectorImplTest.java
@@ -44,7 +44,7 @@ public class HAURLSelectorImplTest {
     @Test
     public void testCheckUrl() throws Exception {
         String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"};
-        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP);
+        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP, Constants.JobState.RUNNING);
         mockStatic(InputStreamUtils.class);
         when(InputStreamUtils.getInputStream("http://www.xxx.com:8088", null, Constants.CompressionType.GZIP)).thenReturn(null);
         Assert.assertTrue(haurlSelector.checkUrl("http://www.xxx.com:8088"));
@@ -53,7 +53,7 @@ public class HAURLSelectorImplTest {
     @Test
     public void testCheckUrl1() throws Exception {
         String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"};
-        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP);
+        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP, Constants.JobState.RUNNING);
         mockStatic(InputStreamUtils.class);
         when(InputStreamUtils.getInputStream("http://www.xxx.com:8088", null, Constants.CompressionType.GZIP)).thenThrow(new
Exception());
         Assert.assertFalse(haurlSelector.checkUrl("http://www.xxx.com:8088"));
@@ -62,29 +62,29 @@ public class HAURLSelectorImplTest {
     @Test
     public void testGetSelectedUrl() {
         String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"};
-        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP);
+        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP, Constants.JobState.RUNNING);
         Assert.assertEquals(rmBasePaths[0], haurlSelector.getSelectedUrl());
     }
 
     @Test
     public void testReSelectUrl() throws Exception {
         String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"};
-        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP);
+        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP, Constants.JobState.RUNNING);
         mockStatic(InputStreamUtils.class);
         when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true",
null, Constants.CompressionType.GZIP)).thenThrow(new Exception());
         when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true",
null, Constants.CompressionType.GZIP)).thenReturn(null);
-        haurlSelector.reSelectUrl();
+        haurlSelector.checkUrl();
         Assert.assertEquals(rmBasePaths[1], haurlSelector.getSelectedUrl());
     }
 
     @Test
     public void testReSelectUrl1() throws Exception {
         String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"};
-        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP);
+        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP, Constants.JobState.RUNNING);
         mockStatic(InputStreamUtils.class);
         when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true",
null, Constants.CompressionType.GZIP)).thenReturn(null);
         when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true",
null, Constants.CompressionType.GZIP)).thenThrow(new Exception());
-        haurlSelector.reSelectUrl();
+        haurlSelector.checkUrl();
         Assert.assertEquals(rmBasePaths[0], haurlSelector.getSelectedUrl());
     }
 
@@ -93,10 +93,10 @@ public class HAURLSelectorImplTest {
     public void testReSelectUrl2() throws Exception {
         thrown.expect(IOException.class);
         String[] rmBasePaths = new String[]{"http://www.xxx.com:8088", "http://www.yyy.com:8088"};
-        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP);
+        HAURLSelectorImpl haurlSelector = new HAURLSelectorImpl(rmBasePaths, new JobListServiceURLBuilderImpl(),
Constants.CompressionType.GZIP, Constants.JobState.RUNNING);
         mockStatic(InputStreamUtils.class);
         when(InputStreamUtils.getInputStream("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true",
null, Constants.CompressionType.GZIP)).thenThrow(new Exception());
         when(InputStreamUtils.getInputStream("http://www.yyy.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true",
null, Constants.CompressionType.GZIP)).thenThrow(new Exception());
-        haurlSelector.reSelectUrl();
+        haurlSelector.checkUrl();
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java
b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java
index 6f9aa97..81d9309 100644
--- a/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java
+++ b/eagle-jpm/eagle-jpm-util/src/test/java/org/apache/eagle/jpm/util/resourcefetch/url/JobListServiceURLBuilderImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.eagle.jpm.util.resourcefetch.url;
 
 import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.Constants.JobState;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -25,13 +26,25 @@ public class JobListServiceURLBuilderImplTest {
     @Test
     public void testBuild() {
         JobListServiceURLBuilderImpl jobListServiceURLBuilderImpl = new JobListServiceURLBuilderImpl();
-        String finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/",
Constants.JobState.RUNNING.name());
+        String finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/",
convertRestApi(JobState.RUNNING));
         Assert.assertEquals("http://www.xxx.com:8088/ws/v1/cluster/apps?state=RUNNING&anonymous=true",
finalUrl);
-        finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", Constants.JobState.FINISHED.name());
+        finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", convertRestApi(JobState.FINISHED));
         Assert.assertEquals("http://www.xxx.com:8088/ws/v1/cluster/apps?state=FINISHED&anonymous=true",
finalUrl);
-        finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", Constants.JobState.ALL.name());
+        finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", convertRestApi(JobState.ALL));
         Assert.assertEquals("http://www.xxx.com:8088/ws/v1/cluster/apps&anonymous=true",
finalUrl);
-        finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", "");
+        finalUrl = jobListServiceURLBuilderImpl.build("http://www.xxx.com:8088/", convertRestApi(null));
         Assert.assertEquals(null, finalUrl);
     }
+
+    private String convertRestApi(Constants.JobState jobState) {
+        if (jobState == null) {
+            return null;
+        }
+        switch (jobState) {
+            case RUNNING : return Constants.V2_APPS_RUNNING_URL;
+            case FINISHED : return Constants.V2_APPS_COMPLETED_URL;
+            case ALL : return Constants.V2_APPS_URL;
+            default : return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 1142e1b..8aa5d8e 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -59,17 +59,17 @@
             <description>number of sinks connected to alert engine</description>
         </property>
         <property>
-            <name>topology.resolverAPIUrl</name>
-            <displayName>Rack Resolver APIUrl</displayName>
-            <description>Use the URL to obtain a Node Object, from a node identified
by the nodeid value.</description>
-            <value>http://sandbox.hortonworks.com:8088/ws/v1/cluster/nodes</value>
-        </property>
-        <property>
             <name>topology.rackResolverCls</name>
             <displayName>Rack Resolver Class</displayName>
             <description>rack resolver class</description>
             <value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value>
         </property>
+        <property>
+            <name>topology.resolverAPIUrl</name>
+            <displayName>Rack Resolver APIUrl Required by ClusterNodeAPITopologyRackResolver</displayName>
+            <description>Use the URL to obtain a Node Object, from a node identified
by the nodeid value.</description>
+            <value>http://sandbox.hortonworks.com:8088/ws/v1/cluster/nodes</value>
+        </property>
 
         <property>
             <name>dataSourceConfig.hbase.enabled</name>


Mime
View raw message