Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E3BD3200BD4 for ; Fri, 16 Dec 2016 08:04:22 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E23FD160B24; Fri, 16 Dec 2016 07:04:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 49F99160AF6 for ; Fri, 16 Dec 2016 08:04:21 +0100 (CET) Received: (qmail 11682 invoked by uid 500); 16 Dec 2016 07:04:20 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 11669 invoked by uid 99); 16 Dec 2016 07:04:20 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Dec 2016 07:04:20 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id F1898C14FD for ; Fri, 16 Dec 2016 07:04:19 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id XxKlmkmZIGMS for ; Fri, 16 Dec 2016 07:04:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id D18B35F251 for ; Fri, 16 Dec 2016 07:04:07 +0000 (UTC) Received: (qmail 11429 invoked by uid 99); 16 Dec 2016 07:04:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Dec 2016 07:04:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4AC54DFCFC; Fri, 16 Dec 2016 07:04:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: qingwzhao@apache.org To: commits@eagle.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-834] Update Daily Job Summery Report Date: Fri, 16 Dec 2016 07:04:07 +0000 (UTC) archived-at: Fri, 16 Dec 2016 07:04:23 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master 229d7b907 -> a89275bfc [EAGLE-834] Update Daily Job Summery Report https://issues.apache.org/jira/browse/EAGLE-834 * update mail subject & title * adjust mail template and enrich the summary data * fix a query bug in the mr job list API (GET /mrJobs) Author: Zhao, Qingwen Closes #749 from qingwen220/EAGLE-834. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a89275bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a89275bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a89275bf Branch: refs/heads/master Commit: a89275bfc86c4db63688e28ff12196ee97a78207 Parents: 229d7b9 Author: Zhao, Qingwen Authored: Fri Dec 16 15:04:00 2016 +0800 Committer: Zhao, Qingwen Committed: Fri Dec 16 15:04:00 2016 +0800 ---------------------------------------------------------------------- .../mr/history/MRHistoryJobDailyReporter.java | 169 +++++++++---------- .../src/main/resources/JobReportTemplate.vm | 118 +++++++------ .../history/MRHistoryJobDailyReporterTest.java | 43 ++--- .../service/jpm/MRJobExecutionResource.java | 26 +-- 4 files changed, 177 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a89275bf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java index 6b38244..0dc6c5f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java @@ -39,19 +39,20 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; +import static org.apache.eagle.common.config.EagleConfigConstants.EAGLE_TIME_ZONE; import static org.apache.eagle.common.config.EagleConfigConstants.SERVICE_HOST; import static org.apache.eagle.common.config.EagleConfigConstants.SERVICE_PORT; public class MRHistoryJobDailyReporter extends AbstractScheduledService { private static final Logger LOG = LoggerFactory.getLogger(MRHistoryJobDailyReporter.class); - private static final String TIMEZONE_PATH = "service.timezone"; + private static final String DAILY_SENT_HOUROFDAY = "application.dailyJobReport.reportHourTime"; private static final String DAILY_SENT_PERIOD = "application.dailyJobReport.reportPeriodInHour"; private static final String NUM_TOP_USERS = "application.dailyJobReport.numTopUsers"; private static final String JOB_OVERTIME_LIMIT_HOUR = "application.dailyJobReport.jobOvertimeLimitInHour"; public static final String SERVICE_PATH = "application.dailyJobReport"; - public static final String APP_TYPE = "MR_HISTORY_JOB_APP"; + protected static final String APP_TYPE = "MR_HISTORY_JOB_APP"; // alert context keys protected static final String NUM_TOP_USERS_KEY = "numTopUsers"; @@ -64,6 +65,12 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { protected static final String FINISHED_JOB_USERS_KEY = "finishedJobUsers"; protected static final String EAGLE_JOB_LINK_KEY = "eagleJobLink"; + // queries + private static final String STATUS_QUERY = "%s[@site=\"%s\" and @endTime<=%s]<@currentState>{count}.{count desc}"; + private static final String FAILED_JOBS_QUERY = "%s[@site=\"%s\" and @currentState=\"FAILED\" and @endTime<=%s]<@user>{count}.{count desc}"; + private static final String SUCCEEDED_JOB_QUERY = "%s[@site=\"%s\" and @currentState=\"SUCCEEDED\" and @durationTime>%s and @endTime<=%s]<@user>{count}.{count desc}"; + private static final String FINISHED_JOB_QUERY = "%s[@site=\"%s\" and @endTime<=%s]<@user>{count}.{count desc}"; + private Config config; private IEagleServiceClient client; private ApplicationEntityService applicationResource; @@ -77,13 +84,13 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { private int jobOvertimeLimit = 6; // scheduler - private int initialDelayMin = 10; + private int initialDelayMin = 5; private int periodInMin = 60; private TimeZone timeZone; @Inject public MRHistoryJobDailyReporter(Config config, ApplicationEntityService applicationEntityService) { - this.timeZone = TimeZone.getTimeZone(config.getString(TIMEZONE_PATH)); + this.timeZone = TimeZone.getTimeZone(config.getString(EAGLE_TIME_ZONE)); if (config.hasPath(SERVICE_PATH) && config.hasPath(AlertEmailConstants.EAGLE_APPLICATION_EMAIL_SERVICE)) { this.emailService = new ApplicationEmailService(config, SERVICE_PATH); @@ -139,8 +146,10 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { for (String site : sites) { int reportHour = currentHour / dailySentPeriod * dailySentPeriod; calendar.set(Calendar.HOUR_OF_DAY, reportHour); - String subject = String.format("%s %s", site.toUpperCase(), config.getString(SERVICE_PATH + "." + AlertEmailConstants.SUBJECT)); - Map alertData = buildAlertData(site, calendar.getTimeInMillis()); + long endTime = calendar.getTimeInMillis() / DateTimeUtil.ONEHOUR * DateTimeUtil.ONEHOUR; + long startTime = endTime - DateTimeUtil.ONEHOUR * dailySentPeriod; + String subject = buildAlertSubject(site, startTime, endTime); + Map alertData = buildAlertData(site, startTime, endTime); sendByEmailWithSubject(alertData, subject); } } catch (Exception ex) { @@ -159,27 +168,30 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { emailService.onAlert(alertContext, alertData); } - private Map buildAlertData(String site, long endTime) { + protected String buildAlertSubject(String site, long startTime, long endTime) { + String subjectFormat = "[%s] Job Report by %s"; + String date = DateTimeUtil.format(endTime, "yyyyMMdd HH:mm"); + //String startHour = DateTimeUtil.format(startTime, "HH:mm"); + //String endHour = DateTimeUtil.format(endTime, "kk:mm"); + return String.format(subjectFormat, site.toUpperCase(), date); + } + + private Map buildAlertData(String site, long startTime, long endTime) { StopWatch watch = new StopWatch(); Map data = new HashMap<>(); this.client = new EagleServiceClientImpl(config); - long startTime = endTime - DateTimeUtil.ONEHOUR * dailySentPeriod; - LOG.info("Going to report job summery info for site {} from {} to {}", site, - DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime), - DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime)); + String startTimeStr = DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime); + String endTimeStr = DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime); + LOG.info("Going to report job summery info for site {} from {} to {}", site, startTimeStr, endTimeStr); try { watch.start(); data.putAll(buildJobSummery(site, startTime, endTime)); - data.putAll(buildFailedJobInfo(site, startTime, endTime)); - data.putAll(buildSucceededJobInfo(site, startTime, endTime)); - data.putAll(buildFinishedJobInfo(site, startTime, endTime)); data.put(NUM_TOP_USERS_KEY, numTopUsers); data.put(JOB_OVERTIME_LIMIT_KEY, jobOvertimeLimit); - data.put(ALERT_TITLE_KEY, String.format("[%s] Daily Job Report", site.toUpperCase())); - data.put(REPORT_RANGE_KEY, String.format("%s ~ %s %s", - DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime), - DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime), - DateTimeUtil.CURRENT_TIME_ZONE.getID())); + data.put(ALERT_TITLE_KEY, String.format("[%s] Job Report for 12 Hours", site.toUpperCase())); + data.put(REPORT_RANGE_KEY, String.format("%s ~ %s %s", startTimeStr, endTimeStr, DateTimeUtil.CURRENT_TIME_ZONE.getID())); + data.put(EAGLE_JOB_LINK_KEY, String.format("http://%s:%d/#/site/%s/jpm/list?startTime=%s&endTime=%s", + config.getString(SERVICE_HOST), config.getInt(SERVICE_PORT), site, startTimeStr, endTimeStr)); watch.stop(); LOG.info("Fetching DailyJobReport tasks {} seconds", watch.getTime() / DateTimeUtil.ONESECOND); } finally { @@ -192,6 +204,48 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { return data; } + private Map buildJobSummery(String site, long startTime, long endTime) { + Map data = new HashMap<>(); + + String query = String.format(STATUS_QUERY, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, endTime); + Map jobSummery = queryGroupByMetrics(query, startTime, endTime, Integer.MAX_VALUE); + if (jobSummery == null || jobSummery.isEmpty()) { + LOG.warn("Result set is empty for query={}", query); + return data; + } + Long totalJobs = jobSummery.values().stream().reduce((a, b) -> a + b).get(); + String finishedJobQuery = String.format(FINISHED_JOB_QUERY, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, endTime); + String failedJobQuery = String.format(FAILED_JOBS_QUERY, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, endTime); + String succeededJobQuery = String.format(SUCCEEDED_JOB_QUERY, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, jobOvertimeLimit * DateTimeUtil.ONEHOUR, endTime); + data.put(SUMMARY_INFO_KEY, processResult(jobSummery, totalJobs)); + data.put(FAILED_JOB_USERS_KEY, buildJobSummery(failedJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.FAILED.toString()))); + data.put(SUCCEEDED_JOB_USERS_KEY, buildJobSummery(succeededJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.SUCCEEDED.toString()))); + data.put(FINISHED_JOB_USERS_KEY, buildJobSummery(finishedJobQuery, startTime, endTime, totalJobs)); + + return data; + } + + private List buildJobSummery(String query, long startTime, long endTime, long totalJobs) { + Map jobUsers = queryGroupByMetrics(query, startTime, endTime, numTopUsers); + if (jobUsers == null || jobUsers.isEmpty()) { + LOG.warn("Result set is empty for query={}", query); + return null; + } + return processResult(jobUsers, totalJobs); + } + + private List processResult(Map parsedResult, long totalJobs) { + List summaryInfoList = new ArrayList<>(); + for (Map.Entry entry : parsedResult.entrySet()) { + JobSummaryInfo summaryInfo = new JobSummaryInfo(); + summaryInfo.key = entry.getKey(); + summaryInfo.numOfJobs = entry.getValue(); + summaryInfo.ratio = Double.parseDouble(String.format("%.2f", summaryInfo.numOfJobs * 100d / totalJobs)); + summaryInfoList.add(summaryInfo); + } + return summaryInfoList; + } + private Map parseQueryResult(List, List>> result, int limit) { Map stateCount = new LinkedHashMap<>(); for (Map, List> map : result) { @@ -213,6 +267,7 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { .startTime(startTime) .endTime(endTime).send(); if (!response.isSuccess()) { + LOG.error(response.getException()); return null; } List, List>> result = response.getObj(); @@ -223,86 +278,18 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { } } - private Map buildJobSummery(String site, long startTime, long endTime) { - Map data = new HashMap<>(); - List statusCount = new ArrayList<>(); - String query = String.format("%s[@site=\"%s\" and @endTime<=%s]<@currentState>{count}", - Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, endTime); - Map jobSummery = queryGroupByMetrics(query, startTime, endTime, Integer.MAX_VALUE); - if (jobSummery == null || jobSummery.isEmpty()) { - return data; - } - Optional totalJobs = jobSummery.values().stream().reduce((a, b) -> a + b); - for (Map.Entry entry : jobSummery.entrySet()) { - JobSummeryInfo summeryInfo = new JobSummeryInfo(); - summeryInfo.status = entry.getKey(); - summeryInfo.numOfJobs = entry.getValue(); - summeryInfo.ratio = Double.parseDouble(String.format("%.2f", entry.getValue() * 1d / totalJobs.get())); - statusCount.add(summeryInfo); - } - data.put(SUMMARY_INFO_KEY, statusCount); - return data; - } - - private Map buildFailedJobInfo(String site, long startTime, long endTime) { - Map data = new HashMap<>(); - String query = String.format("%s[@site=\"%s\" and @currentState=\"FAILED\" and @endTime<=%s]<@user>{count}.{count desc}", - Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, endTime); - - Map failedJobUsers = queryGroupByMetrics(query, startTime, endTime, numTopUsers); - if (failedJobUsers == null || failedJobUsers.isEmpty()) { - LOG.warn("Result set is empty for query={}", query); - return data; - } - data.put(FAILED_JOB_USERS_KEY, failedJobUsers); - data.put(EAGLE_JOB_LINK_KEY, String.format("http://%s:%d/#/site/%s/jpm/list?startTime=%s&endTime=%s", - config.getString(SERVICE_HOST), - config.getInt(SERVICE_PORT), - site, - DateTimeUtil.millisecondsToHumanDateWithSeconds(startTime), - DateTimeUtil.millisecondsToHumanDateWithSeconds(endTime))); - return data; - } - - private Map buildSucceededJobInfo(String site, long startTime, long endTime) { - Map data = new HashMap<>(); - long overtimeLimit = jobOvertimeLimit * DateTimeUtil.ONEHOUR; - String query = String.format("%s[@site=\"%s\" and @currentState=\"SUCCEEDED\" and @durationTime>%s and @endTime<=%s]<@user>{count}.{count desc}", - Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, overtimeLimit, endTime); - Map succeededJobUsers = queryGroupByMetrics(query, startTime, endTime, numTopUsers); - if (succeededJobUsers == null || succeededJobUsers.isEmpty()) { - LOG.warn("Result set is empty for query={}", query); - return data; - } - data.put(SUCCEEDED_JOB_USERS_KEY, succeededJobUsers); - return data; - } - - private Map buildFinishedJobInfo(String site, long startTime, long endTime) { - Map data = new HashMap<>(); - String query = String.format("%s[@site=\"%s\" and @endTime<=%s]<@user>{count}.{count desc}", - Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, endTime); - Map jobUsers = queryGroupByMetrics(query, startTime, endTime, numTopUsers); - if (jobUsers == null || jobUsers.isEmpty()) { - LOG.warn("Result set is empty for query={}", query); - return data; - } - data.put(FINISHED_JOB_USERS_KEY, jobUsers); - return data; - } - @Override protected Scheduler scheduler() { return Scheduler.newFixedRateSchedule(initialDelayMin, periodInMin, TimeUnit.MINUTES); } - public static class JobSummeryInfo { - public String status; + public static class JobSummaryInfo { + public String key; public long numOfJobs; public double ratio; - public String getStatus() { - return status; + public String getKey() { + return key; } public long getNumOfJobs() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a89275bf/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobReportTemplate.vm ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobReportTemplate.vm b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobReportTemplate.vm index c3afa58..c878d6e 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobReportTemplate.vm +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/JobReportTemplate.vm @@ -139,7 +139,7 @@

Summary @@ -176,12 +176,12 @@ #set($statusColor = "#337ab7") - #if($item.status == "SUCCEEDED") + #if($item.key == "SUCCEEDED") #set($statusColor = "green") - #elseif($item.status == "FAILED" || $item.status == "KILLED") + #elseif($item.key == "FAILED" || $item.key == "KILLED") #set($statusColor = "red") #end - $item.status + $item.key @@ -189,8 +189,9 @@ - #set ($ratio_percentage = $item.ratio * 100) - $ratio_percentage % +## #set ($ratio_percentage = $item.ratio * 100) +## $ratio_percentage % + $item.ratio % #end @@ -203,11 +204,11 @@ ## Top $alert["numTopUsers"] Users (Order by Number of Failed Jobs)

- Top $alert["numTopUsers"] Failed Job Users + Top $alert["numTopUsers"] Users by Failed Jobs

@@ -236,19 +237,21 @@ Ratio - #foreach($userItem in $alert["failedJobUsers"].entrySet()) - - - $userItem.key - - $userItem.value - - $TODO_RATIO - - + #foreach($item in $alert["failedJobUsers"]) + + + $item.key + + + $item.numOfJobs + + + $item.ratio % + + #end @@ -261,15 +264,15 @@ - - View Jobs on Eagle + + View Failed Jobs on Eagle - +

- Top $alert["numTopUsers"] Finished* Job Users + Top $alert["numTopUsers"] Users by Finished* Job

@@ -298,20 +301,21 @@ Ratio - #foreach($userItem in $alert["finishedJobUsers"].entrySet()) - - - $userItem.key - - - $userItem.value - - $TODO_RATIO - - + #foreach($item in $alert["finishedJobUsers"]) + + + $item.key + + + $item.numOfJobs + + + $item.ratio % + + #end @@ -322,10 +326,10 @@

- Top $alert["numTopUsers"] Succeeded Long Running* Job Users + Top $alert["numTopUsers"] Users by Long Running* Job

@@ -355,19 +359,21 @@ Ratio - #foreach($userItem in $alert["succeededJobUsers"].entrySet()) - - - $userItem.key - - $userItem.value - - $TODO_RATIO - - + #foreach($item in $alert["succeededJobUsers"]) + + + $item.key + + + $item.numOfJobs + + + $item.ratio % + + #end @@ -380,8 +386,8 @@ style="text-align:left; font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 0 0 20px;" valign="top"> * Notes:
- 1) Finished jobs include those SUCCEEDED, FAILED and KILLED.
- 2) Long running jobs mean those duration over $alert["jobOvertimeLimit"] hours. + 1) Finished jobs include those SUCCEEDED, FAILED, KILLED, etc.
+ 2) Long running jobs mean those succeeded jobs with duration over $alert["jobOvertimeLimit"] hours. http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a89275bf/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java index d80ae37..3b297ae 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporterTest.java @@ -52,6 +52,7 @@ public class MRHistoryJobDailyReporterTest { server.stop(); } } + @Test public void test() throws Exception { MRHistoryJobDailyReporter reporter = new MRHistoryJobDailyReporter(config, null); @@ -65,33 +66,26 @@ public class MRHistoryJobDailyReporterTest { private Map mockAlertData() { Map alertData = new HashMap<>(); - List summeryInfos = new ArrayList<>(); - MRHistoryJobDailyReporter.JobSummeryInfo summeryInfo1 = new MRHistoryJobDailyReporter.JobSummeryInfo(); - summeryInfo1.status = "FAILED"; - summeryInfo1.numOfJobs = 10; - summeryInfo1.ratio = 0.1; - MRHistoryJobDailyReporter.JobSummeryInfo summeryInfo2 = new MRHistoryJobDailyReporter.JobSummeryInfo(); - summeryInfo2.status = "SUCCEEDED"; - summeryInfo2.numOfJobs = 90; - summeryInfo2.ratio = 0.9; - summeryInfos.add(summeryInfo1); - summeryInfos.add(summeryInfo2); + List summeryInfos = new ArrayList<>(); + summeryInfos.add(buildJobSummaryInfo("FAILED", 8, 8)); + summeryInfos.add(buildJobSummaryInfo("SUCCEEDED", 90, 89.9)); + summeryInfos.add(buildJobSummaryInfo("KILLED", 2, 2)); alertData.put(SUMMARY_INFO_KEY, summeryInfos); - Map failedJobUsers = new TreeMap<>(); - failedJobUsers.put("alice", 100d); - failedJobUsers.put("bob", 97d); + List failedJobUsers = new ArrayList<>(); + failedJobUsers.add(buildJobSummaryInfo("alice", 100L, 98d)); + failedJobUsers.add(buildJobSummaryInfo("bob", 97L, 2)); alertData.put(FAILED_JOB_USERS_KEY, failedJobUsers); - Map succeededJobUsers = new TreeMap<>(); - succeededJobUsers.put("alice1", 100d); - succeededJobUsers.put("bob1", 97d); + List succeededJobUsers = new ArrayList<>(); + succeededJobUsers.add(buildJobSummaryInfo("alice1", 100L, 98)); + succeededJobUsers.add(buildJobSummaryInfo("bob1", 97, 2)); alertData.put(SUCCEEDED_JOB_USERS_KEY, succeededJobUsers); - Map finishedJobUsers = new TreeMap<>(); - finishedJobUsers.put("alice2", 100d); - finishedJobUsers.put("bob2", 97d); + List finishedJobUsers = new ArrayList<>(); + finishedJobUsers.add(buildJobSummaryInfo("alice2", 100L, 98)); + finishedJobUsers.add(buildJobSummaryInfo("bob2", 97, 2)); alertData.put(FINISHED_JOB_USERS_KEY, finishedJobUsers); alertData.put(ALERT_TITLE_KEY, "[TEST_CLUSTER] Daily Job Report"); @@ -104,4 +98,13 @@ public class MRHistoryJobDailyReporterTest { alertData.put(EAGLE_JOB_LINK_KEY, "http://localhost:9090/#/site/sandbox/jpm/statistics"); return alertData; } + + private JobSummaryInfo buildJobSummaryInfo(String key, long number, double ratio) { + JobSummaryInfo jobSummaryInfo = new JobSummaryInfo(); + jobSummaryInfo.numOfJobs = number; + jobSummaryInfo.ratio = ratio; + jobSummaryInfo.key = key; + return jobSummaryInfo; + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a89275bf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java index 42b6584..28e6bb3 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java @@ -59,30 +59,32 @@ public class MRJobExecutionResource { @QueryParam("filterIfMissing") boolean filterIfMissing, @QueryParam("parallel") int parallel, @QueryParam("metricName") String metricName, - @QueryParam("verbose") Boolean verbose) { + @QueryParam("verbose") Boolean verbose) throws ParseException { GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity(); List jobs = new ArrayList<>(); - List finishedJobs = new ArrayList<>(); + List finishedJobs = new ArrayList<>(); Set jobIds = new HashSet<>(); final Map meta = new HashMap<>(); StopWatch stopWatch = new StopWatch(); stopWatch.start(); String jobQuery = String.format(query, Constants.JPA_JOB_EXECUTION_SERVICE_NAME); - GenericServiceAPIResponseEntity res = - resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, - top, filterIfMissing, parallel, metricName, verbose); + GenericServiceAPIResponseEntity res = + resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, top, filterIfMissing, parallel, metricName, verbose); if (res.isSuccess() && res.getObj() != null) { - for (TaggedLogAPIEntity o : res.getObj()) { - finishedJobs.add(o); - jobIds.add(o.getTags().get(MRJobTagName.JOB_ID.toString())); + long maxFinishedTime = DateTimeUtil.humanDateToSeconds(endTime) * DateTimeUtil.ONESECOND; + for (JobExecutionAPIEntity o : res.getObj()) { + if (o.getEndTime() <= maxFinishedTime) { + finishedJobs.add(o); + jobIds.add(o.getTags().get(MRJobTagName.JOB_ID.toString())); + } } jobQuery = String.format(query, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME); - res = resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, - top, filterIfMissing, parallel, metricName, verbose); - if (res.isSuccess() && res.getObj() != null) { - for (TaggedLogAPIEntity o : res.getObj()) { + GenericServiceAPIResponseEntity runningRes = + resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin, top, filterIfMissing, parallel, metricName, verbose); + if (runningRes.isSuccess() && runningRes.getObj() != null) { + for (org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity o : runningRes.getObj()) { String key = o.getTags().get(MRJobTagName.JOB_ID.toString()); if (!ResourceUtils.isDuplicate(jobIds, key)) { jobs.add(o);