Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 485BA200B82 for ; Fri, 2 Sep 2016 05:49:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3BC65160AB7; Fri, 2 Sep 2016 03:49:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B33FF160AB5 for ; Fri, 2 Sep 2016 05:49:11 +0200 (CEST) Received: (qmail 92983 invoked by uid 500); 2 Sep 2016 03:49:10 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 92974 invoked by uid 99); 2 Sep 2016 03:49:10 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Sep 2016 03:49:10 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 7A0F518A2D0 for ; Fri, 2 Sep 2016 03:49:10 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id xY7NlJ9MlQXf for ; Fri, 2 Sep 2016 03:49:05 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id A6D8A5FE06 for ; Fri, 2 Sep 2016 03:49:03 +0000 (UTC) Received: (qmail 92783 invoked by uid 99); 2 Sep 2016 03:49:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 02 Sep 2016 03:49:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A3481E0243; Fri, 2 Sep 2016 03:49:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: qingwzhao@apache.org To: commits@eagle.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-518] add Job counter metrics for mr history job Date: Fri, 2 Sep 2016 03:49:02 +0000 (UTC) archived-at: Fri, 02 Sep 2016 03:49:13 -0000 Repository: incubator-eagle Updated Branches: refs/heads/develop 9488afc15 -> b2b16b745 [EAGLE-518] add Job counter metrics for mr history job Author: wujinhu Closes #414 from wujinhu/EAGLE-518. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b2b16b74 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b2b16b74 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b2b16b74 Branch: refs/heads/develop Commit: b2b16b745e210eb80929d970820c322d557d93e0 Parents: 9488afc Author: wujinhu Authored: Fri Sep 2 11:48:54 2016 +0800 Committer: Qingwen Zhao Committed: Fri Sep 2 11:48:54 2016 +0800 ---------------------------------------------------------------------- .../history/crawler/JHFCrawlerDriverImpl.java | 53 +++--------- .../metrics/JobCountMetricsGenerator.java | 88 ++++++++++++++++++++ .../JobExecutionMetricsCreationListener.java | 75 +++++++++++++++++ .../JobEntityCreationEagleServiceListener.java | 13 +++ .../AbstractMetricsCreationListener.java | 42 ---------- .../JobExecutionMetricsCreationListener.java | 9 +- .../TaskExecutionMetricsCreationListener.java | 3 +- .../org/apache/eagle/jpm/util/Constants.java | 4 + .../AbstractMetricsCreationListener.java | 42 ++++++++++ 9 files changed, 239 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java index 077f4e1..2f326fe 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -19,6 +19,7 @@ package org.apache.eagle.jpm.mr.history.crawler; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; +import org.apache.eagle.jpm.mr.history.metrics.JobCountMetricsGenerator; import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; import org.apache.eagle.jpm.mr.historyentity.JobCountEntity; @@ -62,15 +63,12 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { private JobIdFilter jobFilter; private int partitionId; private TimeZone timeZone; - private MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig; - private MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig; + private JobCountMetricsGenerator jobCountMetricsGenerator; public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig, MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig, MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader, JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception { - this.eagleServiceConfig = eagleServiceConfig; - this.jobExtractorConfig = jobExtractorConfig; this.zeroBasedMonth = controlConfig.zeroBasedMonth; this.dryRun = controlConfig.dryRun; if (this.dryRun) { @@ -81,6 +79,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { this.partitionId = partitionId; this.jobFilter = jobFilter; timeZone = TimeZone.getTimeZone(controlConfig.timeZone); + jobCountMetricsGenerator = new JobCountMetricsGenerator(eagleServiceConfig, jobExtractorConfig, timeZone); } /** @@ -191,7 +190,13 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { jobHistoryFile); processedJobFileNames.add(jobHistoryFile); - flushJobCount(); + jobCountMetricsGenerator.flush( + String.format(FORMAT_JOB_PROCESS_DATE, + this.processDate.year, + this.processDate.month + 1, + this.processDate.day), + this.processDate.year, this.processDate.month, this.processDate.day + ); Long modifiedTime = item.getLeft(); return modifiedTime; } @@ -237,44 +242,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { } } - private void flushJobCount() throws Exception { - List> jobs = JobHistoryZKStateManager.instance().getProcessedJobs( - String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, this.processDate.month + 1, this.processDate.day) - ); - JobCountEntity entity = new JobCountEntity(); - entity.setTotal(jobs.size()); - entity.setFail(0); - jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())).forEach( - job -> entity.setFail(1 + entity.getFail()) - ); - - IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); - - - GregorianCalendar cal = new GregorianCalendar(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0); - cal.setTimeZone(timeZone); - entity.setTimestamp(cal.getTimeInMillis()); - @SuppressWarnings("serial") - Map baseTags = new HashMap() { - { - put("site", jobExtractorConfig.site); - } - }; - entity.setTags(baseTags); - List entities = new ArrayList<>(); - entities.add(entity); - - LOG.info("start flushing entities of total number " + entities.size()); - client.create(entities); - LOG.info("finish flushing entities of total number " + entities.size()); - client.getJerseyClient().destroy(); - client.close(); - } - private void advanceOneDay() throws Exception { //flushJobCount(); GregorianCalendar cal = new GregorianCalendar(timeZone); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java new file mode 100644 index 0000000..0e0e5e9 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.jpm.mr.history.metrics; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; +import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus; +import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class JobCountMetricsGenerator { + private static final Logger LOG = LoggerFactory.getLogger(JobCountMetricsGenerator.class); + + private MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig; + private MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig; + private TimeZone timeZone; + + public JobCountMetricsGenerator(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig, + MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig, + TimeZone timeZone) { + this.eagleServiceConfig = eagleServiceConfig; + this.jobExtractorConfig = jobExtractorConfig; + this.timeZone = timeZone; + } + + public void flush(String date, int year, int month, int day) throws Exception { + List> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(date); + int total = jobs.size(); + int fail = 0; + for (Pair job : jobs) { + if (!job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())) { + ++fail; + } + } + + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); + + + GregorianCalendar cal = new GregorianCalendar(year, month, day); + cal.setTimeZone(timeZone); + GenericMetricEntity metricEntity = new GenericMetricEntity(); + metricEntity.setTimestamp(cal.getTimeInMillis()); + metricEntity.setPrefix(Constants.JOB_COUNT_PER_DAY); + metricEntity.setValue(new double[]{total, fail}); + @SuppressWarnings("serial") + Map baseTags = new HashMap() { + { + put("site", jobExtractorConfig.site); + } + }; + metricEntity.setTags(baseTags); + List entities = new ArrayList<>(); + entities.add(metricEntity); + + LOG.info("start flushing entities of total number " + entities.size()); + client.create(entities); + LOG.info("finish flushing entities of total number " + entities.size()); + client.getJerseyClient().destroy(); + client.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java new file mode 100644 index 0000000..2129bed --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.jpm.mr.history.metrics; + +import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class JobExecutionMetricsCreationListener extends AbstractMetricsCreationListener { + + @Override + public List generateMetrics(JobExecutionAPIEntity entity) { + List metrics = new ArrayList<>(); + if (entity != null) { + Long timeStamp = entity.getTimestamp(); + Map tags = entity.getTags(); + metrics.add(metricWrapper(timeStamp, + Constants.JOB_EXECUTION_TIME, + new double[]{entity.getDurationTime()}, + tags)); + + metrics.add(metricWrapper( + timeStamp, + Constants.MAP_COUNT_RATIO, + new double[]{entity.getNumTotalMaps(), 1.0 * entity.getNumFailedMaps() / entity.getNumTotalMaps()}, + tags)); + + metrics.add(metricWrapper( + timeStamp, + Constants.REDUCE_COUNT_RATIO, + new double[]{entity.getNumTotalReduces(), 1.0 * entity.getNumFailedReduces() / entity.getNumTotalReduces()}, + tags)); + + org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounters = entity.getJobCounters(); + if (jobCounters != null && jobCounters.getCounters() != null) { + for (Map metricGroup : jobCounters.getCounters().values()) { + for (Map.Entry entry : metricGroup.entrySet()) { + String metricName = entry.getKey().toLowerCase(); + metrics.add(metricWrapper(timeStamp, "history." + metricName, new double[]{entry.getValue()}, tags)); + } + } + } + } + return metrics; + } + + @Override + public String buildMetricName(String field) { + return String.format(Constants.hadoopMetricFormat, Constants.JOB_LEVEL, field); + } + + +} + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java index 30eeb54..623a776 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -19,9 +19,11 @@ package org.apache.eagle.jpm.mr.history.parser; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; +import org.apache.eagle.jpm.mr.history.metrics.JobExecutionMetricsCreationListener; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; import org.apache.eagle.jpm.mr.historyentity.*; import org.apache.eagle.jpm.util.MRJobTagName; +import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; @@ -43,6 +45,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr List jobEvents = new ArrayList<>(); List taskExecs = new ArrayList<>(); List taskAttemptExecs = new ArrayList<>(); + private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener(); private TimeZone timeZone; public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) { @@ -91,6 +94,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); logger.info("start flushing entities of total number " + list.size()); + List metricEntities = new ArrayList<>(); for (int i = 0; i < list.size(); i++) { JobBaseAPIEntity entity = list.get(i); if (entity instanceof JobExecutionAPIEntity) { @@ -98,6 +102,8 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr JobHistoryZKStateManager.instance().updateProcessedJob(timeStamp2Date(entity.getTimestamp()), entity.getTags().get(MRJobTagName.JOB_ID.toString()), ((JobExecutionAPIEntity) entity).getCurrentState()); + + metricEntities.addAll(jobExecutionMetricsCreationListener.generateMetrics((JobExecutionAPIEntity)entity)); } else if (entity instanceof JobEventAPIEntity) { jobEvents.add((JobEventAPIEntity) entity); } else if (entity instanceof TaskExecutionAPIEntity) { @@ -113,6 +119,12 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr checkResult(result); jobs.clear(); } + if (metricEntities.size() > 0) { + logger.info("flush job metrics of number " + metricEntities.size()); + result = client.create(metricEntities); + checkResult(result); + metricEntities.clear(); + } if (jobEvents.size() > 0) { logger.info("flush JobEventAPIEntity of number " + jobEvents.size()); result = client.create(jobEvents); @@ -131,6 +143,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr checkResult(result); taskAttemptExecs.clear(); } + logger.info("finish flushing entities of total number " + list.size()); list.clear(); client.getJerseyClient().destroy(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java deleted file mode 100644 index 8634b6a..0000000 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.jpm.mr.running.parser.metrics; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.GenericMetricEntity; - -import java.util.List; -import java.util.Map; - -public abstract class AbstractMetricsCreationListener { - - public abstract List generateMetrics(E entity); - - protected abstract String buildMetricName(String field); - - protected GenericMetricEntity metricWrapper(Long timestamp, String field, double value, Map tags) { - String metricName = buildMetricName(field); - GenericMetricEntity metricEntity = new GenericMetricEntity(); - metricEntity.setTimestamp(timestamp); - metricEntity.setTags(tags); - metricEntity.setPrefix(metricName); - metricEntity.setValue(new double[]{value}); - return metricEntity; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java index 76d2a19..8b30d45 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java @@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics; import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener; import java.util.ArrayList; import java.util.List; @@ -34,15 +35,15 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation if (entity != null) { Long currentTime = System.currentTimeMillis(); Map tags = entity.getTags(); - metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_MB, entity.getAllocatedMB(), tags)); - metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_VCORES, entity.getAllocatedVCores(), tags)); - metrics.add(metricWrapper(currentTime, Constants.RUNNING_CONTAINERS, entity.getRunningContainers(), tags)); + metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_MB, new double[]{entity.getAllocatedMB()}, tags)); + metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_VCORES, new double[]{entity.getAllocatedVCores()}, tags)); + metrics.add(metricWrapper(currentTime, Constants.RUNNING_CONTAINERS, new double[]{entity.getRunningContainers()}, tags)); org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounters = entity.getJobCounters(); if (jobCounters != null && jobCounters.getCounters() != null) { for (Map metricGroup : jobCounters.getCounters().values()) { for (Map.Entry entry : metricGroup.entrySet()) { String metricName = entry.getKey().toLowerCase(); - metrics.add(metricWrapper(currentTime, metricName, entry.getValue(), tags)); + metrics.add(metricWrapper(currentTime, metricName, new double[]{entry.getValue()}, tags)); } } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java index d0b0d57..9f22a7f 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java @@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics; import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener; import java.util.ArrayList; import java.util.List; @@ -33,7 +34,7 @@ public class TaskExecutionMetricsCreationListener extends AbstractMetricsCreatio if (entity != null) { Long currentTime = System.currentTimeMillis(); Map tags = entity.getTags(); - metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, entity.getDuration(), tags)); + metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, new double[]{entity.getDuration()}, tags)); } return metrics; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java index ec56eac..e18fe07 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -172,7 +172,11 @@ public class Constants { public static final String ALLOCATED_VCORES = "allocatedvcores"; public static final String RUNNING_CONTAINERS = "runningcontainers"; public static final String TASK_EXECUTION_TIME = "taskduration"; + public static final String JOB_EXECUTION_TIME = "jobduration"; + public static final String MAP_COUNT_RATIO = "map.count.ratio"; + public static final String REDUCE_COUNT_RATIO = "reduce.count.ratio"; public static final String JOB_LEVEL = "job"; public static final String TASK_LEVEL = "task"; + public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count"; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java new file mode 100644 index 0000000..dd61432 --- /dev/null +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.jpm.util.metrics; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.GenericMetricEntity; + +import java.util.List; +import java.util.Map; + +public abstract class AbstractMetricsCreationListener { + + public abstract List generateMetrics(E entity); + + protected abstract String buildMetricName(String field); + + protected GenericMetricEntity metricWrapper(Long timestamp, String field, double[] values, Map tags) { + String metricName = buildMetricName(field); + GenericMetricEntity metricEntity = new GenericMetricEntity(); + metricEntity.setTimestamp(timestamp); + metricEntity.setTags(tags); + metricEntity.setPrefix(metricName); + metricEntity.setValue(values); + return metricEntity; + } +}