Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm
Reply-To: dev@eagle.incubator.apache.org
From: yonzhang2012@apache.org
To: commits@eagle.incubator.apache.org
Date: Wed, 07 Sep 2016 17:41:57 -0000
Message-Id: <7201ad3b09274d489e31c344a4940ff4@git.apache.org>
Subject: [01/52] [abbrv] incubator-eagle git commit: [EAGLE-461] Convert MR history app with new app framework [Forced Update!]
archived-at: Wed, 07 Sep 2016 17:42:09 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master 0474d5916 -> 21187b55c (forced update) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index 3daae37..ca4a94f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -18,19 +18,20 @@ package org.apache.eagle.jpm.mr.history.storm; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.*; -import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; +import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity; import org.apache.eagle.jpm.util.JobIdFilter; import org.apache.eagle.jpm.util.JobIdFilterByPartition; import org.apache.eagle.jpm.util.JobIdPartitioner; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,28 +44,30 @@ import java.util.Map; /** * Zookeeper znode structure * -zkRoot - * - partitions - * - 0 (20150101) - * - 1 (20150101) - * - 2 (20150101) - * - ... ... - * - N-1 (20150102) - * - jobs - * - 20150101 - * - job1 - * - job2 - * - job3 - * - 20150102 - * - job1 - * - job2 - * - job3 - * + * - partitions + * - 0 (20150101) + * - 1 (20150101) + * - 2 (20150101) + * - ... ... + * - N-1 (20150102) + * - jobs + * - 20150101 + * - job1 + * - job2 + * - job3 + * - 20150102 + * - job1 + * - job2 + * - job3 + *

* Spout can have multiple instances, which is supported by storm parallelism primitive. - * + *

+ *

* Under znode partitions, N child znodes (name is 0 based integer) would be created with each znode mapped to one spout instance. All jobs will be partitioned into N * partitions by applying JobPartitioner class to each job Id. The value of each partition znode is the date when the last job in this partition * is successfully processed. - * + *

+ *

* processing steps * 1) In constructor, * 2) In open(), calculate jobPartitionId for current spout (which should be exactly same to spout taskId within TopologyContext) @@ -74,10 +77,9 @@ import java.util.Map; * 7) process job files (job history file and job configuration xml file) * 8) add job Id to current date slot say for example 20150102 after this job is successfully processed * 9) clean up all slots with date less than currentProcessDate - 2 days. (2 days should be configurable) - * + *

* Note: * if one spout instance crashes and is brought up again, open() method would be invoked again, we need think of this scenario. - * */ public class JobHistorySpout extends BaseRichSpout { @@ -90,20 +92,18 @@ public class JobHistorySpout extends BaseRichSpout { private JobHistoryContentFilter contentFilter; private JobHistorySpoutCollectorInterceptor interceptor; private JHFInputStreamCallback callback; - private JHFConfigManager configManager; - private JobHistoryLCM m_jhfLCM; - private final static int MAX_RETRY_TIMES = 3; + private MRHistoryJobConfig configManager; + private JobHistoryLCM jhfLCM; + private static final int MAX_RETRY_TIMES = 3; - public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) { + public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager) { this(filter, configManager, new JobHistorySpoutCollectorInterceptor()); } /** - * mostly this constructor signature is for unit test purpose as you can put customized interceptor here - * @param filter - * @param adaptor + * mostly this constructor signature is for unit test purpose as you can put customized interceptor here. */ - public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager, JobHistorySpoutCollectorInterceptor adaptor) { + public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, JobHistorySpoutCollectorInterceptor adaptor) { this.contentFilter = filter; this.configManager = configManager; this.interceptor = adaptor; @@ -131,15 +131,15 @@ public class JobHistorySpout extends BaseRichSpout { partitionId = calculatePartitionId(context); // sanity verify 0<=partitionId<=numTotalPartitions-1 if (partitionId < 0 || partitionId > numTotalPartitions) { - throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + - partitionId + " and numTotalPartitions " + numTotalPartitions); + throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + + partitionId + " and numTotalPartitions " + numTotalPartitions); } Class partitionerCls = configManager.getControlConfig().partitionerCls; JobIdPartitioner partitioner; try { partitioner = partitionerCls.newInstance(); } catch (Exception e) { - LOG.error("failing instantiating job partitioner class " + partitionerCls,e); + LOG.error("failing instantiating job partitioner class " + partitionerCls, e); throw new IllegalStateException(e); } JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId); @@ -148,14 +148,14 @@ public class JobHistorySpout extends BaseRichSpout { interceptor.setSpoutOutputCollector(collector); try { - m_jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig()); + jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig()); driver = new JHFCrawlerDriverImpl(configManager.getJobHistoryEndpointConfig(), - configManager.getControlConfig(), - callback, - zkState, - m_jhfLCM, - jobIdFilter, - partitionId); + configManager.getControlConfig(), + callback, + zkState, + jhfLCM, + jobIdFilter, + partitionId); } catch (Exception e) { LOG.error("failing creating crawler driver"); throw new IllegalStateException(e); @@ -171,7 +171,7 @@ public class JobHistorySpout extends BaseRichSpout { } catch (Exception ex) { LOG.error("fail crawling job history file and continue ...", ex); try { - m_jhfLCM.freshFileSystem(); + jhfLCM.freshFileSystem(); } catch (Exception e) { LOG.error("failed to fresh 
file system ", e); } @@ -179,27 +179,27 @@ public class JobHistorySpout extends BaseRichSpout { try { Thread.sleep(1000); } catch (Exception e) { - + // ignored } } } /** - * empty because framework will take care of output fields declaration + * empty because framework will take care of output fields declaration. */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } /** - * add to processedJob + * add to processedJob. */ @Override public void ack(Object jobId) { } /** - * job is not fully processed + * job is not fully processed. */ @Override public void fail(Object jobId) { @@ -227,26 +227,28 @@ public class JobHistorySpout extends BaseRichSpout { } } - if (minTimeStamp == 0l) { + if (minTimeStamp == 0L) { return; } LOG.info("update process time stamp {}", minTimeStamp); - final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); - final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); - Map baseTags = new HashMap() { { - put("site", jobExtractorConfig.site); - } }; + final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); + final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); + Map baseTags = new HashMap() { + { + put("site", jobExtractorConfig.site); + } + }; JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity(); entity.setCurrentTimeStamp(minTimeStamp); entity.setTimestamp(minTimeStamp); entity.setTags(baseTags); IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleServiceHost, - eagleServiceConfig.eagleServicePort, - eagleServiceConfig.username, - eagleServiceConfig.password); + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); @@ -267,7 +269,7 @@ public class JobHistorySpout extends BaseRichSpout { LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex); } } - tried ++; + tried++; } client.getJerseyClient().destroy(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java index 933b347..cbde88c 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java @@ -22,12 +22,20 @@ import java.util.List; public interface JobHistoryZKStateLCM { void ensureJobPartitions(int numTotalPartitions); + String readProcessedDate(int partitionId); + List readProcessedJobs(String date); + void updateProcessedDate(int partitionId, String date); + void addProcessedJob(String date, String jobId); + void truncateProcessedJob(String date); + void truncateEverything(); + long readProcessedTimeStamp(int partitionId); + void updateProcessedTimeStamp(int partitionId, long timeStamp); } 
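The spout javadoc above says that all job IDs are split into N partitions, one per spout task, by applying the configured JobIdPartitioner to each job ID, and the JobHistoryZKStateLCM interface just listed is the ZooKeeper bookkeeping contract for those partitions. The following minimal, self-contained sketch shows the hash-and-modulo idea behind such a partitioner; SimpleJobIdPartitioner and its partition() method are illustrative assumptions, not the patch's DefaultJobIdPartitioner.

    // Illustrative sketch: maps job IDs onto N partitions the way the spout javadoc describes.
    // This is NOT the real org.apache.eagle.jpm.util.DefaultJobIdPartitioner; it only shows the idea.
    public class SimpleJobIdPartitioner {

        // Return a partition in [0, numTotalPartitions) for a job ID such as "job_1441285197160_0004".
        public int partition(String jobId, int numTotalPartitions) {
            // Mask the sign bit instead of using Math.abs(), which overflows for Integer.MIN_VALUE.
            return (jobId.hashCode() & Integer.MAX_VALUE) % numTotalPartitions;
        }

        public static void main(String[] args) {
            SimpleJobIdPartitioner partitioner = new SimpleJobIdPartitioner();
            int numTotalPartitions = 4; // one partition per spout task
            String[] jobIds = {"job_1441285197160_0001", "job_1441285197160_0002", "job_1441285197160_0003"};
            for (String jobId : jobIds) {
                System.out.println(jobId + " -> partition " + partitioner.partition(jobId, numTotalPartitions));
            }
        }
    }

The spout task whose partitionId equals the computed partition is the only task that crawls that job, which is what lets multiple spout instances run in parallel without double-processing.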
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java index 33d3cb2..feb896e 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java @@ -18,11 +18,12 @@ package org.apache.eagle.jpm.mr.history.zkres; +import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.ZKStateConfig; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; -import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.ZKStateConfig; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +46,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { private CuratorFramework newCurator(ZKStateConfig config) throws Exception { return CuratorFrameworkFactory.newClient( - config.zkQuorum, - config.zkSessionTimeoutMs, - 15000, - new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval) + config.zkQuorum, + config.zkSessionTimeoutMs, + 15000, + new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval) ); } @@ -86,7 +87,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { if (_curator.checkExists().forPath(path) != null) { _curator.delete().forPath(path); } - } catch(Exception ex) { + } catch (Exception ex) { LOG.error("fail reading forceStartFrom znode", ex); } } @@ -102,27 +103,28 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { /** * under zkRoot, znode forceStartFrom is used to force job is crawled from that date * IF - * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD", + * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD", * THEN - * rebuild all partitions with the forceStartFrom + * rebuild all partitions with the forceStartFrom * ELSE - * IF - * partition structure is changed - * THEN - * IF - * there is valid mindate for existing partitions - * THEN - * rebuild job partitions from that valid mindate - * ELSE - * rebuild job partitions from (today - BACKOFF_DAYS) - * END - * ELSE - * do nothing - * END + * IF + * partition structure is changed + * THEN + * IF + * there is valid mindate for existing partitions + * THEN + * rebuild job partitions from that valid mindate + * ELSE + * rebuild job partitions from (today - BACKOFF_DAYS) + * END + * ELSE + * do nothing * END - * - * + * END + *

* forceStartFrom is deleted once its value is used, so next time when topology is restarted, program can run from where topology is stopped last time + *

+ * . */ @Override public void ensureJobPartitions(int numTotalPartitions) { @@ -137,7 +139,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { if (forceStartFrom != null) { try { minDate = Integer.valueOf(forceStartFrom); - } catch(Exception ex) { + } catch (Exception ex) { LOG.error("failing converting forceStartFrom znode value to integer with value " + forceStartFrom); throw new IllegalStateException(); } @@ -153,16 +155,18 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions); } } - if (!structureChanged) + if (!structureChanged) { return; // do nothing + } if (pathExists) { List partitions = _curator.getChildren().forPath(path); for (String partition : partitions) { String date = new String(_curator.getData().forPath(path + "/" + partition), "UTF-8"); int tmp = Integer.valueOf(date); - if(tmp < minDate) + if (tmp < minDate) { minDate = tmp; + } } } @@ -178,7 +182,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { } finally { try { lock.release(); - } catch(Exception e) { + } catch (Exception e) { LOG.error("fail releasing lock", e); throw new RuntimeException(e); } @@ -195,9 +199,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { for (int i = 0; i < numTotalPartitions; i++) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path + "/" + i, startingDate.getBytes("UTF-8")); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path + "/" + i, startingDate.getBytes("UTF-8")); } } @@ -222,9 +226,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, date.getBytes("UTF-8")); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, date.getBytes("UTF-8")); } else { _curator.setData().forPath(path, date.getBytes("UTF-8")); } @@ -240,9 +244,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); } else { _curator.setData().forPath(path); } @@ -311,10 +315,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path); - return 0l; + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); + return 0L; } else { return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8")); } @@ -330,9 +334,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM { try { if (_curator.checkExists().forPath(path) == null) { _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path); + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); } _curator.setData().forPath(path, (timeStamp + "").getBytes("UTF-8")); 
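The JobHistoryZKStateManager changes above repeat one Curator idiom in several methods: checkExists() on a znode, then create() with creatingParentsIfNeeded() and CreateMode.PERSISTENT when the node is missing, otherwise setData(). A minimal sketch of that create-or-update pattern follows; the connection string, znode path, and date value are placeholder assumptions, not values taken from the patch.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;

    public class ZkCreateOrUpdateSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder connection settings; in the patch they come from ZKStateConfig.
            CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    "localhost:2181",           // zkQuorum
                    15000,                      // zkSessionTimeoutMs
                    15000,                      // connection timeout (hard-coded to 15000 in newCurator as well)
                    new RetryNTimes(3, 20000)); // zkRetryTimes, zkRetryInterval
            curator.start();

            String path = "/test_mrjobhistory/partitions/0"; // example partition znode
            byte[] date = "20150102".getBytes("UTF-8");      // value: last successfully processed date

            // Create-or-update idiom used throughout JobHistoryZKStateManager.
            if (curator.checkExists().forPath(path) == null) {
                curator.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(path, date);
            } else {
                curator.setData().forPath(path, date);
            }

            curator.close();
        }
    }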
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml new file mode 100644 index 0000000..5e69a16 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml @@ -0,0 +1,154 @@ + + + + + SPARK_HISTORY_JOB_APP + Spark History Job Monitoring + 0.5.0-incubating + org.apache.eagle.jpm.mr.history.MRHistoryJobApplication + /apps/jpm + + + + jobExtractorConfig.site + Site ID + sandbox + + + jobExtractorConfig.mrVersion + MRVer2 + + + jobExtractorConfig.readTimeOutSeconds + zkPort + 10 + + + dataSourceConfig.zkQuorum + sandbox.hortonworks.com:2181 + + + dataSourceConfig.zkPort + 2181 + + + dataSourceConfig.zkSessionTimeoutMs + 15000 + + + dataSourceConfig.zkRetryTimes + 3 + + + dataSourceConfig.zkRetryInterval + 20000 + + + dataSourceConfig.zkRoot + /test_mrjobhistory + + + dataSourceConfig.basePath + /mr-history/done + + + dataSourceConfig.jobTrackerName + + + + dataSourceConfig.nnEndpoint + hdfs://sandbox.hortonworks.com:8020 + + + dataSourceConfig.pathContainsJobTrackerName + false + + + dataSourceConfig.principal + + + + dataSourceConfig.keytab + + + + dataSourceConfig.dryRun + false + + + dataSourceConfig.partitionerCls + org.apache.eagle.jpm.util.DefaultJobIdPartitioner + + + dataSourceConfig.zeroBasedMonth + false + + + MRConfigureKeys.jobConfigKey + mapreduce.map.output.compress, + mapreduce.map.output.compress.codec, + mapreduce.output.fileoutputformat.compress, + mapreduce.output.fileoutputformat.compress.type, + mapreduce.output.fileoutputformat.compress.codec, + mapred.output.format.class, + dataplatform.etl.info, + mapreduce.map.memory.mb, + mapreduce.reduce.memory.mb, + mapreduce.map.java.opts, + mapreduce.reduce.java.opts + + + MRConfigureKeys.jobNameKey + eagle.job.name + + + envContextConfig.parallelismConfig.mrHistoryJobExecutor + 6 + + + envContextConfig.tasks.mrHistoryJobExecutor + 6 + + + eagleProps.eagleService.host + eagleProps.eagleService.host + sandbox.hortonworks.com + + + eagleProps.eagleService.port + eagleProps.eagleService.port + 9099 + + + eagleProps.eagleService.username + eagleProps.eagleService.username + admin + + + eagleProps.eagleService.password + eagleProps.eagleService.password + secret + + + + + + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..56a30bd --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) 
under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf index 23a51fc..13e411f 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf @@ -15,9 +15,6 @@ { "envContextConfig" : { - "env" : "local", - "topologyName" : "mr_history", - "stormConfigFile" : "storm.yaml", "parallelismConfig" : { "mrHistoryJobExecutor" : 6 }, @@ -62,21 +59,10 @@ "password": "secret" } }, - + "appId":"mr_history", + "mode":"LOCAL", "MRConfigureKeys" : { "jobNameKey" : "eagle.job.name", - "jobConfigKey" : [ - "mapreduce.map.output.compress", - "mapreduce.map.output.compress.codec", - "mapreduce.output.fileoutputformat.compress", - "mapreduce.output.fileoutputformat.compress.type", - "mapreduce.output.fileoutputformat.compress.codec", - "mapred.output.format.class", - "dataplatform.etl.info", - "mapreduce.map.memory.mb", - "mapreduce.reduce.memory.mb", - "mapreduce.map.java.opts", - "mapreduce.reduce.java.opts" - ] + "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts" } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java new file mode 100644 index 0000000..0a3a3a1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.mr.history; + +import com.google.inject.Inject; +import org.apache.eagle.app.test.AppJUnitRunner; +import org.apache.eagle.app.test.ApplicationSimulator; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(AppJUnitRunner.class) +public class MRHistoryJobApplicationProviderTest { + @Inject private ApplicationSimulator simulator; + + @Test + public void testRunAsManagedApplicationWithSimulator(){ + simulator.start(MRHistoryJobApplicationProvider.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java new file mode 100644 index 0000000..318a641 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.mr.history; + +import com.typesafe.config.ConfigFactory; +import org.junit.Test; + +public class MRHistoryJobApplicationTest { + @Test + public void testRun(){ + new MRHistoryJobApplication().run(ConfigFactory.load()); + } +}
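MRHistoryJobApplicationTest above drives the application with ConfigFactory.load(), that is, with the packaged application.conf shown earlier. As a sketch of how the same entry point might be run locally with a few keys overridden, the snippet below layers a parseString() config over the packaged defaults; the override values (appId, mode, dataSourceConfig.zkQuorum) and the runner class itself are illustrative assumptions.

    package org.apache.eagle.jpm.mr.history;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class MRHistoryJobApplicationLocalRunner {
        public static void main(String[] args) {
            // Override a few keys before falling back to the packaged application.conf.
            Config config = ConfigFactory.parseString(
                    "appId = mr_history\n"
                            + "mode = LOCAL\n"
                            + "dataSourceConfig.zkQuorum = \"localhost:2181\"")
                    .withFallback(ConfigFactory.load());

            // Same entry point the unit test uses.
            new MRHistoryJobApplication().run(config);
        }
    }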