eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [1/3] incubator-eagle git commit: [EAGLE-461] Convert MR history app with new app framework
Date Wed, 24 Aug 2016 07:49:11 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop b52405f12 -> 0bde482be


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 3daae37..ca4a94f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -18,19 +18,20 @@
 
 package org.apache.eagle.jpm.mr.history.storm;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.*;
-import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.eagle.jpm.util.JobIdFilterByPartition;
 import org.apache.eagle.jpm.util.JobIdPartitioner;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,28 +44,30 @@ import java.util.Map;
 /**
  * Zookeeper znode structure
  * -zkRoot
- *   - partitions
- *      - 0 (20150101)
- *      - 1 (20150101)
- *      - 2 (20150101)
- *      - ... ...
- *      - N-1 (20150102)
- *   - jobs
- *      - 20150101
- *        - job1
- *        - job2
- *        - job3
- *      - 20150102
- *        - job1
- *        - job2
- *        - job3
- *
+ * - partitions
+ * - 0 (20150101)
+ * - 1 (20150101)
+ * - 2 (20150101)
+ * - ... ...
+ * - N-1 (20150102)
+ * - jobs
+ * - 20150101
+ * - job1
+ * - job2
+ * - job3
+ * - 20150102
+ * - job1
+ * - job2
+ * - job3
+ * <p>
  * Spout can have multiple instances, which is supported by storm parallelism primitive.
- *
+ * </p>
+ * <p>
  * Under znode partitions, N child znodes (name is 0 based integer) would be created with
each znode mapped to one spout instance. All jobs will be partitioned into N
  * partitions by applying JobPartitioner class to each job Id. The value of each partition
znode is the date when the last job in this partition
  * is successfully processed.
- *
+ * </p>
+ * <p>
  * processing steps
  * 1) In constructor,
  * 2) In open(), calculate jobPartitionId for current spout (which should be exactly same
to spout taskId within TopologyContext)
@@ -74,10 +77,9 @@ import java.util.Map;
  * 7) process job files (job history file and job configuration xml file)
  * 8) add job Id to current date slot say for example 20150102 after this job is successfully
processed
  * 9) clean up all slots with date less than currentProcessDate - 2 days. (2 days should
be configurable)
- *
+ * </p>
  * Note:
  * if one spout instance crashes and is brought up again, open() method would be invoked
again, we need think of this scenario.
- *
  */
 
 public class JobHistorySpout extends BaseRichSpout {
@@ -90,20 +92,18 @@ public class JobHistorySpout extends BaseRichSpout {
     private JobHistoryContentFilter contentFilter;
     private JobHistorySpoutCollectorInterceptor interceptor;
     private JHFInputStreamCallback callback;
-    private JHFConfigManager configManager;
-    private JobHistoryLCM m_jhfLCM;
-    private final static int MAX_RETRY_TIMES = 3;
+    private MRHistoryJobConfig configManager;
+    private JobHistoryLCM jhfLCM;
+    private static final int MAX_RETRY_TIMES = 3;
 
-    public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager)
{
+    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager)
{
         this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
     }
 
     /**
-     * mostly this constructor signature is for unit test purpose as you can put customized
interceptor here
-     * @param filter
-     * @param adaptor
+     * mostly this constructor signature is for unit test purpose as you can put customized
interceptor here.
      */
-    public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager,
JobHistorySpoutCollectorInterceptor adaptor) {
+    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager,
JobHistorySpoutCollectorInterceptor adaptor) {
         this.contentFilter = filter;
         this.configManager = configManager;
         this.interceptor = adaptor;
@@ -131,15 +131,15 @@ public class JobHistorySpout extends BaseRichSpout {
         partitionId = calculatePartitionId(context);
         // sanity verify 0<=partitionId<=numTotalPartitions-1
         if (partitionId < 0 || partitionId > numTotalPartitions) {
-            throw new IllegalStateException("partitionId should be less than numTotalPartitions
with partitionId " +
-                    partitionId + " and numTotalPartitions " + numTotalPartitions);
+            throw new IllegalStateException("partitionId should be less than numTotalPartitions
with partitionId "
+                + partitionId + " and numTotalPartitions " + numTotalPartitions);
         }
         Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls;
         JobIdPartitioner partitioner;
         try {
             partitioner = partitionerCls.newInstance();
         } catch (Exception e) {
-            LOG.error("failing instantiating job partitioner class " + partitionerCls,e);
+            LOG.error("failing instantiating job partitioner class " + partitionerCls, e);
             throw new IllegalStateException(e);
         }
         JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions,
partitionId);
@@ -148,14 +148,14 @@ public class JobHistorySpout extends BaseRichSpout {
         interceptor.setSpoutOutputCollector(collector);
 
         try {
-            m_jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
+            jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
             driver = new JHFCrawlerDriverImpl(configManager.getJobHistoryEndpointConfig(),
-                    configManager.getControlConfig(),
-                    callback,
-                    zkState,
-                    m_jhfLCM,
-                    jobIdFilter,
-                    partitionId);
+                configManager.getControlConfig(),
+                callback,
+                zkState,
+                jhfLCM,
+                jobIdFilter,
+                partitionId);
         } catch (Exception e) {
             LOG.error("failing creating crawler driver");
             throw new IllegalStateException(e);
@@ -171,7 +171,7 @@ public class JobHistorySpout extends BaseRichSpout {
         } catch (Exception ex) {
             LOG.error("fail crawling job history file and continue ...", ex);
             try {
-                m_jhfLCM.freshFileSystem();
+                jhfLCM.freshFileSystem();
             } catch (Exception e) {
                 LOG.error("failed to fresh file system ", e);
             }
@@ -179,27 +179,27 @@ public class JobHistorySpout extends BaseRichSpout {
             try {
                 Thread.sleep(1000);
             } catch (Exception e) {
-
+                // ignored
             }
         }
     }
 
     /**
-     * empty because framework will take care of output fields declaration
+     * empty because framework will take care of output fields declaration.
      */
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
     }
 
     /**
-     * add to processedJob
+     * add to processedJob.
      */
     @Override
     public void ack(Object jobId) {
     }
 
     /**
-     * job is not fully processed
+     * job is not fully processed.
      */
     @Override
     public void fail(Object jobId) {
@@ -227,26 +227,28 @@ public class JobHistorySpout extends BaseRichSpout {
             }
         }
 
-        if (minTimeStamp == 0l) {
+        if (minTimeStamp == 0L) {
             return;
         }
 
         LOG.info("update process time stamp {}", minTimeStamp);
-        final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
-        Map<String, String> baseTags = new HashMap<String, String>() { {
-            put("site", jobExtractorConfig.site);
-        } };
+        final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        Map<String, String> baseTags = new HashMap<String, String>() {
+            {
+                put("site", jobExtractorConfig.site);
+            }
+        };
         JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
         entity.setCurrentTimeStamp(minTimeStamp);
         entity.setTimestamp(minTimeStamp);
         entity.setTags(baseTags);
 
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
 
@@ -267,7 +269,7 @@ public class JobHistorySpout extends BaseRichSpout {
                     LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES,
ex);
                 }
             }
-            tried ++;
+            tried++;
         }
 
         client.getJerseyClient().destroy();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
index 933b347..cbde88c 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
@@ -22,12 +22,20 @@ import java.util.List;
 
 public interface JobHistoryZKStateLCM {
     void ensureJobPartitions(int numTotalPartitions);
+
     String readProcessedDate(int partitionId);
+
     List<String> readProcessedJobs(String date);
+
     void updateProcessedDate(int partitionId, String date);
+
     void addProcessedJob(String date, String jobId);
+
     void truncateProcessedJob(String date);
+
     void truncateEverything();
+
     long readProcessedTimeStamp(int partitionId);
+
     void updateProcessedTimeStamp(int partitionId, long timeStamp);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index 33d3cb2..feb896e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -18,11 +18,12 @@
 
 package org.apache.eagle.jpm.mr.history.zkres;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.ZKStateConfig;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.ZKStateConfig;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +46,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
 
     private CuratorFramework newCurator(ZKStateConfig config) throws Exception {
         return CuratorFrameworkFactory.newClient(
-                config.zkQuorum,
-                config.zkSessionTimeoutMs,
-                15000,
-                new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
+            config.zkQuorum,
+            config.zkSessionTimeoutMs,
+            15000,
+            new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
         );
     }
 
@@ -86,7 +87,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
             if (_curator.checkExists().forPath(path) != null) {
                 _curator.delete().forPath(path);
             }
-        } catch(Exception ex) {
+        } catch (Exception ex) {
             LOG.error("fail reading forceStartFrom znode", ex);
         }
     }
@@ -102,27 +103,28 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
     /**
      * under zkRoot, znode forceStartFrom is used to force job is crawled from that date
      * IF
-     *    forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
+     * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
      * THEN
-     *    rebuild all partitions with the forceStartFrom
+     * rebuild all partitions with the forceStartFrom
      * ELSE
-     *    IF
-     *       partition structure is changed
-     *    THEN
-     *       IF
-     *          there is valid mindate for existing partitions
-     *       THEN
-     *          rebuild job partitions from that valid mindate
-     *       ELSE
-     *          rebuild job partitions from (today - BACKOFF_DAYS)
-     *       END
-     *    ELSE
-     *      do nothing
-     *    END
+     * IF
+     * partition structure is changed
+     * THEN
+     * IF
+     * there is valid mindate for existing partitions
+     * THEN
+     * rebuild job partitions from that valid mindate
+     * ELSE
+     * rebuild job partitions from (today - BACKOFF_DAYS)
+     * END
+     * ELSE
+     * do nothing
      * END
-     *
-     *
+     * END
+     * <p>
      * forceStartFrom is deleted once its value is used, so next time when topology is restarted,
program can run from where topology is stopped last time
+     * </p>
+     * .
      */
     @Override
     public void ensureJobPartitions(int numTotalPartitions) {
@@ -137,7 +139,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
             if (forceStartFrom != null) {
                 try {
                     minDate = Integer.valueOf(forceStartFrom);
-                } catch(Exception ex) {
+                } catch (Exception ex) {
                     LOG.error("failing converting forceStartFrom znode value to integer with
value " + forceStartFrom);
                     throw new IllegalStateException();
                 }
@@ -153,16 +155,18 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
                         LOG.info("znode partitions structure is changed, current partition
count " + currentCount + ", future count " + numTotalPartitions);
                     }
                 }
-                if (!structureChanged)
+                if (!structureChanged) {
                     return; // do nothing
+                }
 
                 if (pathExists) {
                     List<String> partitions = _curator.getChildren().forPath(path);
                     for (String partition : partitions) {
                         String date = new String(_curator.getData().forPath(path + "/" +
partition), "UTF-8");
                         int tmp = Integer.valueOf(date);
-                        if(tmp < minDate)
+                        if (tmp < minDate) {
                             minDate = tmp;
+                        }
                     }
                 }
 
@@ -178,7 +182,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
         } finally {
             try {
                 lock.release();
-            } catch(Exception e) {
+            } catch (Exception e) {
                 LOG.error("fail releasing lock", e);
                 throw new RuntimeException(e);
             }
@@ -195,9 +199,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
 
         for (int i = 0; i < numTotalPartitions; i++) {
             _curator.create()
-                    .creatingParentsIfNeeded()
-                    .withMode(CreateMode.PERSISTENT)
-                    .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
+                .creatingParentsIfNeeded()
+                .withMode(CreateMode.PERSISTENT)
+                .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
         }
     }
 
@@ -222,9 +226,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, date.getBytes("UTF-8"));
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path, date.getBytes("UTF-8"));
             } else {
                 _curator.setData().forPath(path, date.getBytes("UTF-8"));
             }
@@ -240,9 +244,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path);
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path);
             } else {
                 _curator.setData().forPath(path);
             }
@@ -311,10 +315,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path);
-                return 0l;
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path);
+                return 0L;
             } else {
                 return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8"));
             }
@@ -330,9 +334,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM
{
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path);
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path);
             }
 
             _curator.setData().forPath(path, (timeStamp + "").getBytes("UTF-8"));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
new file mode 100644
index 0000000..5e69a16
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>SPARK_HISTORY_JOB_APP</type>
+    <name>Spark History Job Monitoring</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.jpm.mr.history.MRHistoryJobApplication</appClass>
+    <viewPath>/apps/jpm</viewPath>
+    <configuration>
+        <!-- org.apache.eagle.jpm.mr.history.MRHistoryJobConfig -->
+        <property>
+            <name>jobExtractorConfig.site</name>
+            <displayName>Site ID</displayName>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.mrVersion</name>
+            <value>MRVer2</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.readTimeOutSeconds</name>
+            <displayName>zkPort</displayName>
+            <value>10</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkQuorum</name>
+            <value>sandbox.hortonworks.com:2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkPort</name>
+            <value>2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkSessionTimeoutMs</name>
+            <value>15000</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRetryTimes</name>
+            <value>3</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRetryInterval</name>
+            <value>20000</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRoot</name>
+            <value>/test_mrjobhistory</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.basePath</name>
+            <value>/mr-history/done</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.jobTrackerName</name>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.nnEndpoint</name>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.pathContainsJobTrackerName</name>
+            <value>false</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.principal</name>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.keytab</name>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.dryRun</name>
+            <value>false</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.partitionerCls</name>
+            <value>org.apache.eagle.jpm.util.DefaultJobIdPartitioner</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zeroBasedMonth</name>
+            <value>false</value>
+        </property>
+        <property>
+            <name>MRConfigureKeys.jobConfigKey</name>
+            <value>mapreduce.map.output.compress,
+                mapreduce.map.output.compress.codec,
+                mapreduce.output.fileoutputformat.compress,
+                mapreduce.output.fileoutputformat.compress.type,
+                mapreduce.output.fileoutputformat.compress.codec,
+                mapred.output.format.class,
+                dataplatform.etl.info,
+                mapreduce.map.memory.mb,
+                mapreduce.reduce.memory.mb,
+                mapreduce.map.java.opts,
+                mapreduce.reduce.java.opts</value>
+        </property>
+        <property>
+            <name>MRConfigureKeys.jobNameKey</name>
+            <value>eagle.job.name</value>
+        </property>
+        <property>
+            <name>envContextConfig.parallelismConfig.mrHistoryJobExecutor</name>
+            <value>6</value>
+        </property>
+        <property>
+            <name>envContextConfig.tasks.mrHistoryJobExecutor</name>
+            <value>6</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <description>eagleProps.eagleService.host</description>
+            <value>sandbox.hortonworks.com</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <description>eagleProps.eagleService.port</description>
+            <value>9099</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <description>eagleProps.eagleService.username</description>
+            <value>admin</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <description>eagleProps.eagleService.password</description>
+            <value>secret</value>
+        </property>
+    </configuration>
+    <docs>
+        <install>
+        </install>
+        <uninstall>
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..56a30bd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 23a51fc..13e411f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -15,9 +15,6 @@
 
 {
   "envContextConfig" : {
-    "env" : "local",
-    "topologyName" : "mr_history",
-    "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrHistoryJobExecutor" : 6
     },
@@ -62,21 +59,10 @@
       "password": "secret"
     }
   },
-
+  "appId":"mr_history",
+  "mode":"LOCAL",
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
-    "jobConfigKey" : [
-        "mapreduce.map.output.compress",
-        "mapreduce.map.output.compress.codec",
-        "mapreduce.output.fileoutputformat.compress",
-        "mapreduce.output.fileoutputformat.compress.type",
-        "mapreduce.output.fileoutputformat.compress.codec",
-        "mapred.output.format.class",
-        "dataplatform.etl.info",
-        "mapreduce.map.memory.mb",
-        "mapreduce.reduce.memory.mb",
-        "mapreduce.map.java.opts",
-        "mapreduce.reduce.java.opts"
-    ]
+    "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class,
dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
new file mode 100644
index 0000000..0a3a3a1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.AppJUnitRunner;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(AppJUnitRunner.class)
+public class MRHistoryJobApplicationProviderTest {
+    @Inject private ApplicationSimulator simulator;
+
+    @Test
+    public void testRunAsManagedApplicationWithSimulator(){
+        simulator.start(MRHistoryJobApplicationProvider.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
new file mode 100644
index 0000000..318a641
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import com.typesafe.config.ConfigFactory;
+import org.junit.Test;
+
+public class MRHistoryJobApplicationTest {
+    @Test
+    public void testRun(){
+        new MRHistoryJobApplication().run(ConfigFactory.load());
+    }
+}


Mime
View raw message