Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C6469200B6B for ; Thu, 25 Aug 2016 17:47:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C4F4A160AA5; Thu, 25 Aug 2016 15:47:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 97903160AA4 for ; Thu, 25 Aug 2016 17:47:13 +0200 (CEST) Received: (qmail 78860 invoked by uid 500); 25 Aug 2016 15:47:12 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 78851 invoked by uid 99); 25 Aug 2016 15:47:12 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Aug 2016 15:47:12 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 6326EC81C3 for ; Thu, 25 Aug 2016 15:47:12 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.738 X-Spam-Level: X-Spam-Status: No, score=-3.738 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.519, WEIRD_PORT=0.001] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id tQy7sSX-bDhr for ; Thu, 25 Aug 2016 15:47:10 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id DBA585F30C for ; Thu, 25 Aug 2016 15:47:04 +0000 (UTC) Received: (qmail 77975 invoked by uid 99); 25 Aug 2016 15:47:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Aug 2016 15:47:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35254E0243; Thu, 25 Aug 2016 15:47:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.incubator.apache.org Message-Id: <6ef0e4f22ceb4f90b0e8ee9d2243d953@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-501] add fields and fix bugs Date: Thu, 25 Aug 2016 15:47:04 +0000 (UTC) archived-at: Thu, 25 Aug 2016 15:47:15 -0000 Repository: incubator-eagle Updated Branches: refs/heads/develop a846c401c -> 7f3726716 [EAGLE-501] add fields and fix bugs Author: wujinhu Closes #388 from wujinhu/EAGLE-501. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7f372671 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7f372671 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7f372671 Branch: refs/heads/develop Commit: 7f37267160af48b5cb46b4bcbd85a3a82957b192 Parents: a846c40 Author: wujinhu Authored: Thu Aug 25 23:46:52 2016 +0800 Committer: Hao Chen Committed: Thu Aug 25 23:46:52 2016 +0800 ---------------------------------------------------------------------- .../environment/impl/StormExecutionRuntime.java | 6 ++- .../mr/historyentity/JobExecutionAPIEntity.java | 44 ++++++++++++++++ .../TaskAttemptExecutionAPIEntity.java | 2 +- .../mr/runningentity/JobExecutionAPIEntity.java | 11 ++++ eagle-jpm/eagle-jpm-mr-history/pom.xml | 11 ++-- .../jpm/mr/history/parser/EagleJobStatus.java | 1 + .../mr/history/parser/JHFEventReaderBase.java | 27 +++++++++- .../src/main/resources/application.conf | 11 ++-- .../jpm/mr/running/parser/MRJobParser.java | 12 +++-- .../mr/running/storm/MRRunningJobParseBolt.java | 1 + .../src/main/resources/application.conf | 55 +++++++++++--------- 11 files changed, 141 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java index 83d3592..1b989ac 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java @@ -60,6 +60,7 @@ public class StormExecutionRuntime implements ExecutionRuntime org.apache.eagle + eagle-jpm-app + ${project.version} + + + org.apache.eagle eagle-data-process ${project.version} @@ -104,13 +109,7 @@ hadoop-mapreduce-client-core ${hadoop.version} - - org.apache.eagle - eagle-app-base - ${project.version} - - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java index 0a137be..fb218e3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java @@ -24,5 +24,6 @@ public enum EagleJobStatus { PREP, RUNNING, SUCCESS, + KILLED, FAILED; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index 5d3d5b4..6916aad 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -111,6 +111,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl jobExecutionEntity = new JobExecutionAPIEntity(); jobExecutionEntity.setTags(new HashMap<>(baseTags)); + jobExecutionEntity.setNumFailedMaps(0); + jobExecutionEntity.setNumFailedReduces(0); taskRunningHosts = new HashMap<>(); @@ -350,7 +352,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl if (values.get(Keys.COUNTERS) != null || counters != null) { entity.setJobCounters(parseCounters(counters)); } - long duration = entity.getEndTime() - jobSubmitEventEntity.getTimestamp(); + long duration = entity.getEndTime() - jobLaunchTime; if (taskType.equals(Constants.TaskType.MAP.toString()) && duration > jobExecutionEntity.getLastMapDuration()) { jobExecutionEntity.setLastMapDuration(duration); } @@ -367,8 +369,16 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl if (taskType.equals(Constants.TaskType.MAP.toString())) { this.sumMapTaskDuration += entity.getDuration(); + if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) + || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) { + jobExecutionEntity.setNumFailedMaps(1 + jobExecutionEntity.getNumFailedMaps()); + } } else { this.sumReduceTaskDuration += entity.getDuration(); + if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) + || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) { + jobExecutionEntity.setNumFailedReduces(1 + jobExecutionEntity.getNumFailedReduces()); + } } entityCreated(entity); @@ -402,6 +412,21 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl entity.setJobCounters(parseCounters(counters)); } entity.setTaskAttemptID(taskAttemptID); + + if (recType == RecordTypes.MapAttempt) { + jobExecutionEntity.setTotalMapAttempts(1 + jobExecutionEntity.getTotalMapAttempts()); + if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) + || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) { + jobExecutionEntity.setFailedMapAttempts(1 + jobExecutionEntity.getFailedMapAttempts()); + } + } else { + jobExecutionEntity.setTotalReduceAttempts(1 + jobExecutionEntity.getTotalReduceAttempts()); + if (entity.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) + || entity.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) { + jobExecutionEntity.setFailedReduceAttempts(1 + jobExecutionEntity.getFailedReduceAttempts()); + } + } + entityCreated(entity); taskAttemptStartTime.remove(taskAttemptID); } else { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf index 13e411f..db2c716 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf @@ -15,13 +15,15 @@ { "envContextConfig" : { + "env" : "local", + "topologyName" : "mrHistoryJob", + "stormConfigFile" : "storm.yaml", "parallelismConfig" : { "mrHistoryJobExecutor" : 6 }, "tasks" : { "mrHistoryJobExecutor" : 6 - }, - "workers" : 3 + } }, "jobExtractorConfig" : { @@ -59,8 +61,11 @@ "password": "secret" } }, - "appId":"mr_history", + + "appId":"mrHistoryJob", "mode":"LOCAL", + "workers" : 3, + application.storm.nimbusHost=localhost "MRConfigureKeys" : { "jobNameKey" : "eagle.job.name", "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 9148c0c..9e156fa 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -110,6 +110,10 @@ public class MRJobParser implements Runnable { this.configKeys = configKeys; } + public void setAppInfo(AppInfo app) { + this.app = app; + } + public ParserStatus status() { return this.parserStatus; } @@ -120,7 +124,8 @@ public class MRJobParser implements Runnable { private void finishMRJob(String mrJobId) { JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId); - jobExecutionAPIEntity.setCurrentState(Constants.AppState.FINISHED.toString()); + jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString()); + jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString()); mrJobConfigs.remove(mrJobId); if (mrJobConfigs.size() == 0) { this.parserStatus = ParserStatus.APP_FINISHED; @@ -205,6 +210,7 @@ public class MRJobParser implements Runnable { jobExecutionAPIEntity.setStartTime(mrJob.getStartTime()); jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime()); jobExecutionAPIEntity.setCurrentState(mrJob.getState()); + jobExecutionAPIEntity.setInternalState(mrJob.getState()); jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal()); jobExecutionAPIEntity.setMapsCompleted(mrJob.getMapsCompleted()); jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal()); @@ -562,8 +568,8 @@ public class MRJobParser implements Runnable { mrJobEntityMap.keySet() .stream() .filter( - jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString()) - || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString())) + jobId -> mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FINISHED.toString()) + || mrJobEntityMap.get(jobId).getInternalState().equals(Constants.AppState.FAILED.toString())) .forEach( jobId -> this.runningJobManager.delete(app.getId(), jobId)); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java index 3174eb1..e918597 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java @@ -88,6 +88,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt { LOG.info("create application parser for {}", appInfo.getId()); } else { applicationParser = runningMRParsers.get(appInfo.getId()); + applicationParser.setAppInfo(appInfo); } Set runningParserIds = new HashSet<>(runningMRParsers.keySet()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7f372671/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf index 4b6d4fe..0d1de78 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf @@ -16,24 +16,27 @@ { "appId":"mrRunningJob", "mode":"LOCAL", + application.storm.nimbusHost=localhost, + "workers" : 8, "envContextConfig" : { "env" : "local", + "topologyName" : "mrRunningJob", + "stormConfigFile" : "storm.yaml", "parallelismConfig" : { "mrRunningJobFetchSpout" : 1, - "mrRunningJobParseBolt" : 10 + "mrRunningJobParseBolt" : 16 }, "tasks" : { "mrRunningJobFetchSpout" : 1, - "mrRunningJobParseBolt" : 10 - }, - "workers" : 5 + "mrRunningJobParseBolt" : 16 + } }, "jobExtractorConfig" : { "site" : "sandbox", "fetchRunningJobInterval" : 60, - "parseJobThreadPoolSize" : 5, #job concurrent - "topAndBottomTaskByElapsedTime" : 50 + "parseJobThreadPoolSize" : 6, + "topAndBottomTaskByElapsedTime" : 10 }, "zookeeperConfig" : { @@ -44,9 +47,11 @@ "zkRetryTimes" : 3, "zkRetryInterval" : 20000 }, + "dataSourceConfig" : { "rmUrls": "http://sandbox.hortonworks.com:50030" }, + "eagleProps" : { "mailHost" : "abc.com", "mailDebug" : "true", @@ -63,23 +68,23 @@ "MRConfigureKeys" : { "jobNameKey" : "eagle.job.name", "jobConfigKey" : [ - "mapreduce.map.output.compress", - "mapreduce.map.output.compress.codec", - "mapreduce.output.fileoutputformat.compress", - "mapreduce.output.fileoutputformat.compress.type", - "mapreduce.output.fileoutputformat.compress.codec", - "mapred.output.format.class", - "eagle.job.runid", - "eagle.job.runidfieldname", - "eagle.job.name", - "eagle.job.normalizedfieldname", - "eagle.alert.email", - "eagle.job.alertemailaddress", - "dataplatform.etl.info", - "mapreduce.map.memory.mb", - "mapreduce.reduce.memory.mb", - "mapreduce.map.java.opts", - "mapreduce.reduce.java.opts" - ] + "mapreduce.map.output.compress", + "mapreduce.map.output.compress.codec", + "mapreduce.output.fileoutputformat.compress", + "mapreduce.output.fileoutputformat.compress.type", + "mapreduce.output.fileoutputformat.compress.codec", + "mapred.output.format.class", + "eagle.job.runid", + "eagle.job.runidfieldname", + "eagle.job.name", + "eagle.job.normalizedfieldname", + "eagle.alert.email", + "eagle.job.alertemailaddress", + "dataplatform.etl.info", + "mapreduce.map.memory.mb", + "mapreduce.reduce.memory.mb", + "mapreduce.map.java.opts", + "mapreduce.reduce.java.opts" + ] } -} +} \ No newline at end of file