Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 05720200C08 for ; Thu, 12 Jan 2017 03:23:32 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 03E76160B51; Thu, 12 Jan 2017 02:23:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5E9DE160B4E for ; Thu, 12 Jan 2017 03:23:30 +0100 (CET) Received: (qmail 65029 invoked by uid 500); 12 Jan 2017 02:23:29 -0000 Mailing-List: contact commits-help@eagle.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.apache.org Delivered-To: mailing list commits@eagle.apache.org Received: (qmail 65020 invoked by uid 99); 12 Jan 2017 02:23:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Jan 2017 02:23:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7ABF3DFAF2; Thu, 12 Jan 2017 02:23:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: qingwzhao@apache.org To: commits@eagle.apache.org Date: Thu, 12 Jan 2017 02:23:29 -0000 Message-Id: <1b3bad4b57c04701ac8628c10567b033@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] eagle git commit: [EAGLE-797] add job performance analysis archived-at: Thu, 12 Jan 2017 02:23:32 -0000 Repository: eagle Updated Branches: refs/heads/master 2adbbf59f -> b4695801f http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm new file mode 100644 index 0000000..39cec68 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm @@ -0,0 +1,131 @@ + + + + + + + + + + #set ( $elem = $alertList[0] ) + +

Basic Information:

+ +
    +
  • Site: ${elem["basic"].get("site")}
  • +
  • Job Name: ${elem["basic"].get("name")}
  • +
  • User: ${elem["basic"].get("user")}
  • +
  • Job Status: ${elem["basic"].get("status")}
  • +
  • Start Time: ${elem["basic"].get("start")}
  • +
  • End Time: ${elem["basic"].get("end")}
  • +
  • Duration Time: ${elem["basic"].get("duration")}
  • +
  • Progress: ${elem["basic"].get("progress")}
  • +
  • Job Detail: ${elem["basic"].get("detail")}
  • +
+ +

Analyzer Results:

+ +#foreach($evaluator in ${elem["extend"].keySet()}) + + + + + + + #foreach($message in ${elem["extend"].get($evaluator).keySet()}) + + + + + #end +
Analysis By $evaluator
levelmessage
${elem["extend"].get($evaluator).get($message)}$message
+#end + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql new file mode 100644 index 0000000..3d790d0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS jobs ( + jobDefId VARCHAR(50) NOT NULL, + configuration MEDIUMTEXT NOT NULL, + createdtime bigint(20) DEFAULT NULL, + modifiedtime bigint(20) DEFAULT NULL, + PRIMARY KEY (jobDefId) +); + +CREATE TABLE IF NOT EXISTS job_evaluators ( + jobDefId VARCHAR(50) NOT NULL, + evaluator VARCHAR(100) NOT NULL, + createdtime bigint(20) DEFAULT NULL, + modifiedtime bigint(20) DEFAULT NULL, + PRIMARY KEY (jobDefId, evaluator) +); + +CREATE TABLE IF NOT EXISTS job_publishments ( + userId VARCHAR(100) PRIMARY KEY, + mailAddress mediumtext NOT NULL, + createdtime bigint(20) DEFAULT NULL, + modifiedtime bigint(20) DEFAULT NULL, + PRIMARY KEY (userId) +); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java index 6f337e7..c3d08d4 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java @@ -89,7 +89,6 @@ public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBa } catch (Exception e) { return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java index 6dc5791..01f98c0 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java @@ -84,7 +84,6 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity } finally { list.clear(); jobConfigurationEntity = null; - client.getJerseyClient().destroy(); client.close(); } tried++; http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java index 80b4049..2f77456 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -97,7 +97,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); logger.info("start flushing entities of total number " + list.size()); List metricEntities = new ArrayList<>(); for (int i = 0; i < list.size(); i++) { @@ -167,7 +167,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr logger.info("finish flushing entities of total number " + list.size()); list.clear(); - client.getJerseyClient().destroy(); client.close(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java index 666b3db..856f051 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java @@ -120,7 +120,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); List list = new ArrayList<>(); logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size()); // create entity @@ -149,7 +149,6 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe logger.info("end flushing TaskAttemptCounter entities of total number " + counters.size()); counters.clear(); list.clear(); - client.getJerseyClient().destroy(); client.close(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java index 40e6432..794f992 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -139,7 +139,6 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { } tried++; } - client.getJerseyClient().destroy(); client.close(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml index f6f1be5..02f2bc4 100644 --- a/eagle-jpm/eagle-jpm-mr-running/pom.xml +++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml @@ -79,6 +79,16 @@ ${powermock.version} test + + org.apache.eagle + eagle-jpm-analyzer + ${project.version} + + + org.apache.eagle + eagle-metadata-jdbc + ${project.version} + http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java index de0d846..309146e 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.running; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt; import org.apache.eagle.jpm.util.Constants; @@ -67,7 +68,8 @@ public class MRRunningJobApplication extends StormApplication { mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getZkStateConfig(), confKeyKeys, - config), + config, + new MRJobPerformanceAnalyzer(config)), tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId")); return topologyBuilder.createTopology(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java index 5a57aca..6670282 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java @@ -18,6 +18,11 @@ package org.apache.eagle.jpm.mr.running; import org.apache.eagle.app.service.ApplicationListener; import org.apache.eagle.app.spi.AbstractApplicationProvider; +import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceJDBCImpl; +import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceMemoryImpl; +import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; +import org.apache.eagle.metadata.store.jdbc.JDBCMetadataStore; import java.util.Optional; @@ -31,4 +36,10 @@ public class MRRunningJobApplicationProvider extends AbstractApplicationProvider public Optional getApplicationListener() { return Optional.of(new MRRunningJobApplicationListener()); } + + @Override + protected void onRegister() { + bind(MemoryMetadataStore.class, MetaManagementService.class, MetaManagementServiceMemoryImpl.class); + bind(JDBCMetadataStore.class, MetaManagementService.class, MetaManagementServiceJDBCImpl.class); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java index c2cbbe5..2bc1648 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java @@ -85,7 +85,6 @@ public class MRJobEntityCreationHandler { LOG.warn("exception found when flush entities, {}", e); return false; } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 52c1866..c21eaf1 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -19,6 +19,8 @@ package org.apache.eagle.jpm.mr.running.parser; import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; import org.apache.eagle.jpm.mr.runningentity.JobConfig; @@ -82,6 +84,7 @@ public class MRJobParser implements Runnable { private static final int FLUSH_TASKS_EVERY_TIME = 5; private static final int MAX_TASKS_PERMIT = 5000; private Config config; + private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer; static { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); @@ -92,7 +95,8 @@ public class MRJobParser implements Runnable { AppInfo app, Map mrJobMap, MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher, List configKeys, - Config config) { + Config config, + MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) { this.app = app; if (mrJobMap == null) { this.mrJobEntityMap = new HashMap<>(); @@ -112,6 +116,7 @@ public class MRJobParser implements Runnable { this.finishedTaskIds = new HashSet<>(); this.configKeys = configKeys; this.config = config; + this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer; } public void setAppInfo(AppInfo app) { @@ -168,6 +173,7 @@ public class MRJobParser implements Runnable { break; } } + mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId))); } } @@ -586,4 +592,21 @@ public class MRJobParser implements Runnable { } } } + + private AnalyzerEntity convertToAnalysisEntity(JobExecutionAPIEntity jobExecutionAPIEntity) { + AnalyzerEntity mrJobAnalysisEntity = new AnalyzerEntity(); + Map tags = jobExecutionAPIEntity.getTags(); + mrJobAnalysisEntity.setJobDefId(tags.get(MRJobTagName.JOD_DEF_ID.toString())); + mrJobAnalysisEntity.setJobId(tags.get(MRJobTagName.JOB_ID.toString())); + mrJobAnalysisEntity.setSiteId(tags.get(MRJobTagName.SITE.toString())); + mrJobAnalysisEntity.setUserId(tags.get(MRJobTagName.USER.toString())); + + mrJobAnalysisEntity.setStartTime(jobExecutionAPIEntity.getStartTime()); + mrJobAnalysisEntity.setEndTime(jobExecutionAPIEntity.getEndTime()); + mrJobAnalysisEntity.setDurationTime(jobExecutionAPIEntity.getDurationTime()); + mrJobAnalysisEntity.setCurrentState(jobExecutionAPIEntity.getInternalState()); + mrJobAnalysisEntity.setJobConfig(new HashMap<>(jobExecutionAPIEntity.getJobConfig())); + mrJobAnalysisEntity.setProgress(this.app.getProgress()); + return mrJobAnalysisEntity; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java index 8ec2dec..915df8a 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java @@ -19,6 +19,7 @@ package org.apache.eagle.jpm.mr.running.storm; import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; import org.apache.eagle.jpm.mr.running.parser.MRJobParser; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; @@ -51,18 +52,21 @@ public class MRRunningJobParseBolt extends BaseRichBolt { private ResourceFetcher resourceFetcher; private List configKeys; private Config config; + private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer; public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig, MRRunningJobConfig.EndpointConfig endpointConfig, MRRunningJobConfig.ZKStateConfig zkStateConfig, List configKeys, - Config config) { + Config config, + MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) { this.eagleServiceConfig = eagleServiceConfig; this.endpointConfig = endpointConfig; this.runningMRParsers = new HashMap<>(); this.zkStateConfig = zkStateConfig; this.configKeys = configKeys; this.config = config; + this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer; } @Override @@ -83,7 +87,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt { MRJobParser applicationParser; if (!runningMRParsers.containsKey(appInfo.getId())) { applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig, - appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config); + appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config, + mrJobPerformanceAnalyzer); runningMRParsers.put(appInfo.getId(), applicationParser); LOG.info("create application parser for {}", appInfo.getId()); } else { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index 787c9ac..fc674d6 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -23,6 +23,7 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.parser.MRJobParser; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout; @@ -88,7 +89,8 @@ public class MRRunningJobApplicationTest { mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getZkStateConfig(), confKeyKeys, - config); + config, + new MRJobPerformanceAnalyzer(config)); MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class); PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager); mrRunningJobParseBolt.prepare(null, null, null); http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java index e0b5533..7046f8b 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.TestingServer; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; @@ -131,7 +132,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -186,7 +187,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -228,7 +229,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -272,7 +273,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -318,7 +319,7 @@ public class MRJobParserTest { RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class); when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList()); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -357,7 +358,7 @@ public class MRJobParserTest { eagleServiceConfig.username, eagleServiceConfig.password).thenReturn(client); when(client.create(any())).thenThrow(Exception.class).thenReturn(null); - when(client.getJerseyClient()).thenReturn(new Client()); + //when(client.getJerseyClient()).thenReturn(new Client()); mockInputJobSteam("/mrjob_30784.json", JOB_URL); mockInputJobSteamWithException(JOB_COUNT_URL); mockGetConnection("/mrconf_30784.xml"); @@ -377,7 +378,7 @@ public class MRJobParserTest { RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class); when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList()); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -401,7 +402,7 @@ public class MRJobParserTest { Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null); Assert.assertTrue(entities.isEmpty()); verify(client, times(2)).create(any()); - verify(client, times(1)).getJerseyClient(); + //verify(client, times(1)).getJerseyClient(); verify(client, times(1)).close(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java index fdfcaad..8127aa2 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java @@ -50,7 +50,7 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000); + client.setReadTimeout(eagleServiceConfig.timeout * 1000); String message = ""; try { @@ -88,7 +88,6 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec } catch (Exception e) { return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java index 05e35e4..2ef1bd9 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java @@ -708,7 +708,7 @@ public class JHFSparkEventReader { config.eagleInfo.username, config.eagleInfo.password); int timeout = config.eagleInfo.timeout; - client.getJerseyClient().setReadTimeout(timeout * 1000); + client.setReadTimeout(timeout * 1000); return client; } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java index 92adfa8..adef27f 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java @@ -57,7 +57,7 @@ public class SparkAppEntityCreationHandler { eagleServiceConfig.eagleServicePort, eagleServiceConfig.username, eagleServiceConfig.password)) { - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); LOG.info("start to flush spark app entities, size {}", entities.size()); client.create(entities); LOG.info("finish flushing spark app entities, size {}", entities.size()); http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java index 9025d36..a065373 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java @@ -39,7 +39,7 @@ public class Utils { try { is.close(); } catch (Exception e) { - e.printStackTrace(); + LOG.warn("{}", e); } } } @@ -48,7 +48,7 @@ public class Utils { try { Thread.sleep(seconds * 1000); } catch (Exception e) { - e.printStackTrace(); + LOG.warn("{}", e); } } @@ -60,7 +60,7 @@ public class Utils { Date parsedDate = dateFormat.parse(date); timestamp = parsedDate.getTime(); } catch (ParseException e) { - e.printStackTrace(); + LOG.warn("{}", e); } if (timestamp == 0L) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/pom.xml b/eagle-jpm/pom.xml index 1ffdb03..e7ae3c3 100644 --- a/eagle-jpm/pom.xml +++ b/eagle-jpm/pom.xml @@ -32,6 +32,7 @@ pom + eagle-jpm-analyzer eagle-jpm-spark-running eagle-jpm-spark-history eagle-jpm-mr-history http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-server-assembly/src/main/conf/eagle.conf ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf index af7e14a..a889914 100644 --- a/eagle-server-assembly/src/main/conf/eagle.conf +++ b/eagle-server-assembly/src/main/conf/eagle.conf @@ -108,6 +108,12 @@ application { recipients: "nobody@abc.com" template: "JobReportTemplate.vm" } + analyzerReport { + sender: "nobody@abc.com" + recipients: "nobody@abc.com" + template: "AnalyzerReportTemplate.vm" + cc: "nobody@abc.com" + } } # --------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java index bf5e695..950bb04 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java @@ -50,7 +50,7 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB topologyCheckAppConfig.getConfig().getString("service.username"), topologyCheckAppConfig.getConfig().getString("service.password")); - client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000); + client.setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000); String message = ""; try { @@ -80,7 +80,6 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB } catch (Exception e) { return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) {