From: yonzhang2012@apache.org
To: commits@eagle.incubator.apache.org
Reply-To: dev@eagle.incubator.apache.org
Date: Wed, 07 Sep 2016 17:42:23 -0000
Message-Id: <99829470c0ac40a3aac32944dea3bb00@git.apache.org>
In-Reply-To: <7201ad3b09274d489e31c344a4940ff4@git.apache.org>
References: <7201ad3b09274d489e31c344a4940ff4@git.apache.org>
Subject: [27/52] [abbrv] incubator-eagle git commit: [EAGLE-514] Add two job count apis
archived-at: Wed, 07 Sep 2016 17:42:40 -0000

[EAGLE-514] Add two job count apis

https://issues.apache.org/jira/browse/EAGLE-514

1. adding two job counting apis
2. add tracking url in running/history job execution entity
3. unify the status presentation for job/task execution entity
4. unify the name of the common fields between running job entity & history job entity

Author: Qingwen Zhao

Closes #408 from qingwen220/jobStats.

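Item 2 of the commit message adds a tracking URL to the job execution entities. For history jobs, the diff in this mail builds that URL by resolving the job id against the history server's /jobhistory/job/ path. The standalone sketch below restates that idea; the class name TrackingUrlSketch, the fallback behaviour on any parse error, and the sample job id are illustrative assumptions, not code from the commit.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for illustration only; not part of the Eagle code base.
final class TrackingUrlSketch {

    // Appends "/jobhistory/job/" to the history server URL and resolves the job id against it.
    // Falls back to the base URL when the base or the job id cannot be parsed as a URI.
    static String buildJobTrackingUrl(String historyServerUrl, String jobId) {
        String base = historyServerUrl + "/jobhistory/job/";
        try {
            return new URI(base).resolve(jobId).toString();
        } catch (URISyntaxException | IllegalArgumentException e) {
            return base;
        }
    }

    public static void main(String[] args) {
        // The host matches the sample value used in application.conf; the job id is made up.
        System.out.println(buildJobTrackingUrl("http://sandbox.hortonworks.com:19888", "job_1472630000000_0001"));
    }
}
```

Because the base ends with a slash, URI.resolve appends the job id as the last path segment instead of replacing the "job" segment.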
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a66f64cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a66f64cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a66f64cf

Branch: refs/heads/master
Commit: a66f64cf9f4212f4923f0e6ea6c7270449aa2ce4
Parents: 4aa5b45
Author: Qingwen Zhao
Authored: Wed Aug 31 14:21:22 2016 +0800
Committer: Qingwen Zhao
Committed: Wed Aug 31 14:21:22 2016 +0800

----------------------------------------------------------------------
 .../mr/historyentity/JobExecutionAPIEntity.java |  11 +
 .../mr/runningentity/JobExecutionAPIEntity.java |  36 ++-
 .../jpm/mr/history/MRHistoryJobConfig.java      |   2 +
 .../history/crawler/JHFCrawlerDriverImpl.java   |   2 +-
 .../jpm/mr/history/parser/EagleJobStatus.java   |   2 +-
 .../mr/history/parser/JHFEventReaderBase.java   |  21 +-
 .../mr/history/parser/JHFMRVer1EventReader.java |   5 +-
 .../mr/history/parser/JHFMRVer2EventReader.java |  15 +-
 .../jpm/mr/history/parser/JHFParserFactory.java |   4 +-
 .../JobEntityCreationEagleServiceListener.java  |   6 +-
 .../src/main/resources/application.conf         |   1 +
 .../jpm/mr/running/parser/MRJobParser.java      |   5 +-
 .../eagle/service/jpm/MRJobCountHelper.java     | 121 ++++++++
 .../service/jpm/MRJobExecutionResource.java     | 286 +++++++++++++------
 .../service/jpm/MRJobTaskCountResponse.java     |  65 +++++
 .../service/jpm/MRJobTaskGroupResponse.java     |  41 ---
 .../service/jpm/TaskCountByDurationHelper.java  | 106 +++++++
 .../jpm/TestJobCountPerBucketHelper.java        |  87 ++++++
 .../service/jpm/TestMRJobExecutionResource.java |  99 -------
 .../service/jpm/TestTaskCountPerJobHelper.java  |  96 +++++++
 .../org/apache/eagle/jpm/util/Constants.java    |   2 +
 21 files changed, 752 insertions(+), 261 deletions(-)
----------------------------------------------------------------------

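The largest new helper in this commit, MRJobCountHelper, counts how many jobs were running at each point of a fixed time grid. The sketch below restates that bucketing under simplifying assumptions: it uses a linear scan instead of the index arithmetic in the diff, it drops the per-job-type breakdown, and the class name JobCountSketch and the representation of a job as a {startSecs, endSecs} pair are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified restatement of the time-bucket counting idea; not the Eagle implementation.
final class JobCountSketch {

    // Bucket timestamps are multiples of intervalSecs, starting at the multiple at or
    // before startSecs and ending at the last multiple that does not exceed endSecs.
    static List<Long> bucketTimes(long startSecs, long endSecs, long intervalSecs) {
        List<Long> times = new ArrayList<>();
        for (long i = startSecs / intervalSecs; i * intervalSecs <= endSecs; i++) {
            times.add(i * intervalSecs);
        }
        return times;
    }

    // A job counts toward every bucket whose timestamp lies within [jobStart, jobEnd].
    // Each job is given as {startSecs, endSecs}.
    static long[] countRunning(List<Long> bucketTimes, long[][] jobs) {
        long[] counts = new long[bucketTimes.size()];
        for (long[] job : jobs) {
            for (int i = 0; i < counts.length; i++) {
                long t = bucketTimes.get(i);
                if (job[0] <= t && t <= job[1]) {
                    counts[i]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> times = bucketTimes(100, 400, 100);
        long[] counts = countRunning(times, new long[][] {{150, 320}, {100, 100}, {50, 90}});
        for (int i = 0; i < counts.length; i++) {
            System.out.println(times.get(i) + " -> " + counts[i]);
        }
    }
}
```

A job that starts and ends between two grid points contributes to no bucket, which is the same behaviour the index-based version in the diff produces.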
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
index 97e77b2..cdc5810 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
@@ -89,6 +89,17 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
     private int totalReduceAttempts;
     @Column("ac")
     private int failedReduceAttempts;
+    @Column("ad")
+    private String trackingUrl;
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+        valueChanged("trackingUrl");
+    }
 
     public long getDurationTime() {
         return durationTime;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
index dd81eb4..245fc0f 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
@@ -49,11 +49,11 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
     @Column("e")
     private int numTotalMaps;
     @Column("f")
-    private int mapsCompleted;
+    private int numFinishedMaps;
     @Column("g")
     private int numTotalReduces;
     @Column("h")
-    private int reducesCompleted;
+    private int numFinishedReduces;
     @Column("i")
     private double mapProgress;
     @Column("j")
@@ -112,6 +112,18 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
     private long submissionTime;
     @Column("ak")
     private String internalState;
+    @Column("al")
+    private String trackingUrl;
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+        valueChanged("trackingUrl");
+    }
+
 
     public JobConfig getJobConfig() {
         return jobConfig;
@@ -176,13 +188,13 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
         valueChanged("numTotalMaps");
     }
 
-    public int getMapsCompleted() {
-        return mapsCompleted;
+    public int getNumFinishedMaps() {
+        return numFinishedMaps;
     }
 
-    public void setMapsCompleted(int mapsCompleted) {
-        this.mapsCompleted = mapsCompleted;
-        valueChanged("mapsCompleted");
+    public void setNumFinishedMaps(int numFinishedMaps) {
+        this.numFinishedMaps = numFinishedMaps;
+        valueChanged("numFinishedMaps");
     }
 
     public int getNumTotalReduces() {
@@ -194,13 +206,13 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
         valueChanged("numTotalReduces");
     }
 
-    public int getReducesCompleted() {
-        return reducesCompleted;
+    public int getNumFinishedReduces() {
+        return numFinishedReduces;
     }
 
-    public void setReducesCompleted(int reducesCompleted) {
-        this.reducesCompleted = reducesCompleted;
-        valueChanged("reducesCompleted");
+    public void setNumFinishedReduces(int numFinishedReduces) {
+        this.numFinishedReduces = numFinishedReduces;
+        valueChanged("numFinishedReduces");
     }
 
     public double getMapProgress() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index ae86904..c0943de 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -86,6 +86,7 @@ public class MRHistoryJobConfig implements Serializable {
 
     public static class JobHistoryEndpointConfig implements Serializable {
         public String nnEndpoint;
+        public String mrHistoryServerUrl;
         public String basePath;
         public boolean pathContainsJobTrackerName;
         public String jobTrackerName;
@@ -173,6 +174,7 @@ public class MRHistoryJobConfig implements Serializable {
         this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
         this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
         this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.jobHistoryEndpointConfig.mrHistoryServerUrl = config.getString("dataSourceConfig.mrHistoryServerUrl");
         this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
         this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
         this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index e16ecce..1a17751 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -247,7 +247,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
         JobCountEntity entity = new JobCountEntity();
         entity.setTotal(jobs.size());
         entity.setFail(0);
-        jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCESS.toString())).forEach(
+        jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())).forEach(
             job -> entity.setFail(1 + entity.getFail())
         );
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
index fb218e3..24fa097 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
@@ -23,7 +23,7 @@ public enum EagleJobStatus {
     LAUNCHED,
     PREP,
     RUNNING,
-    SUCCESS,
+    SUCCEEDED,
     KILLED,
     FAILED;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 6916aad..1570956 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.commons.io.FilenameUtils;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.*;
@@ -32,6 +34,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -65,6 +69,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     protected String queueName;
     protected Long jobLaunchTime;
     protected JobHistoryContentFilter filter;
+    private JobHistoryEndpointConfig jobHistoryEndpointConfig;
 
     protected final List jobEntityLifecycleListeners = new ArrayList<>();
 
@@ -96,8 +101,9 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
      *
      * @param baseTags
      */
-    public JHFEventReaderBase(Map baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+    public JHFEventReaderBase(Map baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
         this.filter = filter;
+        this.jobHistoryEndpointConfig = jobHistoryEndpointConfig;
 
         this.baseTags = baseTags;
         jobSubmitEventEntity = new JobEventAPIEntity();
@@ -155,6 +161,18 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         }
     }
 
+    private String buildJobTrackingUrl(String jobId) {
+        String jobTrackingUrlBase = this.jobHistoryEndpointConfig.mrHistoryServerUrl + "/jobhistory/job/";
+        try {
+            URI oldUri = new URI(jobTrackingUrlBase);
+            URI resolved = oldUri.resolve(jobId);
+            return resolved.toString();
+        } catch (URISyntaxException e) {
+            LOG.warn("Tracking url build failed with baseURL=%s, resolvePart=%s", jobTrackingUrlBase, jobId);
+            return jobTrackingUrlBase;
+        }
+    }
+
     /**
      * ...
      * @param id
@@ -236,6 +254,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), queueName);
         jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType);
 
+        jobExecutionEntity.setTrackingUrl(buildJobTrackingUrl(jobId));
         jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
         jobExecutionEntity.setStartTime(jobLaunchEventEntity.getTimestamp());
         jobExecutionEntity.setEndTime(jobFinishEventEntity.getTimestamp());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index e20836f..0e9458a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
@@ -48,8 +49,8 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
      *
      * @param baseTags
      */
-    public JHFMRVer1EventReader(Map baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        super(baseTags, configuration, filter);
+    public JHFMRVer1EventReader(Map baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
+        super(baseTags, configuration, filter, jobHistoryEndpointConfig);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index f21fd41..74f84f6 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
@@ -43,8 +44,8 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
      *
      * @throws IOException
      */
-    public JHFMRVer2EventReader(Map baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        super(baseTags, configuration, filter);
+    public JHFMRVer2EventReader(Map baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
+        super(baseTags, configuration, filter, jobHistoryEndpointConfig);
     }
 
     @SuppressWarnings("deprecation")
@@ -233,7 +234,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getFailedReduces() != null) {
             values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
         }
-        values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name());
+        values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCEEDED.name());
         handleJob(wrapper.getType(), values, js.getTotalCounters());
     }
 
@@ -289,7 +290,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
         }
         if (js.getStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getStatus().toString());
         }
         handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
     }
@@ -308,7 +309,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
         }
         if (js.getStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getStatus().toString());
         }
         if (js.getError() != null) {
             values.put(Keys.ERROR, js.getError().toString());
@@ -381,7 +382,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.TASK_TYPE, js.getTaskType().toString());
         }
         if (js.getTaskStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getTaskStatus().toString());
         }
         if (js.getAttemptId() != null) {
             values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
@@ -419,7 +420,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.TASK_TYPE, js.getTaskType().toString());
         }
         if (js.getTaskStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getTaskStatus().toString());
         }
         if (js.getAttemptId() != null) {
             values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index 718612d..386d50c 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -46,7 +46,7 @@ public class JHFParserFactory {
 
         switch (f) {
             case MRVer2:
-                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
                 reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
                 reader2.addListener(new TaskFailureListener(configManager));
                 reader2.addListener(new TaskAttemptCounterListener(configManager));
@@ -57,7 +57,7 @@ public class JHFParserFactory {
                 break;
             case MRVer1:
             default:
-                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
+                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
                 reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
                 reader1.addListener(new TaskFailureListener(configManager));
                 reader1.addListener(new TaskAttemptCounterListener(configManager));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index a681aca..520fbbc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -43,7 +43,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     List jobEvents = new ArrayList<>();
     List taskExecs = new ArrayList<>();
     List taskAttemptExecs = new ArrayList<>();
-    private JobHistoryZKStateManager zkState;
     private TimeZone timeZone;
 
     public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
@@ -56,7 +55,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
         }
         this.batchSize = batchSize;
-        zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
         timeZone = TimeZone.getTimeZone(configManager.getControlConfig().timeZone);
     }
 
@@ -92,12 +90,13 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        JobHistoryZKStateManager zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
         logger.info("start flushing entities of total number " + list.size());
         for (int i = 0; i < list.size(); i++) {
             JobBaseAPIEntity entity = list.get(i);
             if (entity instanceof JobExecutionAPIEntity) {
                 jobs.add((JobExecutionAPIEntity) entity);
-                this.zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
+                zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
                     entity.getTags().get(MRJobTagName.JOB_ID.toString()),
                     ((JobExecutionAPIEntity) entity).getCurrentState());
             } else if (entity instanceof JobEventAPIEntity) {
@@ -108,6 +107,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
                 taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
             }
         }
+        zkState.close();
         GenericServiceAPIResponseEntity result;
         if (jobs.size() > 0) {
             logger.info("flush JobExecutionAPIEntity of number " + jobs.size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index db2c716..de874a6 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -40,6 +40,7 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000,
     "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+    "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888",
     "principal":"", #if not need, then empty
     "keytab":"",
     "basePath" : "/mr-history/done",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 9e156fa..5811f72 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -207,14 +207,15 @@ public class MRJobParser implements Runnable {
             }
             jobExecutionAPIEntity.setTimestamp(app.getStartedTime());
             jobExecutionAPIEntity.setSubmissionTime(app.getStartedTime());
+            jobExecutionAPIEntity.setTrackingUrl(app.getTrackingUrl());
             jobExecutionAPIEntity.setStartTime(mrJob.getStartTime());
             jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime());
             jobExecutionAPIEntity.setCurrentState(mrJob.getState());
             jobExecutionAPIEntity.setInternalState(mrJob.getState());
             jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal());
-            jobExecutionAPIEntity.setMapsCompleted(mrJob.getMapsCompleted());
+            jobExecutionAPIEntity.setNumFinishedMaps(mrJob.getMapsCompleted());
             jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal());
-            jobExecutionAPIEntity.setReducesCompleted(mrJob.getReducesCompleted());
+            jobExecutionAPIEntity.setNumFinishedReduces(mrJob.getReducesCompleted());
             jobExecutionAPIEntity.setMapProgress(mrJob.getMapProgress());
             jobExecutionAPIEntity.setReduceProgress(mrJob.getReduceProgress());
             jobExecutionAPIEntity.setMapsPending(mrJob.getMapsPending());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
new file mode 100644
index 0000000..93c6c00
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MRJobCountHelper {
+
+    public void initJobCountList(List jobCounts, long startTime, long endTime, long intervalInSecs) {
+        for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) {
+            jobCounts.add(new UnitJobCount(i * intervalInSecs));
+        }
+    }
+
+    public String moveTimeforwardOneDay(String startTime) throws ParseException {
+        long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
+        timeInSecs -= 24L * 60L * 60L;
+        return DateTimeUtil.secondsToHumanDate(timeInSecs);
+    }
+
+    public JobCountResponse getRunningJobCount(List jobDurations,
+                                               long startTimeInSecs,
+                                               long endTimeInSecs,
+                                               long intervalInSecs) {
+        JobCountResponse response = new JobCountResponse();
+        List jobCounts = new ArrayList<>();
+        initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, intervalInSecs);
+        for (JobExecutionAPIEntity jobDuration: jobDurations) {
+            countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+        }
+        response.jobCounts = jobCounts;
+        return response;
+    }
+
+    public JobCountResponse getHistoryJobCount(List jobDurations, String timeList) {
+        JobCountResponse response = new JobCountResponse();
+        List jobCounts = new ArrayList<>();
+        List times = TaskCountByDurationHelper.parseTimeList(timeList);
+        for (int i = 0; i < times.size(); i++) {
+            jobCounts.add(new UnitJobCount(times.get(i)));
+        }
+        for (JobExecutionAPIEntity job : jobDurations) {
+            int jobIndex = TaskCountByDurationHelper.getPosition(times, job.getDurationTime());
+            UnitJobCount counter = jobCounts.get(jobIndex);
+            countJob(counter, job.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+        }
+        response.jobCounts = jobCounts;
+        return response;
+    }
+
+    public void countJob(UnitJobCount counter, String jobType) {
+        if (null == jobType) {
+            jobType = "null";
+        }
+        counter.jobCount++;
+        if (counter.jobCountByType.containsKey(jobType)) {
+            counter.jobCountByType.put(jobType, counter.jobCountByType.get(jobType) + 1);
+        } else {
+            counter.jobCountByType.put(jobType, 1L);
+        }
+    }
+
+    public void countJob(List jobCounts, long jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) {
+        long startCountPoint = jobCounts.get(0).timeBucket;
+        if (jobEndTimeSecs < startCountPoint) {
+            return;
+        }
+        int startIndex = 0;
+        if (jobStartTimeSecs > startCountPoint) {
+            long relativeStartTime = jobStartTimeSecs - startCountPoint;
+            startIndex = (int) (relativeStartTime / intervalInSecs) + (relativeStartTime % intervalInSecs == 0 ? 0 : 1);
+        }
+        long relativeEndTime = jobEndTimeSecs - startCountPoint;
+        int endIndex = (int) (relativeEndTime / intervalInSecs);
+
+        for (int i = startIndex; i <= endIndex && i < jobCounts.size(); i++) {
+            countJob(jobCounts.get(i), jobType);
+        }
+    }
+
+    public List getSearchTimeDuration(List jobEntities) {
+        List pair = new ArrayList<>();
+        long minStartTime = System.currentTimeMillis();
+        long maxEndTime = 0;
+        for (JobExecutionAPIEntity jobEntity : jobEntities) {
+            if (minStartTime > jobEntity.getStartTime()) {
+                minStartTime = jobEntity.getStartTime();
+            }
+            if (maxEndTime < jobEntity.getEndTime()) {
+                maxEndTime = jobEntity.getEndTime();
+            }
+        }
+        pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(minStartTime));
+        pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(maxEndTime));
+        return pair;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index 3e487ae..5af9811 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -18,19 +18,25 @@
 
 package org.apache.eagle.service.jpm;
 
-
 import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE;
 
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.generic.GenericEntityServiceResource;
+import org.apache.eagle.service.generic.ListQueryResource;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.TaskCountPerJobResponse;
 import org.apache.commons.lang.time.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
 import java.util.*;
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -59,14 +65,14 @@ public class MRJobExecutionResource {
         List jobs = new ArrayList<>();
         List finishedJobs = new ArrayList<>();
         Set jobIds = new HashSet<>();
-        final Map meta = new HashMap<>();
+        final Map meta = new HashMap<>();
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         String jobQuery = String.format(query, Constants.JPA_JOB_EXECUTION_SERVICE_NAME);
         GenericServiceAPIResponseEntity res =
-                resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
-                        top,filterIfMissing, parallel, metricName, verbose);
+            resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
+                top, filterIfMissing, parallel, metricName, verbose);
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
                 finishedJobs.add(o);
@@ -74,10 +80,10 @@ public class MRJobExecutionResource {
         }
         jobQuery = String.format(query, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME);
         res = resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
-            top,filterIfMissing, parallel, metricName, verbose);
+            top, filterIfMissing, parallel, metricName, verbose);
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
-                if (!
isDuplicate(jobIds, o)) { + if (!isDuplicate(jobIds, o)) { jobs.add(o); } } @@ -92,7 +98,7 @@ public class MRJobExecutionResource { response.setException(new Exception(res.getException())); } meta.put(TOTAL_RESULTS, jobs.size()); - meta.put(ELAPSEDMS,stopWatch.getTime()); + meta.put(ELAPSEDMS, stopWatch.getTime()); response.setObj(jobs); response.setMeta(meta); return response; @@ -107,7 +113,7 @@ public class MRJobExecutionResource { } private String buildCondition(String jobId, String jobDefId, String site) { - String conditionFormat = "@site=\"%s\"" ; + String conditionFormat = "@site=\"%s\""; String condition = null; if (jobDefId != null) { conditionFormat = conditionFormat + " AND @jobDefId=\"%s\""; @@ -144,12 +150,12 @@ public class MRJobExecutionResource { } LOG.debug("search condition=" + condition); - final Map meta = new HashMap<>(); + final Map meta = new HashMap<>(); StopWatch stopWatch = new StopWatch(); stopWatch.start(); String queryFormat = "%s[%s]{*}"; String queryString = String.format(queryFormat, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, condition); - GenericServiceAPIResponseEntity res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); + GenericServiceAPIResponseEntity res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); if (res.isSuccess() && res.getObj() != null) { for (TaggedLogAPIEntity o : res.getObj()) { jobs.add(o); @@ -157,10 +163,10 @@ public class MRJobExecutionResource { } } queryString = String.format(queryFormat, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, condition); - res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); + res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false); if (res.isSuccess() && res.getObj() != null) { for (TaggedLogAPIEntity o : res.getObj()) { - if (! 
isDuplicate(jobIds, o)) { + if (!isDuplicate(jobIds, o)) { jobs.add(o); } } @@ -181,128 +187,228 @@ public class MRJobExecutionResource { response.setException(new Exception(res.getException())); } meta.put(TOTAL_RESULTS, jobs.size()); - meta.put(ELAPSEDMS,stopWatch.getTime()); + meta.put(ELAPSEDMS, stopWatch.getTime()); response.setObj(jobs); response.setMeta(meta); return response; } - public List parseTimeList(String timelist) { - List times = new ArrayList<>(); - String [] strs = timelist.split("[,\\s]"); - for (String str : strs) { - try { - times.add(Long.parseLong(str)); - } catch (Exception ex) { - LOG.warn(str + " is not a number"); - } - } - return times; - } - public int getPosition(List times, Long duration) { - duration = duration / 1000; - for (int i = 1; i < times.size(); i++) { - if (duration < times.get(i)) { - return i - 1; - } - } - return times.size() - 1; - } - public void getTopTasks(List list, long top) { - for (MRJobTaskGroupResponse.UnitTaskCount taskCounter : list) { - Iterator iterator = taskCounter.entities.iterator(); - for (int i = 0; i < top && iterator.hasNext(); i++) { - taskCounter.topEntities.add(iterator.next()); - } - taskCounter.entities.clear(); - } - } - - public void initTaskCountList(List runningTaskCount, - List finishedTaskCount, - List times, - Comparator comparator) { - for (int i = 0; i < times.size(); i++) { - runningTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator)); - finishedTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator)); - } - } @GET - @Path("{jobId}/taskCounts") + @Path("{jobId}/taskCountsByDuration") @Produces(MediaType.APPLICATION_JSON) - public MRJobTaskGroupResponse getTaskCounts(@PathParam("jobId") String jobId, - @QueryParam("site") String site, - @QueryParam("timelineInSecs") String timeList, - @QueryParam("top") long top) { - MRJobTaskGroupResponse response = new MRJobTaskGroupResponse(); + public TaskCountPerJobResponse 
getTaskCountsPerJob(@PathParam("jobId") String jobId, + @QueryParam("site") String site, + @QueryParam("timelineInSecs") String timeList, + @QueryParam("top") long top) { + TaskCountPerJobResponse response = new TaskCountPerJobResponse(); if (jobId == null || site == null || timeList == null || timeList.isEmpty()) { response.errMessage = "IllegalArgumentException: jobId == null || site == null || timelineInSecs == null or isEmpty"; return response; } - List runningTaskCount = new ArrayList<>(); - List finishedTaskCount = new ArrayList<>(); + TaskCountByDurationHelper helper = new TaskCountByDurationHelper(); + List runningTaskCount = new ArrayList<>(); + List finishedTaskCount = new ArrayList<>(); - List times = parseTimeList(timeList); + List times = helper.parseTimeList(timeList); String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId); GenericServiceAPIResponseEntity historyRes = - resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); if (historyRes.isSuccess() && historyRes.getObj() != null && historyRes.getObj().size() > 0) { - initTaskCountList(runningTaskCount, finishedTaskCount, times, new HistoryTaskComparator()); + helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.HistoryTaskComparator()); for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) { - int index = getPosition(times, o.getDuration()); - MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index); - counter.taskCount++; + int index = helper.getPosition(times, o.getDuration()); + MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index); + helper.countTask(counter, o.getTags().get(TASK_TYPE.toString())); counter.entities.add(o); } } else { query = 
String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId); GenericServiceAPIResponseEntity runningRes = - resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); if (runningRes.isSuccess() && runningRes.getObj() != null) { - initTaskCountList(runningTaskCount, finishedTaskCount, times, new RunningTaskComparator()); + helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.RunningTaskComparator()); for (TaskExecutionAPIEntity o : runningRes.getObj()) { - int index = getPosition(times, o.getDuration()); + int index = helper.getPosition(times, o.getDuration()); if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) { - MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index); - counter.taskCount++; + MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index); + helper.countTask(counter, o.getTags().get(TASK_TYPE.toString())); counter.entities.add(o); } else if (o.getEndTime() != 0) { - MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index); - counter.taskCount++; + MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index); + helper.countTask(counter, o.getTags().get(TASK_TYPE.toString())); counter.entities.add(o); } } } } - if (top > 0) { - getTopTasks(runningTaskCount, top); + if (top > 0) { + helper.getTopTasks(runningTaskCount, top); response.runningTaskCount = runningTaskCount; - getTopTasks(finishedTaskCount, top); + helper.getTopTasks(finishedTaskCount, top); response.finishedTaskCount = finishedTaskCount; } + response.topNumber = top; return response; } - static class RunningTaskComparator implements Comparator { - @Override - public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) { - Long time1 
= o1.getDuration(); - Long time2 = o2.getDuration(); - return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); + @GET + @Path("runningJobCounts") + @Produces(MediaType.APPLICATION_JSON) + public JobCountResponse getRunningJobCount(@QueryParam("site") String site, + @QueryParam("durationBegin") String startTime, + @QueryParam("durationEnd") String endTime, + @QueryParam("intervalInSecs") long intervalInSecs) { + JobCountResponse response = new JobCountResponse(); + MRJobCountHelper helper = new MRJobCountHelper(); + if (site == null || startTime == null || endTime == null) { + response.errMessage = "IllegalArgument: site, durationBegin, durationEnd is null"; + return response; + } + if (intervalInSecs <= 0) { + response.errMessage = String.format("IllegalArgument: intervalInSecs=%s is invalid", intervalInSecs); + return response; + } + long startTimeInMills; + String searchStartTime = startTime; + String searchEndTime = endTime; + try { + startTimeInMills = DateTimeUtil.humanDateToSeconds(startTime) * DateTimeUtil.ONESECOND; + searchStartTime = helper.moveTimeforwardOneDay(searchStartTime); + } catch (Exception e) { + response.errMessage = e.getMessage(); + return response; + } + String query = String.format("%s[@site=\"%s\" AND @endTime>=%s]{@startTime,@endTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, startTimeInMills); + GenericServiceAPIResponseEntity historyRes = + resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + if (!historyRes.isSuccess() || historyRes.getObj() == null) { + response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query); + return response; } + + try { + long startTimeInSecs = DateTimeUtil.humanDateToSeconds(startTime); + long endTimeInSecs = DateTimeUtil.humanDateToSeconds(endTime); + return helper.getRunningJobCount(historyRes.getObj(), startTimeInSecs, endTimeInSecs, intervalInSecs); + } catch 
(Exception e) { + response.errMessage = e.getMessage(); + return response; + } + } + + @GET + @Path("jobMetrics/entities") + @Produces(MediaType.APPLICATION_JSON) + public Object getJobMetricsByEntitiesQuery(@QueryParam("site") String site, + @QueryParam("timePoint") String timePoint, + @QueryParam("metricName") String metricName, + @QueryParam("intervalmin") long intervalmin, + @QueryParam("top") int top) { + return getJobMetrics(site, timePoint, metricName, intervalmin, top, queryMetricEntitiesFunc); } - static class HistoryTaskComparator implements Comparator { - @Override - public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1, - org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) { - Long time1 = o1.getDuration(); - Long time2 = o2.getDuration(); - return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); + @GET + @Path("jobMetrics/list") + @Produces(MediaType.APPLICATION_JSON) + public Object getJobMetricsByListQuery(@QueryParam("site") String site, + @QueryParam("timePoint") String timePoint, + @QueryParam("metricName") String metricName, + @QueryParam("intervalmin") long intervalmin, + @QueryParam("top") int top) { + return getJobMetrics(site, timePoint, metricName, intervalmin, top, queryMetricListFunc); + } + + public Object getJobMetrics(String site, String timePoint, String metricName, long intervalmin, int top, + Function6 metricQueryFunc) { + GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity(); + MRJobCountHelper helper = new MRJobCountHelper(); + if (site == null || timePoint == null || metricName == null) { + response.setException(new IllegalArgumentException("Error: site, timePoint, metricName may be unset")); + response.setSuccess(false); + return response; + } + if (intervalmin <= 0) { + LOG.warn("query parameter intervalmin <= 0, use default value 5 instead"); + intervalmin = 5; } + if (top <= 0) { + LOG.warn("query parameter top <= 0, use default value 10 instead"); + top = 
10; + } + + long timePointsInMills; + String searchStartTime = timePoint; + String searchEndTime = timePoint; + try { + timePointsInMills = DateTimeUtil.humanDateToSeconds(timePoint) * DateTimeUtil.ONESECOND; + searchStartTime = helper.moveTimeforwardOneDay(searchStartTime); + } catch (ParseException e) { + response.setException(e); + response.setSuccess(false); + return response; + } + String query = String.format("%s[@site=\"%s\" AND @startTime<=\"%s\" AND @endTime>=\"%s\"]{@startTime,@endTime}", + Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, timePointsInMills, timePointsInMills); + GenericServiceAPIResponseEntity historyRes = + resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + if (!historyRes.isSuccess() || historyRes.getObj() == null) { + return historyRes; + } + + List timeDuration = helper.getSearchTimeDuration(historyRes.getObj()); + LOG.info(String.format("new search time range: startTime=%s, endTime=%s", timeDuration.get(0), timeDuration.get(1))); + query = String.format("%s[@site=\"%s\"]<@jobId>{sum(value)}.{sum(value) desc}", Constants.GENERIC_METRIC_SERVICE, site); + return metricQueryFunc.apply(query, timeDuration.get(0), timeDuration.get(1), intervalmin, top, metricName); } + Function6 queryMetricEntitiesFunc + = (query, startTime, endTime, intervalmin, top, metricName) -> { + GenericEntityServiceResource resource = new GenericEntityServiceResource(); + return resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, + false, true, intervalmin, top, true, 0, metricName, false); + }; + + Function6 queryMetricListFunc + = (query, startTime, endTime, intervalmin, top, metricName) -> { + ListQueryResource resource = new ListQueryResource(); + return resource.listQuery(query, startTime, endTime, Integer.MAX_VALUE, null, + false, true, intervalmin, top, true, 0, metricName, false); + }; + + @FunctionalInterface + interface Function6 { + R apply(A a, B b, C c, D d, E e, F 
f); + } + + @GET + @Path("jobCountsByDuration") + @Produces(MediaType.APPLICATION_JSON) + public JobCountResponse getJobCountGroupByDuration(@QueryParam("site") String site, + @QueryParam("timelineInSecs") String timeList, + @QueryParam("jobStartTimeBegin") String startTime, + @QueryParam("jobStartTimeEnd") String endTime) { + JobCountResponse response = new JobCountResponse(); + MRJobCountHelper helper = new MRJobCountHelper(); + if (site == null || startTime == null || endTime == null || timeList == null) { + response.errMessage = "IllegalArgument: site, jobStartTimeBegin, jobStartTimeEnd, or timelineInSecs is null"; + return response; + } + String query = String.format("%s[@site=\"%s\"]{@durationTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site); + GenericServiceAPIResponseEntity historyRes = + resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false); + if (!historyRes.isSuccess() || historyRes.getObj() == null) { + response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query); + return response; + } + try { + return helper.getHistoryJobCount(historyRes.getObj(), timeList); + } catch (Exception e) { + response.errMessage = e.getMessage(); + return response; + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java new file mode 100644 index 0000000..c546198 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import java.util.*;
+
+public class MRJobTaskCountResponse {
+    public String errMessage;
+
+    public static class TaskCountPerJobResponse extends MRJobTaskCountResponse {
+        public long topNumber;
+        public List runningTaskCount;
+        public List finishedTaskCount;
+    }
+
+    public static class JobCountResponse extends MRJobTaskCountResponse {
+        public List jobCounts;
+    }
+
+    static class UnitTaskCount {
+        public long timeBucket;
+        public int taskCount;
+        public int mapTaskCount;
+        public int reduceTaskCount;
+        public Set entities;
+        public List topEntities;
+
+        UnitTaskCount(long timeBucket, Comparator comparator) {
+            this.timeBucket = timeBucket;
+            this.taskCount = 0;
+            this.mapTaskCount = 0;
+            this.reduceTaskCount = 0;
+            entities = new TreeSet<>(comparator);
+            topEntities = new ArrayList<>();
+        }
+    }
+
+    static class UnitJobCount {
+        public long timeBucket;
+        public long jobCount;
+        public Map jobCountByType;
+
+        UnitJobCount(long timeBucket) {
+            this.timeBucket = timeBucket;
+            this.jobCount = 0;
+            this.jobCountByType = new HashMap<>();
+        }
+    }
+}
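The UnitJobCount buckets above are filled in two different ways by the two job count APIs in this commit: runningJobCounts counts a job in every time bucket it overlaps, and jobCountsByDuration counts each job once in the bucket matching its duration. The following is a minimal sketch of both schemes, not the committed code. The names JobBucketSketch, bucketStarts, countInterval and durationBucket are hypothetical, and plain long arrays stand in for the UnitJobCount list. The bucket alignment in bucketStarts is an assumption inferred from the unit tests in this commit (start 3, end 31, interval 5 yields 7 buckets), since initJobCountList is not shown in this part of the diff.

```java
// Illustrative sketch only; names are hypothetical, not part of Eagle.
class JobBucketSketch {

    // Bucket start times, aligned down to a multiple of the interval.
    // ASSUMPTION: alignment inferred from the tests, not from initJobCountList itself.
    static long[] bucketStarts(long startSecs, long endSecs, long intervalSecs) {
        long first = startSecs / intervalSecs * intervalSecs;
        int n = (int) ((endSecs - first) / intervalSecs) + 1;
        long[] starts = new long[n];
        for (int i = 0; i < n; i++) {
            starts[i] = first + i * intervalSecs;
        }
        return starts;
    }

    // Mirrors countJob(List, ...) from the diff: increments every bucket whose
    // start time lies in [jobStart, jobEnd]. A job that starts and ends strictly
    // between two bucket starts is therefore not counted at all.
    static void countInterval(long[] starts, long[] counts,
                              long jobStart, long jobEnd, long intervalSecs) {
        long origin = starts[0];
        if (jobEnd < origin) {
            return;
        }
        int startIndex = 0;
        if (jobStart > origin) {
            long rel = jobStart - origin;
            // Round up to the first bucket start at or after jobStart.
            startIndex = (int) (rel / intervalSecs) + (rel % intervalSecs == 0 ? 0 : 1);
        }
        int endIndex = (int) ((jobEnd - origin) / intervalSecs);
        for (int i = startIndex; i <= endIndex && i < counts.length; i++) {
            counts[i]++;
        }
    }

    // Mirrors getPosition from the diff: index of the last threshold (in seconds)
    // that is <= the duration (in milliseconds). Returns -1 for an empty list.
    static int durationBucket(long[] thresholdsSecs, long durationMillis) {
        long d = durationMillis / 1000;
        for (int i = 1; i < thresholdsSecs.length; i++) {
            if (d < thresholdsSecs[i]) {
                return i - 1;
            }
        }
        return thresholdsSecs.length - 1;
    }

    public static void main(String[] args) {
        long interval = 5;
        long[] starts = bucketStarts(3, 31, interval);   // 0, 5, 10, ..., 30
        long[] counts = new long[starts.length];
        countInterval(starts, counts, 5, 10, interval);
        countInterval(starts, counts, 13, 18, interval);
        countInterval(starts, counts, 18, 28, interval);
        countInterval(starts, counts, 25, 33, interval);
        System.out.println(java.util.Arrays.toString(counts));  // [0, 1, 1, 1, 1, 2, 1]
        System.out.println(durationBucket(new long[] {0, 10, 20, 40}, 25_000L));  // 2
    }
}
```

The inputs in main are the ones used by test3 in TestJobCountPerBucketHelper and by the removed TestMRJobExecutionResource test, so the printed values line up with the assertions in those tests (7 buckets, bucket 1 holds 1 job, bucket 5 holds 2 jobs, and a 25 second duration falls in position 2).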
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java deleted file mode 100644 index 3be9b43..0000000 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.service.jpm; - -import java.util.*; - -class MRJobTaskGroupResponse { - public List runningTaskCount; - public List finishedTaskCount; - public String errMessage; - - static class UnitTaskCount { - public long timeBucket; - public int taskCount; - public Set entities; - public List topEntities; - - UnitTaskCount(long timeBucket, Comparator comparator) { - this.timeBucket = timeBucket; - this.taskCount = 0; - entities = new TreeSet<>(comparator); - topEntities = new ArrayList<>(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java new file mode 100644 index 0000000..0eeb440 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.service.jpm; + +import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +public class TaskCountByDurationHelper { + + private static final Logger LOG = LoggerFactory.getLogger(TaskCountByDurationHelper.class); + + public static List parseTimeList(String timelist) { + List times = new ArrayList<>(); + String [] strs = timelist.split("[,\\s]"); + for (String str : strs) { + try { + times.add(Long.parseLong(str)); + } catch (Exception ex) { + LOG.warn(str + " is not a number"); + } + } + return times; + } + + public static int getPosition(List times, Long duration) { + duration = duration / 1000; + for (int i = 1; i < times.size(); i++) { + if (duration < times.get(i)) { + return i - 1; + } + } + return times.size() - 1; + } + + public void getTopTasks(List list, long top) { + for (MRJobTaskCountResponse.UnitTaskCount taskCounter : list) { + Iterator iterator = taskCounter.entities.iterator(); + for (int i = 0; i < top && iterator.hasNext(); i++) { + taskCounter.topEntities.add(iterator.next()); + } + taskCounter.entities.clear(); + } + } + + public void countTask(MRJobTaskCountResponse.UnitTaskCount counter, String taskType) { + counter.taskCount++; + if (taskType.equalsIgnoreCase(Constants.TaskType.MAP.toString())) { + counter.mapTaskCount++; + } else if (taskType.equalsIgnoreCase(Constants.TaskType.REDUCE.toString())) { + counter.reduceTaskCount++; + } + } + + public void initTaskCountList(List runningTaskCount, + List finishedTaskCount, + List times, + Comparator comparator) { + for (int i = 0; i < times.size(); i++) { + runningTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator)); + finishedTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator)); + } 
+ } + + static class RunningTaskComparator implements Comparator { + @Override + public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) { + Long time1 = o1.getDuration(); + Long time2 = o2.getDuration(); + return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); + } + } + + static class HistoryTaskComparator implements Comparator { + @Override + public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1, + org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) { + Long time1 = o1.getDuration(); + Long time2 = o2.getDuration(); + return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java new file mode 100644 index 0000000..718f068 --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestJobCountPerBucketHelper {
+    MRJobCountHelper helper = new MRJobCountHelper();
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestJobCountPerBucketHelper.class);
+
+    @Test
+    public void test() throws ParseException {
+        String timeString = "2016-08-22 20:13:00";
+        long timestamp = DateTimeUtil.humanDateToSeconds(timeString);
+        String timeString2 = DateTimeUtil.secondsToHumanDate(timestamp);
+        Assert.assertTrue(timeString2.equals(timeString));
+
+        String timeString3 = helper.moveTimeforwardOneDay(timeString);
+        Assert.assertTrue(timeString3.equals("2016-08-21 20:13:00"));
+    }
+
+    @Test
+    public void test2() throws ParseException {
+        String startTime = "2016-08-22 20:13:00";
+        String endTime = "2016-08-22 24:13:00";
+        List jobCounts = new ArrayList<>();
+        helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), 15 * 60);
+        /*for (MRJobTaskCountResponse.UnitJobCount jobCount : jobCounts) {
+            LOG.info(DateTimeUtil.secondsToHumanDate(jobCount.timeBucket));
+        }*/
+        Assert.assertTrue(DateTimeUtil.secondsToHumanDate(jobCounts.get(1).timeBucket).equals("2016-08-22 20:15:00"));
+    }
+
+    @Test
+    public void test3() {
+        List jobCounts = new ArrayList<>();
+        long intervalSecs = 5;
+        helper.initJobCountList(jobCounts, 3, 31, intervalSecs);
+        helper.countJob(jobCounts, 5, 10, intervalSecs, "hive");
+        helper.countJob(jobCounts, 13, 18, intervalSecs, "hive");
+        helper.countJob(jobCounts, 18, 28, intervalSecs, "hive");
+        helper.countJob(jobCounts, 25, 33, intervalSecs, "hive");
+        Assert.assertTrue(jobCounts.size() == 7);
+        Assert.assertTrue(jobCounts.get(1).jobCount == 1);
+        Assert.assertTrue(jobCounts.get(5).jobCount == 2);
+    }
+
+    @Test
+    public void test4() throws ParseException {
+        List jobCounts = new ArrayList<>();
+        long intervalSecs = 60 * 15;
+        String startTime = "2016-08-22 20:13:00";
+        String endTime = "2016-08-22 24:13:00";
+        helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), intervalSecs);
+        helper.countJob(jobCounts,
+            DateTimeUtil.humanDateToSeconds("2016-08-22 20:23:00"),
+            DateTimeUtil.humanDateToSeconds("2016-08-22 20:30:00"),
+            intervalSecs,
+            "hive");
+        Assert.assertTrue(jobCounts.get(2).jobCount == 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
deleted file mode 100644
index 824556b..0000000
--- a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
-import org.apache.eagle.jpm.util.Constants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-
-public class TestMRJobExecutionResource {
-
-    @Test
-    public void test() {
-        MRJobExecutionResource resource = new MRJobExecutionResource();
-        String timeList = " 0, 10,20,40 ";
-        List times = resource.parseTimeList(timeList);
-        Assert.assertTrue(times.size() == 4);
-
-        long val = 25 * 1000;
-        int index = resource.getPosition(times, val);
-        Assert.assertTrue(index == 2);
-    }
-
-    @Test
-    public void test2() {
-        MRJobExecutionResource resource = new MRJobExecutionResource();
-        String timeList = " 0, 10,20,40 ";
-        List times = resource.parseTimeList(timeList);
-
-        TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
-        test1.setDuration(15 * 1000);
-        test1.setTaskStatus("running");
-        TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
-        test4.setDuration(13 * 1000);
-        test4.setTaskStatus("running");
-        TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
-        test2.setDuration(0 * 1000);
-        test2.setEndTime(100);
-        test2.setTaskStatus("x");
-        TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
-        test3.setDuration(19 * 1000);
-        test3.setTaskStatus("running");
-        TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
-        test5.setDuration(20 * 1000);
-        test5.setEndTime(28);
-        test5.setTaskStatus("x");
-        List tasks = new ArrayList<>();
-        tasks.add(test1);
-        tasks.add(test2);
-        tasks.add(test3);
-        tasks.add(test4);
-        tasks.add(test5);
-
-        List runningTaskCount = new ArrayList<>();
-        List finishedTaskCount = new ArrayList<>();
-
-        Comparator comparator = new MRJobExecutionResource.RunningTaskComparator();
-        resource.initTaskCountList(runningTaskCount, finishedTaskCount, times, comparator);
-
-        for (TaskExecutionAPIEntity o : tasks) {
-            int index = resource.getPosition(times, o.getDuration());
-            if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
-                MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index);
-                counter.taskCount++;
-                counter.entities.add(o);
-            } else if (o.getEndTime() != 0) {
-                MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
-                counter.taskCount++;
-                counter.entities.add(o);
-            }
-        }
-        int top = 2;
-        if (top > 0) {
-            resource.getTopTasks(runningTaskCount, top);
-        }
-        Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
-        Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
new file mode 100644
index 0000000..2cd0b8e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestTaskCountPerJobHelper {
+    TaskCountByDurationHelper helper = new TaskCountByDurationHelper();
+
+    @Test
+    public void test() {
+        String timeList = " 0, 10,20,40 ";
+        List times = helper.parseTimeList(timeList);
+        Assert.assertTrue(times.size() == 4);
+
+        long val = 25 * 1000;
+        int index = helper.getPosition(times, val);
+        Assert.assertTrue(index == 2);
+    }
+
+    @Test
+    public void test2() {
+        TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
+        test1.setDuration(15 * 1000);
+        test1.setTaskStatus("running");
+        TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
+        test4.setDuration(13 * 1000);
+        test4.setTaskStatus("running");
+        TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
+        test2.setDuration(0 * 1000);
+        test2.setEndTime(100);
+        test2.setTaskStatus("x");
+        TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
+        test3.setDuration(19 * 1000);
+        test3.setTaskStatus("running");
+        TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
+        test5.setDuration(20 * 1000);
+        test5.setEndTime(28);
+        test5.setTaskStatus("x");
+        List tasks = new ArrayList<>();
+        tasks.add(test1);
+        tasks.add(test2);
+        tasks.add(test3);
+        tasks.add(test4);
+        tasks.add(test5);
+
+        List runningTaskCount = new ArrayList<>();
+        List finishedTaskCount = new ArrayList<>();
+
+        String timeList = " 0, 10,20,40 ";
+        List times = helper.parseTimeList(timeList);
+
+        helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.RunningTaskComparator());
+
+        for (TaskExecutionAPIEntity o : tasks) {
+            int index = helper.getPosition(times, o.getDuration());
+            if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
+                MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index);
+                counter.taskCount++;
+                counter.entities.add(o);
+            } else if (o.getEndTime() != 0) {
+                MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+                counter.taskCount++;
+                counter.entities.add(o);
+            }
+        }
+        int top = 2;
+        if (top > 0) {
+            helper.getTopTasks(runningTaskCount, top);
+        }
+        Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
+        Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 7dce0a2..ec56eac 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
 public class Constants {
     private static final Logger LOG = LoggerFactory.getLogger(Constants.class);

+    public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+
     //SPARK
     public static final String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
     public static final String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";