Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A274F200B68 for ; Fri, 19 Aug 2016 11:50:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A0E7C160AAC; Fri, 19 Aug 2016 09:50:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7FB6D160A8E for ; Fri, 19 Aug 2016 11:50:35 +0200 (CEST) Received: (qmail 52320 invoked by uid 500); 19 Aug 2016 09:50:34 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 52311 invoked by uid 99); 19 Aug 2016 09:50:34 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2016 09:50:34 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id EC9FEC007C for ; Fri, 19 Aug 2016 09:50:33 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.739 X-Spam-Level: X-Spam-Status: No, score=-3.739 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.519] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id KyG0IuLsZx_C for ; Fri, 19 Aug 2016 09:50:30 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 91AF35FB9A for ; Fri, 19 Aug 2016 09:50:27 +0000 (UTC) Received: (qmail 51005 invoked by uid 99); 19 Aug 2016 09:50:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2016 09:50:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 70320E0252; Fri, 19 Aug 2016 09:50:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: qingwzhao@apache.org To: commits@eagle.incubator.apache.org Date: Fri, 19 Aug 2016 09:50:26 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/4] incubator-eagle git commit: [EAGLE-467] Job list apis for querying jobs regardless of the status [Forced Update!] archived-at: Fri, 19 Aug 2016 09:50:37 -0000 Repository: incubator-eagle Updated Branches: refs/heads/develop a20656b5e -> d6ec142d3 (forced update) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 9f993a6..2accad8 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -20,12 +20,13 @@ package org.apache.eagle.jpm.mr.running.parser; import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager; -import org.apache.eagle.jpm.mr.running.entities.JobConfig; -import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity; -import org.apache.eagle.jpm.mr.running.entities.TaskAttemptExecutionAPIEntity; -import org.apache.eagle.jpm.mr.running.entities.TaskExecutionAPIEntity; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; +import org.apache.eagle.jpm.mr.runningentity.JobConfig; +import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; +import org.apache.eagle.jpm.mr.runningentity.TaskAttemptExecutionAPIEntity; +import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.JobNameNormalization; import org.apache.eagle.jpm.util.MRJobTagName; import org.apache.eagle.jpm.util.Utils; import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher; @@ -118,7 +119,7 @@ public class MRJobParser implements Runnable { private void finishMRJob(String mrJobId) { JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId); - jobExecutionAPIEntity.setStatus(Constants.AppState.FINISHED.toString()); + jobExecutionAPIEntity.setCurrentState(Constants.AppState.FINISHED.toString()); mrJobConfigs.remove(mrJobId); if (mrJobConfigs.size() == 0) { this.parserStatus = ParserStatus.APP_FINISHED; @@ -186,19 +187,20 @@ public class MRJobParser implements Runnable { mrJobEntityMap.put(id, new JobExecutionAPIEntity()); } + String jobDefId = JobNameNormalization.getInstance().normalize(mrJob.getName()); JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(id); jobExecutionAPIEntity.setTags(new HashMap<>(commonTags)); jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_ID.toString(), id); jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), mrJob.getName()); - jobExecutionAPIEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), mrJob.getName()); + jobExecutionAPIEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId); jobExecutionAPIEntity.setTimestamp(app.getStartedTime()); jobExecutionAPIEntity.setSubmissionTime(app.getStartedTime()); jobExecutionAPIEntity.setStartTime(mrJob.getStartTime()); - jobExecutionAPIEntity.setElapsedTime(mrJob.getElapsedTime()); - jobExecutionAPIEntity.setStatus(mrJob.getState()); - jobExecutionAPIEntity.setMapsTotal(mrJob.getMapsTotal()); + jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime()); + jobExecutionAPIEntity.setCurrentState(mrJob.getState()); + jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal()); jobExecutionAPIEntity.setMapsCompleted(mrJob.getMapsCompleted()); - jobExecutionAPIEntity.setReducesTotal(mrJob.getReducesTotal()); + jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal()); jobExecutionAPIEntity.setReducesCompleted(mrJob.getReducesCompleted()); jobExecutionAPIEntity.setMapProgress(mrJob.getMapProgress()); jobExecutionAPIEntity.setReduceProgress(mrJob.getReduceProgress()); @@ -220,7 +222,6 @@ public class MRJobParser implements Runnable { jobExecutionAPIEntity.setAllocatedMB(app.getAllocatedMB()); jobExecutionAPIEntity.setAllocatedVCores(app.getAllocatedVCores()); jobExecutionAPIEntity.setRunningContainers(app.getRunningContainers()); - runningJobManager.update(app.getId(), id, jobExecutionAPIEntity); } return true; @@ -434,10 +435,10 @@ public class MRJobParser implements Runnable { taskExecutionAPIEntity.setTimestamp(app.getStartedTime()); taskExecutionAPIEntity.setStartTime(task.getStartTime()); - taskExecutionAPIEntity.setFinishTime(task.getFinishTime()); - taskExecutionAPIEntity.setElapsedTime(task.getElapsedTime()); + taskExecutionAPIEntity.setEndTime(task.getFinishTime()); + taskExecutionAPIEntity.setDuration(task.getElapsedTime()); taskExecutionAPIEntity.setProgress(task.getProgress()); - taskExecutionAPIEntity.setStatus(task.getState()); + taskExecutionAPIEntity.setTaskStatus(task.getState()); taskExecutionAPIEntity.setSuccessfulAttempt(task.getSuccessfulAttempt()); taskExecutionAPIEntity.setStatusDesc(task.getStatus()); @@ -449,7 +450,8 @@ public class MRJobParser implements Runnable { TaskAttemptExecutionAPIEntity taskAttemptExecutionAPIEntity = fetchTaskAttempt.apply(Pair.of(jobId, task.getId())); if (taskAttemptExecutionAPIEntity != null) { - taskExecutionAPIEntity.setHost(taskAttemptExecutionAPIEntity.getTags().get(MRJobTagName.HOSTNAME.toString())); + taskExecutionAPIEntity.getTags().put(MRJobTagName.HOSTNAME.toString(), taskAttemptExecutionAPIEntity.getTags().get(MRJobTagName.HOSTNAME.toString())); + //taskExecutionAPIEntity.setHost(taskAttemptExecutionAPIEntity.getTags().get(MRJobTagName.HOSTNAME.toString())); } } @@ -503,6 +505,7 @@ public class MRJobParser implements Runnable { mrJobConfigs.put(jobId, config); mrJobEntityCreationHandler.add(mrJobEntityMap.get(jobId)); + runningJobManager.update(app.getId(), jobId, mrJobEntityMap.get(jobId)); } catch (Exception e) { LOG.warn("fetch job conf from {} failed, {}", confURL, e); return false; @@ -537,8 +540,8 @@ public class MRJobParser implements Runnable { mrJobEntityMap.keySet() .stream() .filter( - jobId -> mrJobEntityMap.get(jobId).getStatus().equals(Constants.AppState.FINISHED.toString()) || - mrJobEntityMap.get(jobId).getStatus().equals(Constants.AppState.FAILED.toString())) + jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString()) || + mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString())) .forEach( jobId -> this.runningJobManager.delete(app.getId(), jobId)); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java index d7b84cc..76d2a19 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics; -import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity; +import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.GenericMetricEntity; @@ -52,7 +52,7 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation @Override public String buildMetricName(String field) { - return String.format(Constants.metricFormat, Constants.JOB_LEVEL, field); + return String.format(Constants.hadoopMetricFormat, Constants.JOB_LEVEL, field); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java index 6d9525e..d0b0d57 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics; -import org.apache.eagle.jpm.mr.running.entities.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.GenericMetricEntity; @@ -33,13 +33,13 @@ public class TaskExecutionMetricsCreationListener extends AbstractMetricsCreatio if (entity != null) { Long currentTime = System.currentTimeMillis(); Map tags = entity.getTags(); - metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, entity.getElapsedTime(), tags)); + metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, entity.getDuration(), tags)); } return metrics; } @Override public String buildMetricName(String field) { - return String.format(Constants.metricFormat, Constants.TASK_LEVEL, field); + return String.format(Constants.hadoopMetricFormat, Constants.TASK_LEVEL, field); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java index 352696b..978c3ec 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java @@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.running.recover; import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager; -import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity; +import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.jobrecover.RunningJobManager; import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java index 1703b25..a701d74 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java @@ -25,8 +25,8 @@ import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager; -import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; +import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.Utils; import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java index 92dfbe3..51307e1 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java @@ -24,9 +24,9 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager; -import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity; import org.apache.eagle.jpm.mr.running.parser.MRJobParser; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; +import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher; import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf index 5c0d8f9..f15fc2d 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf @@ -63,6 +63,7 @@ }, "MRConfigureKeys" : { + "jobNameKey" : "eagle.job.name", "jobConfigKey" : [ "mapreduce.map.output.compress", "mapreduce.map.output.compress.codec", http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-service/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/pom.xml b/eagle-jpm/eagle-jpm-service/pom.xml new file mode 100644 index 0000000..e218435 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/pom.xml @@ -0,0 +1,48 @@ + + + + + + eagle-jpm-parent + org.apache.eagle + 0.5.0-incubating-SNAPSHOT + + 4.0.0 + + eagle-jpm-service + + + + org.apache.eagle + eagle-jpm-util + ${project.version} + + + org.apache.eagle + eagle-service-base + ${project.version} + + + org.apache.eagle + eagle-jpm-entity + ${project.version} + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java new file mode 100644 index 0000000..be90456 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.service.jpm; + +import org.apache.commons.lang.time.StopWatch; +import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.generic.GenericEntityServiceResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.util.*; + +import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID; + +@Path("mrJobs") +public class MRJobExecutionResource { + GenericEntityServiceResource resource = new GenericEntityServiceResource(); + public final static String ELAPSEDMS = "elapsedms"; + public final static String TOTAL_RESULTS = "totalResults"; + + private final static Logger LOG = LoggerFactory.getLogger(MRJobExecutionResource.class); + + @GET + @Produces(MediaType.APPLICATION_JSON) + public GenericServiceAPIResponseEntity listJobs(@QueryParam("query") String query, + @QueryParam("startTime") String startTime, @QueryParam("endTime") String endTime, + @QueryParam("pageSize") int pageSize, @QueryParam("startRowkey") String startRowkey, + @QueryParam("treeAgg") boolean treeAgg, @QueryParam("timeSeries") boolean timeSeries, + @QueryParam("intervalmin") long intervalmin, @QueryParam("top") int top, + @QueryParam("filterIfMissing") boolean filterIfMissing, + @QueryParam("parallel") int parallel, + @QueryParam("metricName") String metricName, + @QueryParam("verbose") Boolean verbose) { + GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity(); + + List jobs = new ArrayList<>(); + List finishedJobs = new ArrayList<>(); + Set jobIds = new HashSet<>(); + Map meta = new HashMap<>(); + StopWatch stopWatch = new StopWatch(); + + stopWatch.start(); + String jobQuery = String.format(query, Constants.JPA_JOB_EXECUTION_SERVICE_NAME); + GenericServiceAPIResponseEntity res = + resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, + top,filterIfMissing, parallel, metricName, verbose); + if (res.isSuccess() && res.getObj() != null) { + for (TaggedLogAPIEntity o : res.getObj()) { + finishedJobs.add(o); + jobIds.add(o.getTags().get(JOB_ID.toString())); + } + jobQuery = String.format(query, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME); + res = resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, + top,filterIfMissing, parallel, metricName, verbose); + if (res.isSuccess() && res.getObj() != null) { + for (TaggedLogAPIEntity o : res.getObj()) { + if (! isDuplicate(jobIds, o)) { + jobs.add(o); + } + } + jobs.addAll(finishedJobs); + } + } + stopWatch.stop(); + if (res.isSuccess()) { + response.setSuccess(true); + } else { + response.setSuccess(false); + response.setException(new Exception(res.getException())); + } + meta.put(TOTAL_RESULTS, jobs.size()); + meta.put(ELAPSEDMS,stopWatch.getTime()); + response.setObj(jobs); + response.setMeta(meta); + return response; + + } + + private boolean isDuplicate(Set keys, TaggedLogAPIEntity o) { + if (keys.isEmpty()) { + return false; + } + return keys.contains(o.getTags().get(JOB_ID.toString())); + } + + private String buildCondition(String jobId, String jobDefId, String site) { + String conditionFormat = "@site=\"%s\"" ; + String condition = null; + if (jobDefId != null) { + conditionFormat = conditionFormat + " AND @jobDefId=\"%s\""; + condition = String.format(conditionFormat, site, jobDefId); + } + if (jobId != null) { + conditionFormat = conditionFormat + " AND @jobId=\"%s\""; + condition = String.format(conditionFormat, site, jobId); + } + return condition; + } + + @GET + @Path("search") + @Produces(MediaType.APPLICATION_JSON) + public GenericServiceAPIResponseEntity searchJobsById(@QueryParam("jobId") String jobId, + @QueryParam("jobDefId") String jobDefId, + @QueryParam("site") String site) { + GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity(); + if ((jobId == null && jobDefId == null) || site == null) { + response.setException(new IllegalArgumentException("Error: (jobId == null && jobDefId == null) || site == null")); + response.setSuccess(false); + return response; + } + + List jobs = new ArrayList<>(); + Set jobIds = new HashSet<>(); + String condition = buildCondition(jobId, jobDefId, site); + int pageSize = Integer.MAX_VALUE; + if (condition == null) { + response.setException(new Exception("Search condition is empty")); + response.setSuccess(false); + return response; + } + LOG.debug("search condition=" + condition); + + Map meta = new HashMap<>(); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + String queryFormat = "%s[%s]{*}"; + String queryString = String.format(queryFormat, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, condition); + GenericServiceAPIResponseEntity res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); + if (res.isSuccess() && res.getObj() != null) { + for (TaggedLogAPIEntity o : res.getObj()) { + jobs.add(o); + jobIds.add(o.getTags().get(JOB_ID.toString())); + } + } + queryString = String.format(queryFormat, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, condition); + res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); + if (res.isSuccess() && res.getObj() != null) { + for (TaggedLogAPIEntity o : res.getObj()) { + if (! isDuplicate(jobIds, o)) { + jobs.add(o); + } + } + } + if (jobs.size() > 0) { + Collections.sort(jobs, new Comparator() { + @Override + public int compare(TaggedLogAPIEntity o1, TaggedLogAPIEntity o2) { + return o1.getTimestamp() > o2.getTimestamp() ? 1 : (o1.getTimestamp() == o2.getTimestamp() ? 0 : -1); + } + }); + } + stopWatch.stop(); + if (res.isSuccess()) { + response.setSuccess(true); + } else { + response.setSuccess(false); + response.setException(new Exception(res.getException())); + } + meta.put(TOTAL_RESULTS, jobs.size()); + meta.put(ELAPSEDMS,stopWatch.getTime()); + response.setObj(jobs); + response.setMeta(meta); + return response; + } + + public List parseTimeList(String timelist) { + List times = new ArrayList<>(); + String [] strs = timelist.split("[,\\s]"); + for (String str : strs) { + try { + times.add(Long.parseLong(str)); + } catch (Exception ex) { + LOG.warn(str + " is not a number"); + } + } + return times; + } + + public int getPosition(List times, Long duration) { + duration = duration / 1000; + for (int i = 1; i < times.size(); i++) { + if (duration < times.get(i)) { + return i - 1; + } + } + return times.size() - 1; + } + + public void getTopTasks(List list, long top) { + for (MRJobTaskGroupResponse.UnitTaskCount taskCounter : list) { + Iterator iterator = taskCounter.entities.iterator(); + for (int i = 0; i < top && iterator.hasNext(); i++) { + taskCounter.topEntities.add(iterator.next()); + } + taskCounter.entities.clear(); + } + } + + public void initTaskCountList(List runningTaskCount, + List finishedTaskCount, + List times, + Comparator comparator) { + for (int i = 0; i < times.size(); i++) { + runningTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator)); + finishedTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator)); + } + } + + @GET + @Path("{jobId}/taskCounts") + @Produces(MediaType.APPLICATION_JSON) + public MRJobTaskGroupResponse getTaskCounts(@PathParam("jobId") String jobId, + @QueryParam("site") String site, + @QueryParam("timelineInSecs") String timeList, + @QueryParam("top") long top) { + MRJobTaskGroupResponse response = new MRJobTaskGroupResponse(); + if (jobId == null || site == null || timeList == null || timeList.isEmpty()) { + response.errMessage = "IllegalArgumentException: jobId == null || site == null || timelineInSecs == null or isEmpty"; + return response; + } + List runningTaskCount = new ArrayList<>(); + List finishedTaskCount = new ArrayList<>(); + + List times = parseTimeList(timeList); + String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId); + GenericServiceAPIResponseEntity history_res = + resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + if (history_res.isSuccess() && history_res.getObj() != null && history_res.getObj().size() > 0) { + initTaskCountList(runningTaskCount, finishedTaskCount, times, new HistoryTaskComparator()); + for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : history_res.getObj()) { + int index = getPosition(times, o.getDuration()); + MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index); + counter.taskCount++; + counter.entities.add(o); + } + } else { + query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId); + GenericServiceAPIResponseEntity running_res = + resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + if (running_res.isSuccess() && running_res.getObj() != null) { + initTaskCountList(runningTaskCount, finishedTaskCount, times, new RunningTaskComparator()); + for (TaskExecutionAPIEntity o : running_res.getObj()) { + int index = getPosition(times, o.getDuration()); + if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) { + MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index); + counter.taskCount++; + counter.entities.add(o); + } else if (o.getEndTime() != 0) { + MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index); + counter.taskCount++; + counter.entities.add(o); + } + } + } + } + if (top > 0) { + getTopTasks(runningTaskCount, top); + response.runningTaskCount = runningTaskCount; + getTopTasks(finishedTaskCount, top); + response.finishedTaskCount = finishedTaskCount; + } + return response; + } + + static class RunningTaskComparator implements Comparator { + @Override + public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) { + Long time1 = o1.getDuration(); + Long time2 = o2.getDuration(); + return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); + } + } + + static class HistoryTaskComparator implements Comparator { + @Override + public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1, + org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) { + Long time1 = o1.getDuration(); + Long time2 = o2.getDuration(); + return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java new file mode 100644 index 0000000..3be9b43 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.jpm; + +import java.util.*; + +class MRJobTaskGroupResponse { + public List runningTaskCount; + public List finishedTaskCount; + public String errMessage; + + static class UnitTaskCount { + public long timeBucket; + public int taskCount; + public Set entities; + public List topEntities; + + UnitTaskCount(long timeBucket, Comparator comparator) { + this.timeBucket = timeBucket; + this.taskCount = 0; + entities = new TreeSet<>(comparator); + topEntities = new ArrayList<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java new file mode 100644 index 0000000..824556b --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.jpm; + +import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class TestMRJobExecutionResource { + + @Test + public void test() { + MRJobExecutionResource resource = new MRJobExecutionResource(); + String timeList = " 0, 10,20,40 "; + List times = resource.parseTimeList(timeList); + Assert.assertTrue(times.size() == 4); + + long val = 25 * 1000; + int index = resource.getPosition(times, val); + Assert.assertTrue(index == 2); + } + + @Test + public void test2() { + MRJobExecutionResource resource = new MRJobExecutionResource(); + String timeList = " 0, 10,20,40 "; + List times = resource.parseTimeList(timeList); + + TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity(); + test1.setDuration(15 * 1000); + test1.setTaskStatus("running"); + TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity(); + test4.setDuration(13 * 1000); + test4.setTaskStatus("running"); + TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity(); + test2.setDuration(0 * 1000); + test2.setEndTime(100); + test2.setTaskStatus("x"); + TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity(); + test3.setDuration(19 * 1000); + test3.setTaskStatus("running"); + TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity(); + test5.setDuration(20 * 1000); + test5.setEndTime(28); + test5.setTaskStatus("x"); + List tasks = new ArrayList<>(); + tasks.add(test1); + tasks.add(test2); + tasks.add(test3); + tasks.add(test4); + tasks.add(test5); + + List runningTaskCount = new ArrayList<>(); + List finishedTaskCount = new ArrayList<>(); + + Comparator comparator = new MRJobExecutionResource.RunningTaskComparator(); + resource.initTaskCountList(runningTaskCount, finishedTaskCount, times, comparator); + + for (TaskExecutionAPIEntity o : tasks) { + int index = resource.getPosition(times, o.getDuration()); + if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) { + MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index); + counter.taskCount++; + counter.entities.add(o); + } else if (o.getEndTime() != 0) { + MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index); + counter.taskCount++; + counter.entities.add(o); + } + } + int top = 2; + if (top > 0) { + resource.getTopTasks(runningTaskCount, top); + } + Assert.assertTrue(runningTaskCount.get(1).taskCount == 3); + Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-spark-history/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml index d11d3d5..2e5a657 100644 --- a/eagle-jpm/eagle-jpm-spark-history/pom.xml +++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml @@ -17,95 +17,114 @@ --> - 4.0.0 - - org.apache.eagle - eagle-jpm-parent - 0.5.0-incubating-SNAPSHOT - ../pom.xml - - eagle-jpm-spark-history - eagle-jpm-spark-history - http://maven.apache.org - - - org.apache.eagle - eagle-jpm-util - ${project.version} - - - org.apache.eagle - eagle-jpm-entity - ${project.version} - + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + 4.0.0 + + org.apache.eagle + eagle-jpm-parent + 0.5.0-incubating-SNAPSHOT + ../pom.xml + + eagle-jpm-spark-history + eagle-jpm-spark-history + http://maven.apache.org + + + org.apache.storm + storm-core + ${storm.version} + + + ch.qos.logback + logback-classic + + + log4j + log4j + + + org.slf4j + log4j-over-slf4j + + + + + org.apache.eagle + eagle-jpm-util + ${project.version} + + + org.apache.eagle + eagle-jpm-entity + ${project.version} + - - jline - jline - 2.12 - - - org.apache.curator - curator-recipes - ${curator.version} - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - - - org.apache.hadoop - hadoop-annotations - ${hadoop.version} - - - org.apache.hadoop - hadoop-hdfs - ${hadoop.version} - - - org.apache.hadoop - hadoop-auth - ${hadoop.version} - - - org.apache.hadoop - hadoop-mapreduce-client-app - ${hadoop.version} - - - org.apache.hadoop - hadoop-mapreduce-client-core - ${hadoop.version} - - - - - - src/main/resources - - - - - maven-assembly-plugin - - src/assembly/eagle-jpm-spark-history-assembly.xml - eagle-jpm-spark-history-${project.version} - - - - package - - single - - - posix - - - - - - + + jline + jline + 2.12 + + + org.apache.curator + curator-recipes + ${curator.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-annotations + ${hadoop.version} + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + org.apache.hadoop + hadoop-auth + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-app + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + + src/main/resources + + + + + maven-assembly-plugin + + src/assembly/eagle-jpm-spark-history-assembly.xml + eagle-jpm-spark-history-${project.version} + + + + package + + single + + + posix + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-util/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml index 3df73e7..6a8dd8b 100644 --- a/eagle-jpm/eagle-jpm-util/pom.xml +++ b/eagle-jpm/eagle-jpm-util/pom.xml @@ -36,25 +36,6 @@ 1.1.1 - org.apache.storm - storm-core - ${storm.version} - - - ch.qos.logback - logback-classic - - - log4j - log4j - - - org.slf4j - log4j-over-slf4j - - - - org.apache.commons commons-lang3 @@ -63,7 +44,6 @@ commons-codec 1.9 - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java index b819340..07850f9 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -88,6 +88,9 @@ public class Constants { COMPLETE_MR_JOB } + public static final String TASK_RUNNING = "RUNNING"; + public static final String TASK_FINISHED = "FINISHED"; + //MR public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService"; public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService"; @@ -154,7 +157,7 @@ public class Constants { TOTAL_LAUNCHED_MAPS } - public static final String metricFormat = "%s.%s"; + public static final String hadoopMetricFormat = "hadoop.%s.%s"; public static final String ALLOCATED_MB = "allocatedmb"; public static final String ALLOCATED_VCORES = "allocatedvcores"; public static final String RUNNING_CONTAINERS = "runningcontainers"; @@ -162,6 +165,4 @@ public class Constants { public static final String JOB_LEVEL = "job"; public static final String TASK_LEVEL = "task"; - public static final String JOB_DEFINITION_ID_KEY = "jobDefId"; - } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java index 7a613eb..12eb1b5 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.util; -import jline.internal.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +82,7 @@ public class Utils { int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1)); return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB; } - Log.info("Cannot parse memory info " + memory); + LOG.info("Cannot parse memory info " + memory); return 0l; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java index 44336e2..2b49f9f 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java @@ -15,7 +15,7 @@ * limitations under the License. */ /** - * + * */ package org.apache.eagle.jpm.util.resourceFetch; @@ -42,37 +42,37 @@ import java.io.InputStream; import java.util.List; public class RMResourceFetcher implements ResourceFetcher { - + private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class); private final HAURLSelector selector; private final ServiceURLBuilder jobListServiceURLBuilder; private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder; private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); - + static { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); } - + public RMResourceFetcher(String[] RMBasePaths) { this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl(); this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl(); this.selector = new HAURLSelectorImpl(RMBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP); } - + private void checkUrl() throws IOException { if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), Constants.JobState.RUNNING.name()))) { selector.reSelectUrl(); } } - - private List doFetchFinishApplicationsList(String urlString) throws Exception { + + private List doFetchFinishApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception { List result; InputStream is = null; try { checkUrl(); LOG.info("Going to call yarn api to fetch finished application list: " + urlString); - is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP); + is = InputStreamUtils.getInputStream(urlString, null, compressionType); final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class); if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) { @@ -85,65 +85,73 @@ public class RMResourceFetcher implements ResourceFetcher { } } - private String getSparkRunningJobURL() { + private String getSparkRunningJobURL() { StringBuilder sb = new StringBuilder(); sb.append(selector.getSelectedUrl()).append("/").append(Constants.V2_APPS_URL); sb.append("?applicationTypes=SPARK&state=RUNNING&"); sb.append(Constants.ANONYMOUS_PARAMETER); return sb.toString(); - } - - private String getMRRunningJobURL() { - return String.format("%s/%s?applicationTypes=MAPREDUCE&state=RUNNING&%s", - selector.getSelectedUrl(), - Constants.V2_APPS_URL, - Constants.ANONYMOUS_PARAMETER); - } - - public String getMRFinishedJobURL(String lastFinishedTime) { - String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl()); - StringBuilder sb = new StringBuilder(); - sb.append(url).append("/").append(Constants.V2_APPS_URL); - sb.append("?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin="); - sb.append(lastFinishedTime).append("&").append(Constants.ANONYMOUS_PARAMETER); - - return sb.toString(); - } - - private List doFetchRunningApplicationsList(String urlString) throws Exception { - List result; - InputStream is = null; - try { - checkUrl(); - LOG.info("Going to call yarn api to fetch running application list: " + urlString); - is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP); - final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class); - if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) { - result = appWrapper.getApps().getApp(); - return result; - } - return null; - } finally { - if (is != null) { try { is.close();} catch (Exception e) { } } - } - } - - public List getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception{ - switch(resoureType) { + } + + private String getMRRunningJobURL() { + return String.format("%s/%s?applicationTypes=MAPREDUCE&state=RUNNING&%s", + selector.getSelectedUrl(), + Constants.V2_APPS_URL, + Constants.ANONYMOUS_PARAMETER); + } + + public String getMRFinishedJobURL(String lastFinishedTime) { + String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl()); + StringBuilder sb = new StringBuilder(); + sb.append(url).append("/").append(Constants.V2_APPS_URL); + sb.append("?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin="); + sb.append(lastFinishedTime).append("&").append(Constants.ANONYMOUS_PARAMETER); + + return sb.toString(); + } + + private List doFetchRunningApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception { + List result; + InputStream is = null; + try { + checkUrl(); + LOG.info("Going to call yarn api to fetch running application list: " + urlString); + is = InputStreamUtils.getInputStream(urlString, null, compressionType); + final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class); + if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) { + result = appWrapper.getApps().getApp(); + return result; + } + return null; + } finally { + if (is != null) { try { is.close();} catch (Exception e) { } } + } + } + + private List getResource(Constants.ResourceType resoureType, Constants.CompressionType compressionType, Object... parameter) throws Exception { + switch (resoureType) { case COMPLETE_SPARK_JOB: - final String urlString = sparkCompleteJobServiceURLBuilder.build((String)parameter[0]); - return doFetchFinishApplicationsList(urlString); + final String urlString = sparkCompleteJobServiceURLBuilder.build((String) parameter[0]); + return doFetchFinishApplicationsList(urlString, compressionType); case RUNNING_SPARK_JOB: - return doFetchRunningApplicationsList(getSparkRunningJobURL()); - case RUNNING_MR_JOB: - return doFetchRunningApplicationsList(getMRRunningJobURL()); - case COMPLETE_MR_JOB: - return doFetchFinishApplicationsList(getMRFinishedJobURL((String)parameter[0])); + return doFetchRunningApplicationsList(getSparkRunningJobURL(), compressionType); + case RUNNING_MR_JOB: + return doFetchRunningApplicationsList(getMRRunningJobURL(), compressionType); + case COMPLETE_MR_JOB: + return doFetchFinishApplicationsList(getMRFinishedJobURL((String) parameter[0]), compressionType); default: throw new Exception("Not support resourceType :" + resoureType); } } + public List getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception { + try { + return getResource(resoureType, Constants.CompressionType.GZIP, parameter); + } catch (java.util.zip.ZipException ex) { + return getResource(resoureType, Constants.CompressionType.NONE, parameter); + } + } + private String getClusterInfoURL() { StringBuilder sb = new StringBuilder(); sb.append(selector.getSelectedUrl()).append("/").append(Constants.YARN_API_CLUSTER_INFO).append("?" + Constants.ANONYMOUS_PARAMETER); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java index 6518ca1..4052ed0 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java @@ -35,7 +35,7 @@ public class HAURLSelectorImpl implements HAURLSelector { private volatile boolean reselectInProgress; private final Constants.CompressionType compressionType; - private static final long MAX_RETRY_TIME = 3; + private static final long MAX_RETRY_TIME = 2; private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class); public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType) { @@ -86,7 +86,7 @@ public class HAURLSelectorImpl implements HAURLSelector { } LOG.info("try url " + urlToCheck + "fail for " + (time+1) + " times, sleep 5 seconds before try again. "); try { - Thread.sleep(5 * 1000); + Thread.sleep(1 * 1000); } catch (InterruptedException ex) { /* Do Nothing */} } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-jpm/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/pom.xml b/eagle-jpm/pom.xml index c48cdeb..91ef154 100644 --- a/eagle-jpm/pom.xml +++ b/eagle-jpm/pom.xml @@ -40,6 +40,7 @@ eagle-jpm-util eagle-jpm-mr-running eagle-jpm-app + eagle-jpm-service http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d6ec142d/eagle-security/eagle-security-hive/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/pom.xml b/eagle-security/eagle-security-hive/pom.xml index a2e5a32..5f8f3b0 100644 --- a/eagle-security/eagle-security-hive/pom.xml +++ b/eagle-security/eagle-security-hive/pom.xml @@ -17,82 +17,82 @@ --> - 4.0.0 - - org.apache.eagle - eagle-security-parent - 0.5.0-incubating-SNAPSHOT - - eagle-security-hive - eagle-security-hive - http://maven.apache.org - jar + 4.0.0 + + org.apache.eagle + eagle-security-parent + 0.5.0-incubating-SNAPSHOT + + eagle-security-hive + eagle-security-hive + http://maven.apache.org + jar - - - org.apache.curator - curator-framework - - - org.apache.eagle - eagle-jpm-util - ${project.version} - - - org.jsoup - jsoup - - - org.apache.curator - curator-recipes - - - org.apache.curator - curator-client - - - org.apache.eagle - eagle-embed-server - ${project.version} - test - - - org.apache.eagle - eagle-embed-server - ${project.version} - tests - test - - - org.apache.eagle - eagle-embed-hbase - ${project.version} - tests - test - - - org.apache.eagle - eagle-embed-hbase - ${project.version} - test - - - org.apache.eagle - eagle-security-common - ${project.version} - - - org.apache.hive - hive-exec - - - org.scala-lang - scala-library - - - org.apache.eagle - eagle-app-base - ${project.version} - - + + + org.apache.curator + curator-framework + + + org.apache.eagle + eagle-jpm-util + ${project.version} + + + org.jsoup + jsoup + + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-client + + + org.apache.eagle + eagle-embed-server + ${project.version} + test + + + org.apache.eagle + eagle-embed-server + ${project.version} + tests + test + + + org.apache.eagle + eagle-embed-hbase + ${project.version} + tests + test + + + org.apache.eagle + eagle-embed-hbase + ${project.version} + test + + + org.apache.eagle + eagle-security-common + ${project.version} + + + org.apache.hive + hive-exec + + + org.scala-lang + scala-library + + + org.apache.eagle + eagle-app-base + ${project.version} + +