eagle-commits mailing list archives

From yonzhang2...@apache.org
Subject [1/3] incubator-eagle git commit: EAGLE-465 Hive security monitoring refactor with jpm
Date Mon, 15 Aug 2016 18:18:56 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop ecabbe4ca -> 18ae3bbbb


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
deleted file mode 100644
index 41da60a..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/zkres/JobRunningZKStateManager.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.zkres;
-
-import java.nio.charset.StandardCharsets;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.eagle.common.config.EagleConfigFactory;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.jobrunning.common.JobConstants;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.common.DateTimeUtil;
-
-public class JobRunningZKStateManager implements JobRunningZKStateLCM{
-	public static final Logger LOG = LoggerFactory.getLogger(JobRunningZKStateManager.class);
-	private String zkRoot;
-	private CuratorFramework _curator;
-	
-	public static final String DATE_FORMAT_PATTERN = "yyyyMMdd";
-	
-	private CuratorFramework newCurator(RunningJobCrawlConfig config) throws Exception {
-        return CuratorFrameworkFactory.newClient(
-        	config.zkStateConfig.zkQuorum,
-            config.zkStateConfig.zkSessionTimeoutMs,
-            15000,
-            new RetryNTimes(config.zkStateConfig.zkRetryTimes, config.zkStateConfig.zkRetryInterval)
-        );
-    }
-	  
-	public JobRunningZKStateManager(RunningJobCrawlConfig config) {
-		this.zkRoot = config.zkStateConfig.zkRoot;
-        try {
-            _curator = newCurator(config);
-            _curator.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-	
-	public void close() {
-        _curator.close();
-        _curator = null;
-    }
-	
-	public long getTimestampFromDate(String dateStr) throws ParseException {
-		SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT_PATTERN);
-        sdf.setTimeZone(EagleConfigFactory.load().getTimeZone());
-		Date d = sdf.parse(dateStr);
-		return d.getTime();
-	}
-	
-	@Override
-	public List<String> readProcessedJobs(JobConstants.ResourceType type) {
-		String path = zkRoot + "/" + type.name() + "/jobs";
-		InterProcessMutex lock = new InterProcessMutex(_curator, path);
-		try {
-			lock.acquire();
-            if (_curator.checkExists().forPath(path) != null) {
-            	LOG.info("Got processed job list from zk, type: " + type.name());
-            	return _curator.getChildren().forPath(path);
-            } else {
-            	LOG.info("Currently processed job list is empty, type: " + type.name());
-                return new ArrayList<String>();
-            }
-        } catch (Exception e) {
-        	LOG.error("fail read processed jobs", e);
-            throw new RuntimeException(e);
-        }
-		finally {
-        	try{
-        		lock.release();
-        	}catch(Exception e){
-        		LOG.error("fail releasing lock", e);
-        		throw new RuntimeException(e);
-        	}
-		}
-	}
-
-	@Override
-	public void addProcessedJob(JobConstants.ResourceType type, String jobID) {
-		String path = zkRoot + "/" + type.name() + "/jobs/" + jobID;
-		try {
-			//we record date for cleanup, e.g. cleanup job's znodes whose created date < 20150801
-			String date = DateTimeUtil.format(System.currentTimeMillis(), DATE_FORMAT_PATTERN);
-			LOG.info("add processed job, jobID: " + jobID + ", type: " + type + ", date: " + date);
-            if (_curator.checkExists().forPath(path) == null) {
-                _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, date.getBytes(StandardCharsets.UTF_8));
-            }
-            else {
-                LOG.warn("Job already exist in zk, skip the job: " + jobID + " , type: " + type);
-            }
-        } catch (Exception e) {
-        	LOG.error("fail adding processed jobs", e);
-            throw new RuntimeException(e);
-        }
-	}
-
-	@Override
-	public void truncateJobBefore(JobConstants.ResourceType type, String date) {
-		String path = zkRoot + "/" + type.name() + "/jobs";
-		InterProcessMutex lock = new InterProcessMutex(_curator, path);
-		try {
-			lock.acquire();
-    		long thresholdTime = getTimestampFromDate(date);
-            if (_curator.checkExists().forPath(path) != null) {
-    			LOG.info("Going to delete processed job before " + date + ", type: " + type);
-            	List<String> jobIDList = _curator.getChildren().forPath(path);
-            	for(String jobID : jobIDList) {
-            		if (!jobID.startsWith("job_")) continue; // skip lock node
-            		String jobPath = path + "/" + jobID;
-            		long createTime = getTimestampFromDate(new String(_curator.getData().forPath(jobPath), StandardCharsets.UTF_8));
-            		if (createTime < thresholdTime) {
-            			LOG.info("Going to truncate job: " + jobPath);
-        				_curator.delete().deletingChildrenIfNeeded().forPath(jobPath);
-            		}
-            	}
-            }
-            else {
-            	LOG.info("Currently processed job list is empty, type: " + type.name());                
-            }
-        } catch (Exception e) {
-        	LOG.error("fail deleting processed jobs", e);
-            throw new RuntimeException(e);
-        }
-		finally {
-        	try{
-        		lock.release();
-        	}catch(Exception e){
-        		LOG.error("fail releasing lock", e);
-        		throw new RuntimeException(e);
-        	}
-		}
-	}
-	
-	@Override
-	public void truncateProcessedJob(JobConstants.ResourceType type, String jobID) {
-		LOG.info("trying to truncate all data for job " + jobID);
-		String path = zkRoot + "/" + type.name() + "/jobs/" + jobID;
-		InterProcessMutex lock = new InterProcessMutex(_curator, path);
-		try {
-			lock.acquire();
-            if (_curator.checkExists().forPath(path) != null) {
-                _curator.delete().deletingChildrenIfNeeded().forPath(path);
-                LOG.info("really truncated all data for jobID: " + jobID);
-            }
-        } catch (Exception e) {
-        	LOG.error("fail truncating processed jobs", e);
-    		throw new RuntimeException(e);
-        }
-		finally {
-        	try{
-        		lock.release();
-        	}catch(Exception e){
-        		LOG.error("fail releasing lock", e);
-        		throw new RuntimeException(e);
-        	}
-		}
-	}
-
-	@Override
-	public void truncateEverything() {
-		String path = zkRoot;
-		InterProcessMutex lock = new InterProcessMutex(_curator, path);
-		try{
-			lock.acquire();
-			if(_curator.checkExists().forPath(path) != null){
-				_curator.delete().deletingChildrenIfNeeded().forPath(path);
-			}
-		}catch(Exception ex){
-			LOG.error("fail truncating verything", ex);
-			throw new RuntimeException(ex);
-		}
-		finally {
-        	try{
-        		lock.release();
-        	}catch(Exception e){
-        		LOG.error("fail releasing lock", e);
-        		throw new RuntimeException(e);
-        	}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/resources/log4j.properties
deleted file mode 100644
index 71a5dac..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, DRFA, stdout
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/java/org/apache/eagle/jobrunning/job/conf/TestJobConfParserImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/java/org/apache/eagle/jobrunning/job/conf/TestJobConfParserImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/java/org/apache/eagle/jobrunning/job/conf/TestJobConfParserImpl.java
deleted file mode 100644
index fdc9359..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/java/org/apache/eagle/jobrunning/job/conf/TestJobConfParserImpl.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.eagle.jobrunning.job.conf;
-
-import org.jsoup.Jsoup;
-import org.jsoup.nodes.Document;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.InputStream;
-import java.util.Map;
-
-public class TestJobConfParserImpl {
-
-    public static final String SQL = "CREATE TABLE XXX.XXX as SELECT /*+ MAPJOIN(XXX,XXX) */ trim(x.XXX) AS hc.XXX, hc.XXX, SUM(x.XXX) AS XXX FROM XXX.XXX x INNER JOIN XXX.XXX XXX ON x.XXX = XXX.XXX AND XXX.XXX = 1 INNER JOIN XXX.XXX dp ON XXX.XXX = XXX.XXX AND XXX.XXX = 1 INNER JOIN XXX.XXX hc ON XXX.XXX = XXX.XXX AND XXX.XXX=1 LEFT OUTER JOIN XXX.XXX hsc ON hsc.XXX = hc.XXX AND hsc.XXX=1 WHERE x.ds = 'XXX' AND length(x.XXX) > 0 AND x.XXX = 51 GROUP BY trim(x.XXX), hc.XXX, hc.XXX";
-
-    @Test
-    public void test() throws Exception {
-        InputStream is = this.getClass().getResourceAsStream("/jobconf.html");
-        final Document doc = Jsoup.parse(is, "UTF-8", "{historyUrl}/jobhistory/conf/job_xxxxxxxxxxxxx_xxxxxx");
-        JobConfParser parser = new JobConfParserImpl();
-        Map<String, String> configs = parser.parse(doc);
-        Assert.assertEquals(configs.size(), 3);
-        Assert.assertEquals(SQL, configs.get("mapreduce.workflow.name"));
-        Assert.assertEquals("0.0.0.0:50070", configs.get("dfs.namenode.http-address"));
-        Assert.assertEquals("org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory", configs.get("hive.repl.task.factory"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
deleted file mode 100644
index 63ce446..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/hive-jobrunning.conf
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  "envContextConfig" : {
-    "env" : "storm",
-    "mode" : "cluster",
-    "topologyName" : "hiveQueryRunningTopology",
-    "stormConfigFile" : "hive.storm.yaml",
-    "parallelismConfig" : {
-      "msgConsumer" : 2
-    }
-  },
-  "dataSourceConfig": {
-    "flavor" : "stormrunning",
-    "zkQuorum" : "localhost:12181",
-    "zkRoot" : "/jobrunning",
-    "zkSessionTimeoutMs" : 15000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 2000,    
-    "RMEndPoints" : "http://localhost:8088/",
-    "partitionerCls" : "org.apache.eagle.job.DefaultJobPartitionerImpl"
-  },
-  "eagleProps" : {
-    "site" : "sandbox",
-    "mailHost" : "mailHost.com",
-    "mailSmtpPort":"25",
-    "mailDebug" : "true",
-    "eagleService": {
-      "host": "localhost",
-      "port": "9099",
-      "username": "admin",
-      "password": "secret"
-    }
-  },
-  "dynamicConfigSource" : {
-    "enabled" : true,
-    "initDelayMillis" : 0,
-    "delayMillis" : 30000,
-    "ignoreDeleteFromSource" : true
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/jobconf.html
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/jobconf.html b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/jobconf.html
deleted file mode 100644
index b579a51..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/jobconf.html
+++ /dev/null
@@ -1,230 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
-<html>
-  <meta http-equiv="X-UA-Compatible" content="IE=8">
-  <meta http-equiv="Content-type" content="text/html; charset=UTF-8">
-  <style type="text/css">
-    #conf_paginate span {font-weight:normal}
-    #conf .progress {width:8em}
-    #conf_processing {top:-1.5em; font-size:1em;
-      color:#000; background:rgba(255, 255, 255, 0.8)}
-  </style>
-  <title>
-    Configuration for MapReduce Job job_1462444563491_89969
-  </title>
-  <link rel="stylesheet" href="/static/yarn.css">
-  <style type="text/css">
-    #layout { height: 100%; }
-    #layout thead td { height: 3em; }
-    #layout #navcell { width: 11em; padding: 0 1em; }
-    #layout td.content { padding-top: 0 }
-    #layout tbody { vertical-align: top; }
-    #layout tfoot td { height: 4em; }
-  </style>
-  <link rel="stylesheet" href="/static/jquery/themes-1.9.1/base/jquery-ui.css">
-  <link rel="stylesheet" href="/static/dt-1.9.4/css/jui-dt.css">
-  <script type="text/javascript" src="/static/jquery/jquery-1.8.2.min.js">
-  </script>
-  <script type="text/javascript" src="/static/jquery/jquery-ui-1.9.1.custom.min.js">
-  </script>
-  <script type="text/javascript" src="/static/dt-1.9.4/js/jquery.dataTables.min.js">
-  </script>
-  <script type="text/javascript" src="/static/yarn.dt.plugins.js">
-  </script>
-  <style type="text/css">
-    #jsnotice { padding: 0.2em; text-align: center; }
-    .ui-progressbar { height: 1em; min-width: 5em }
-  </style>
-  <script type="text/javascript">
-    $(function() {
-      $('#nav').accordion({autoHeight:false, active:1});
-    confDataTable =  $('#conf').dataTable({bStateSave : true, "fnStateSave": function (oSettings, oData) { sessionStorage.setItem( oSettings.sTableId, JSON.stringify(oData) ); }, "fnStateLoad": function (oSettings) { return JSON.parse( sessionStorage.getItem(oSettings.sTableId) );}, bJQueryUI:true, sPaginationType: 'full_numbers', iDisplayLength:20, aLengthMenu:[20, 40, 60, 80, 100]}).fnSetFilteringDelay(188);
-    var confInitVals = new Array();
-$('tfoot input').keyup( function () 
-{  confDataTable.fnFilter( this.value, $('tfoot input').index(this) );
-} );
-$('tfoot input').each( function (i) {
-  confInitVals[i] = this.value;
-} );
-$('tfoot input').focus( function () {
-  if ( this.className == 'search_init' )
-  {
-    this.className = '';
-    this.value = '';
-  }
-} );
-$('tfoot input').blur( function (i) {
-  if ( this.value == '' )
-  {
-    this.className = 'search_init';
-    this.value = confInitVals[$('tfoot input').index(this)];
-  }
-} );
-
-    });
-  </script>
-  <div id="jsnotice" class="ui-state-error">
-    This page works best with javascript enabled.
-  </div>
-  <script type="text/javascript">
-    $('#jsnotice').hide();
-  </script>
-  <table id="layout" class="ui-widget-content">
-    <thead>
-      <tr>
-        <td colspan="2">
-          <div id="header" class="ui-widget">
-            <div id="user">
-              Logged in as: dr.who
-            </div>
-            <div id="logo">
-              <img src="/static/hadoop-st.png">
-            </div>
-            <h1>
-              Configuration for MapReduce Job job_1462444563491_89969
-            </h1>
-          </div>
-        </td>
-      </tr>
-    </thead>
-    <tfoot>
-      <tr>
-        <td colspan="2">
-          <div id="footer" class="ui-widget">
-          </div>
-        </td>
-      </tr>
-    </tfoot>
-    <tbody>
-      <tr>
-        <td id="navcell">
-          <div id="nav">
-            <h3>
-              Application
-            </h3>
-            <ul>
-              <li>
-                <a href="/jobhistory/about">About</a>
-              <li>
-                <a href="/jobhistory/app">Jobs</a>
-            </ul>
-            <h3>
-              Job
-            </h3>
-            <ul>
-              <li>
-                <a href="/jobhistory/job/job_1462444563491_89969">Overview</a>
-              <li>
-                <a href="/jobhistory/jobcounters/job_1462444563491_89969">Counters</a>
-              <li>
-                <a href="/jobhistory/conf/job_1462444563491_89969">Configuration</a>
-              <li>
-                <a href="/jobhistory/tasks/job_1462444563491_89969/m">Map tasks</a>
-              <li>
-                <a href="/jobhistory/tasks/job_1462444563491_89969/r">Reduce tasks</a>
-            </ul>
-            <h3>
-              Tools
-            </h3>
-            <ul>
-              <li>
-                <a href="/conf">Configuration</a>
-              <li>
-                <a href="/logs">Local logs</a>
-              <li>
-                <a href="/stacks">Server stacks</a>
-              <li>
-                <a href="/metrics">Server metrics</a>
-            </ul>
-          </div>
-        </td>
-        <td class="content">
-          <div>
-            <h3>
-              viewfs://xxx/user/history/done/2016/05/13/000089/job_1462444563491_89969_conf.xml
-            </h3>
-          </div>
-          <table id="conf">
-            <thead>
-              <tr>
-                <th class="ui-state-default">
-                  key
-                </th>
-                <th class="ui-state-default">
-                  value
-                </th>
-                <th class="ui-state-default">
-                  source chain
-                </th>
-              </tr>
-            </thead>
-            <tbody>
-            <tr>
-              <td>
-                mapreduce.workflow.name
-              </td>
-              <td>
-                CREATE TABLE XXX.XXX  as SELECT /*+ MAPJOIN(XXX,XXX) */ trim(x.XXX)  AS   hc.XXX, hc.XXX, SUM(x.XXX)  AS  XXX FROM XXX.XXX x INNER   JOIN XXX.XXX XXX ON x.XXX = XXX.XXX AND XXX.XXX = 1 INNER  JOIN XXX.XXX dp ON XXX.XXX = XXX.XXX AND XXX.XXX = 1 INNER JOIN XXX.XXX hc ON XXX.XXX = XXX.XXX AND XXX.XXX=1 LEFT   OUTER JOIN XXX.XXX hsc ON hsc.XXX = hc.XXX AND hsc.XXX=1 WHERE x.ds = 'XXX' AND length(x.XXX) &gt; 0 AND x.XXX = 51 GROUP BY trim(x.XXX), hc.XXX, hc.XXX
-              </td>
-              <td>
-                job.xml &#11013; programatically
-              </td>
-            </tr>
-            <tr>
-              <td>
-                dfs.namenode.http-address
-              </td>
-              <td>
-                0.0.0.0:50070
-              </td>
-              <td>
-                job.xml &#11013; hdfs-default.xml
-              </td>
-            </tr>
-            <tr>
-              <td>
-                hive.repl.task.factory
-              </td>
-              <td>
-                org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory
-              </td>
-              <td>
-                job.xml &#11013; org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@61284cb6 &#11013; programatically
-              </td>
-            </tr>
-            </tbody>
-            <tfoot>
-              <tr>
-                <th>
-                  <input class="search_init" type="text" name="key" value="key">
-                </th>
-                <th>
-                  <input class="search_init" type="text" name="value" value="value">
-                </th>
-                <th>
-                  <input class="search_init" type="text" name="source chain" value="source chain">
-                </th>
-              </tr>
-            </tfoot>
-          </table>
-        </td>
-      </tr>
-    </tbody>
-  </table>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/log4j.properties
deleted file mode 100644
index 71a5dac..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, DRFA, stdout
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/pom.xml b/eagle-core/eagle-data-process/pom.xml
index 60bfced..897cbf8 100644
--- a/eagle-core/eagle-data-process/pom.xml
+++ b/eagle-core/eagle-data-process/pom.xml
@@ -28,8 +28,6 @@
     <description>Eagle Data Process Framework</description>
     <modules>
         <module>eagle-stream-process-base</module>
-    	<module>eagle-storm-jobrunning-spout</module>
-    	<module>eagle-job-common</module>
         <module>eagle-stream-process-api</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
index e357cf6..447a59a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
@@ -20,8 +20,8 @@ package org.apache.eagle.jpm.mr.history.common;
 
 import com.typesafe.config.Config;
 import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner;
-import org.apache.eagle.jpm.mr.history.storm.JobIdPartitioner;
+import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
+import org.apache.eagle.jpm.util.JobIdPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -157,7 +157,7 @@ public class JHFConfigManager implements Serializable {
             this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
             assert this.controlConfig.partitionerCls != null;
         } catch (Exception e) {
-            LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner", e);
+            LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e);
             this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
         } finally {
             LOG.info("Loaded partitioner class: {}",this.controlConfig.partitionerCls);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 8445434..d3e1816 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -20,7 +20,7 @@ package org.apache.eagle.jpm.mr.history.crawler;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
-import org.apache.eagle.jpm.mr.history.storm.JobIdFilter;
+import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
deleted file mode 100644
index 451b921..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/DefaultJobIdPartitioner.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.storm;
-
-public class DefaultJobIdPartitioner implements JobIdPartitioner {
-    @Override
-    public int partition(int numTotalParts, String jobId) {
-        int hash = jobId.hashCode();
-        hash = Math.abs(hash);
-        return hash % numTotalParts;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index a0cdba7..6e9ccfa 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -22,12 +22,13 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
 import org.apache.eagle.jpm.mr.history.crawler.*;
 import org.apache.eagle.jpm.mr.history.entities.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.util.JobIdFilter;
+import org.apache.eagle.jpm.util.JobIdFilterByPartition;
+import org.apache.eagle.jpm.util.JobIdPartitioner;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
deleted file mode 100644
index b58c84f..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilter.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.storm;
-
-public interface JobIdFilter {
-    boolean accept(String jobId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
deleted file mode 100644
index 07b8519..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdFilterByPartition.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.storm;
-
-public class JobIdFilterByPartition implements JobIdFilter {
-    private JobIdPartitioner partitioner;
-    private int numTotalPartitions;
-    private int partitionId;
-
-    public JobIdFilterByPartition(JobIdPartitioner partitioner, int numTotalPartitions, int partitionId) {
-        this.partitioner = partitioner;
-        this.numTotalPartitions = numTotalPartitions;
-        this.partitionId = partitionId;
-    }
-
-    @Override
-    public boolean accept(String jobId) {
-        int part = partitioner.partition(numTotalPartitions, jobId);
-        if (part == partitionId) {
-            return true;
-        }
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
deleted file mode 100644
index cc7e68c..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobIdPartitioner.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.storm;
-
-public interface JobIdPartitioner {
-    int partition(int numTotalParts, String jobId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 1b97271..60f90a0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -48,7 +48,7 @@
     "jobTrackerName" : "",
     "zeroBasedMonth" : false,
     "dryRun" : false,
-    "partitionerCls" : "org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner",
+    "partitionerCls" : "org.apache.eagle.jpm.util.DefaultJobIdPartitioner",
     "timeZone" : "UTC"
   },
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index 3c8aa92..1786a44 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -67,11 +67,6 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-job-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.jsoup</groupId>
             <artifactId>jsoup</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml
index 4f50350..d11d3d5 100644
--- a/eagle-jpm/eagle-jpm-spark-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml
@@ -29,17 +29,6 @@
   <name>eagle-jpm-spark-history</name>
   <url>http://maven.apache.org</url>
   <dependencies>
-  	<dependency>
-  		<groupId>org.apache.eagle</groupId>
-  		<artifactId>eagle-job-common</artifactId>
-  		<version>${project.version}</version>
-		<exclusions>
-		<exclusion>
-			<groupId>org.wso2.orbit.com.lmax</groupId>
-			<artifactId>disruptor</artifactId>
-		</exclusion>
-		</exclusions>
-	</dependency>
 	  <dependency>
 		  <groupId>org.apache.eagle</groupId>
 		  <artifactId>eagle-jpm-util</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
index 7f5e6e8..cc53e7c 100644
--- a/eagle-jpm/eagle-jpm-spark-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -69,11 +69,6 @@
           </exclusions>
       </dependency>
       <dependency>
-          <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-job-common</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-      <dependency>
         <groupId>org.jsoup</groupId>
         <artifactId>jsoup</artifactId>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index a633fd4..b819340 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -84,7 +84,8 @@ public class Constants {
         UNDEFINED, SUCCEEDED, FAILED, KILLED
     }
     public enum ResourceType {
-         COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO
+        COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO, JOB_CONFIGURATION,
+        COMPLETE_MR_JOB
     }
 
     //MR
@@ -110,6 +111,8 @@ public class Constants {
         public static final String CASCADING_JOB = "cascading.app.name";
     }
 
+    public static final String HIVE_QUERY_STRING = "hive.query.string";
+
     /**
      * MR task types
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/DefaultJobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/DefaultJobIdPartitioner.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/DefaultJobIdPartitioner.java
new file mode 100644
index 0000000..cc8df4f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/DefaultJobIdPartitioner.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+public class DefaultJobIdPartitioner implements JobIdPartitioner {
+    @Override
+    public int partition(int numTotalParts, String jobId) {
+        int hash = jobId.hashCode();
+        hash = Math.abs(hash);
+        return hash % numTotalParts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilter.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilter.java
new file mode 100644
index 0000000..af9308a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+public interface JobIdFilter {
+    boolean accept(String jobId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilterByPartition.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilterByPartition.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilterByPartition.java
new file mode 100644
index 0000000..c995b1a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdFilterByPartition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+public class JobIdFilterByPartition implements JobIdFilter {
+    private JobIdPartitioner partitioner;
+    private int numTotalPartitions;
+    private int partitionId;
+
+    public JobIdFilterByPartition(JobIdPartitioner partitioner, int numTotalPartitions, int partitionId) {
+        this.partitioner = partitioner;
+        this.numTotalPartitions = numTotalPartitions;
+        this.partitionId = partitionId;
+    }
+
+    @Override
+    public boolean accept(String jobId) {
+        int part = partitioner.partition(numTotalPartitions, jobId);
+        if (part == partitionId) {
+            return true;
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdPartitioner.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdPartitioner.java
new file mode 100644
index 0000000..487da3b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobIdPartitioner.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.util;
+
+public interface JobIdPartitioner {
+    int partition(int numTotalParts, String jobId);
+}
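For context, a minimal sketch (not part of this commit) of how the relocated partitioner and filter classes fit together: a task keeps only the job IDs that hash to its own partition. The partition count and partition id are illustrative values.

    import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
    import org.apache.eagle.jpm.util.JobIdFilter;
    import org.apache.eagle.jpm.util.JobIdFilterByPartition;
    import org.apache.eagle.jpm.util.JobIdPartitioner;

    public class PartitionFilterSketch {
        public static void main(String[] args) {
            int numTotalPartitions = 4; // e.g. number of spout tasks (illustrative)
            int thisPartitionId = 1;    // index of the current task (illustrative)

            JobIdPartitioner partitioner = new DefaultJobIdPartitioner();
            JobIdFilter filter = new JobIdFilterByPartition(partitioner, numTotalPartitions, thisPartitionId);

            // accept() is true only when the job ID hashes to this task's partition.
            System.out.println(filter.accept("job_1462444563491_89969"));
        }
    }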

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index 55ffc17..9b6a28f 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -38,6 +38,7 @@ public class RunningJobManager implements Serializable {
     private CuratorFramework curator;
     private final static String ENTITY_TAGS_KEY = "entityTags";
     private final static String APP_INFO_KEY = "appInfo";
+    private final static String ZNODE_LAST_FINISH_TIME = "lastFinishTime";
 
     private CuratorFramework newCurator(String zkQuorum, int zkSessionTimeoutMs, int zkRetryTimes, int zkRetryInterval) throws Exception {
         return CuratorFrameworkFactory.newClient(
@@ -252,4 +253,30 @@ public class RunningJobManager implements Serializable {
         }
         return result;
     }
+
+    public Long recoverLastFinishedTime(int partitionId) {
+        String path = this.zkRoot + "/" + partitionId + "/" + ZNODE_LAST_FINISH_TIME;
+        try {
+            return Long.valueOf(new String(curator.getData().forPath(path)));
+        } catch (Exception e) {
+            LOG.error("failed to recover last finish time {}", e);
+        }
+
+        return 0l;
+    }
+
+    public void updateLastFinishTime(int partitionId, Long lastFinishTime) {
+        String path = this.zkRoot + "/" + partitionId + "/" + ZNODE_LAST_FINISH_TIME;
+        try {
+            if (curator.checkExists().forPath(path) == null) {
+                curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path);
+            }
+            curator.setData().forPath(path, lastFinishTime.toString().getBytes("UTF-8"));
+        } catch (Exception e) {
+            LOG.error("failed to update last finish time {}", e);
+        }
+    }
 }
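For context, a minimal sketch (not part of this commit) of the checkpoint cycle the two new methods enable; how RunningJobManager itself is constructed is not shown in this hunk, so the manager instance is assumed to be already initialized:

    import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;

    public class FinishTimeCheckpointSketch {
        // "manager" is assumed to be an already-initialized RunningJobManager.
        static void runOnce(RunningJobManager manager, int partitionId) {
            Long lastFinishTime = manager.recoverLastFinishedTime(partitionId); // 0L on first run or on a ZK read error
            // ... fetch and process MR jobs that finished after lastFinishTime ...
            manager.updateLastFinishTime(partitionId, System.currentTimeMillis());
        }
    }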

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
index 1b29f0c..44336e2 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
@@ -30,6 +30,7 @@ import org.apache.eagle.jpm.util.resourceFetch.model.ClusterInfoWrapper;
 import org.apache.eagle.jpm.util.resourceFetch.url.JobListServiceURLBuilderImpl;
 import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
 import org.apache.eagle.jpm.util.resourceFetch.url.SparkCompleteJobServiceURLBuilderImpl;
+import org.apache.eagle.jpm.util.resourceFetch.url.URLUtil;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -46,7 +47,6 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
 	private final HAURLSelector selector;
 	private final ServiceURLBuilder jobListServiceURLBuilder;
 	private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
-	
 	private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
 	
 	static {
@@ -66,13 +66,12 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
 		}
 	}
 	
-	private List<AppInfo> doFetchSparkFinishApplicationsList(String lastFinishTime) throws Exception {
+	private List<AppInfo> doFetchFinishApplicationsList(String urlString) throws Exception {
 		List<AppInfo> result;
 		InputStream is = null;
 		try {
 			checkUrl();
-			final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), lastFinishTime);
-			LOG.info("Going to call yarn api to fetch finished spark job list: " + urlString);
+			LOG.info("Going to call yarn api to fetch finished application list: " + urlString);
 			is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
 			final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
 			if (appWrapper != null && appWrapper.getApps() != null
@@ -94,25 +93,6 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
 		return sb.toString();
     }
 
-    private List<AppInfo> doFetchSparkRunningApplicationsList() throws Exception {
-        List<AppInfo> result;
-        InputStream is = null;
-        try {
-            checkUrl();
-            final String urlString = getSparkRunningJobURL();
-            LOG.info("Going to call yarn api to fetch running spark job list: " + urlString);
-            is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.NONE);
-            final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
-            if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) {
-                result = appWrapper.getApps().getApp();
-                return result;
-            }
-            return null;
-        } finally {
-            if (is != null)  { try { is.close();} catch (Exception e) { } }
-        }
-    }
-
     private String getMRRunningJobURL() {
         return String.format("%s/%s?applicationTypes=MAPREDUCE&state=RUNNING&%s",
                 selector.getSelectedUrl(),
@@ -120,13 +100,22 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
                 Constants.ANONYMOUS_PARAMETER);
     }
 
-	private List<AppInfo> doFetchMRRunningApplicationsList() throws Exception {
+    public String getMRFinishedJobURL(String lastFinishedTime) {
+        String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
+        StringBuilder sb = new StringBuilder();
+        sb.append(url).append("/").append(Constants.V2_APPS_URL);
+        sb.append("?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin=");
+        sb.append(lastFinishedTime).append("&").append(Constants.ANONYMOUS_PARAMETER);
+
+        return sb.toString();
+    }
+
+	private List<AppInfo> doFetchRunningApplicationsList(String urlString) throws Exception {
         List<AppInfo> result;
         InputStream is = null;
         try {
             checkUrl();
-            final String urlString = getMRRunningJobURL();
-            LOG.info("Going to call yarn api to fetch running mr job list: " + urlString);
+            LOG.info("Going to call yarn api to fetch running application list: " + urlString);
             is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
             final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
             if (appWrapper != null && appWrapper.getApps() != null && appWrapper.getApps().getApp() != null) {
@@ -142,11 +131,14 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
 	public List<AppInfo> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception{
 		switch(resoureType) {
 			case COMPLETE_SPARK_JOB:
-				return doFetchSparkFinishApplicationsList((String)parameter[0]);
+                final String urlString = sparkCompleteJobServiceURLBuilder.build((String)parameter[0]);
+                return doFetchFinishApplicationsList(urlString);
 			case RUNNING_SPARK_JOB:
-                return doFetchSparkRunningApplicationsList();
+                return doFetchRunningApplicationsList(getSparkRunningJobURL());
             case RUNNING_MR_JOB:
-                return doFetchMRRunningApplicationsList();
+                return doFetchRunningApplicationsList(getMRRunningJobURL());
+            case COMPLETE_MR_JOB:
+                return doFetchFinishApplicationsList(getMRFinishedJobURL((String)parameter[0]));
 			default:
 				throw new Exception("Not support resourceType :" + resoureType);
 		}
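
A hedged usage sketch of the consolidated fetch paths follows; the resource-manager address is a placeholder, and the call pattern mirrors how HiveJobFetchSpout (below) drives the fetcher.

    import org.apache.eagle.jpm.util.Constants;
    import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
    import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;

    import java.util.List;

    public class RMResourceFetcherSketch {
        public static void main(String[] args) throws Exception {
            String[] rmBasePaths = new String[] {"http://rm-host.example.com:8088"}; // placeholder endpoint
            RMResourceFetcher fetcher = new RMResourceFetcher(rmBasePaths);

            // running MR applications need no extra parameter
            List<AppInfo> running = fetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);

            // finished MR applications since a checkpoint: finishedTimeBegin is passed as a string
            long lastFinishTime = System.currentTimeMillis() - 60 * 60000L;
            List<AppInfo> finished = fetcher.getResource(Constants.ResourceType.COMPLETE_MR_JOB, Long.toString(lastFinishTime));

            System.out.println("running=" + (running == null ? 0 : running.size())
                    + ", finished=" + (finished == null ? 0 : finished.size()));
        }
    }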

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/pom.xml b/eagle-security/eagle-security-hive/pom.xml
index d431d46..a2e5a32 100644
--- a/eagle-security/eagle-security-hive/pom.xml
+++ b/eagle-security/eagle-security-hive/pom.xml
@@ -29,15 +29,19 @@
   <packaging>jar</packaging>
 
   <dependencies>
-  	  <dependency>
-	      <groupId>org.apache.eagle</groupId>
-	  	  <artifactId>eagle-storm-jobrunning-spout</artifactId>
-          <version>${project.version}</version>
-	   </dependency>
 	   <dependency>
 	      <groupId>org.apache.curator</groupId>
 	  	  <artifactId>curator-framework</artifactId>
 	   </dependency>
+	  <dependency>
+		  <groupId>org.apache.eagle</groupId>
+		  <artifactId>eagle-jpm-util</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.jsoup</groupId>
+          <artifactId>jsoup</artifactId>
+      </dependency>
 	   <dependency>
 	      <groupId>org.apache.curator</groupId>
 	  	  <artifactId>curator-recipes</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
new file mode 100644
index 0000000..2662698
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/config/RunningJobCrawlConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.config;
+
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.jpm.util.JobIdPartitioner;
+
+import java.io.Serializable;
+
+public class RunningJobCrawlConfig implements Serializable{
+    private static final long serialVersionUID = 1L;
+    public RunningJobEndpointConfig endPointConfig;
+    public ControlConfig controlConfig;
+    public ZKStateConfig zkStateConfig;
+
+    public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig){
+        this.endPointConfig = endPointConfig;
+        this.controlConfig = controlConfig;
+        this.zkStateConfig = zkStateConfig;
+    }
+
+    public static class RunningJobEndpointConfig implements Serializable{
+        private static final long serialVersionUID = 1L;
+        public String[] RMBasePaths;
+        public String HSBasePath;
+    }
+
+    public static class ControlConfig implements Serializable{
+        private static final long serialVersionUID = 1L;
+        public boolean jobConfigEnabled;
+        public boolean jobInfoEnabled;
+        public int zkCleanupTimeInday;
+        public int completedJobOutofDateTimeInMin;
+        public int sizeOfJobConfigQueue;
+        public int sizeOfJobCompletedInfoQueue;
+        public Class<? extends JobIdPartitioner> partitionerCls;
+        public int numTotalPartitions = 1;
+    }
+}
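
A hedged sketch of how this crawl config might be assembled by hand; the endpoint values echo application.conf further below, while the no-arg ZKStateConfig construction (and the fields it would need) is an assumption.

    import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
    import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
    import org.apache.eagle.security.hive.config.RunningJobCrawlConfig;

    public class RunningJobCrawlConfigSketch {
        public static void main(String[] args) {
            RunningJobCrawlConfig.RunningJobEndpointConfig endPointConfig =
                    new RunningJobCrawlConfig.RunningJobEndpointConfig();
            endPointConfig.RMBasePaths = new String[] {"http://server.eagle.apache.org:8088"}; // no trailing slash
            endPointConfig.HSBasePath = "http://server.eagle.apache.org:19888/";

            RunningJobCrawlConfig.ControlConfig controlConfig = new RunningJobCrawlConfig.ControlConfig();
            controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
            controlConfig.numTotalPartitions = 1;

            ZKStateConfig zkStateConfig = new ZKStateConfig(); // assumed no-arg constructor; zk fields omitted here
            RunningJobCrawlConfig crawlConfig =
                    new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
            System.out.println("partitions=" + crawlConfig.controlConfig.numTotalPartitions);
        }
    }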

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
new file mode 100644
index 0000000..0db3d47
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobFetchSpout.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.hive.jobrunning;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.jpm.util.*;
+import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.connection.URLConnectionUtils;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourceFetch.model.MRJob;
+import org.apache.eagle.jpm.util.resourceFetch.model.MRJobsWrapper;
+import org.apache.eagle.security.hive.config.RunningJobCrawlConfig;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.jsoup.Jsoup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.InputStream;
+import java.net.URLConnection;
+import java.util.*;
+
+public class HiveJobFetchSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveJobFetchSpout.class);
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    private RunningJobCrawlConfig crawlConfig;
+    private SpoutOutputCollector collector;
+    private JobIdFilter jobFilter;
+    private RMResourceFetcher rmResourceFetcher;
+    private static final String XML_HTTP_HEADER = "Accept";
+    private static final String XML_FORMAT = "application/xml";
+    private static final int CONNECTION_TIMEOUT = 10000;
+    private static final int READ_TIMEOUT = 10000;
+    private Long lastFinishAppTime;
+    private RunningJobManager runningJobManager;
+    private int partitionId;
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    public HiveJobFetchSpout(RunningJobCrawlConfig crawlConfig) {
+        this.crawlConfig = crawlConfig;
+    }
+
+    private int calculatePartitionId(TopologyContext context) {
+        int thisGlobalTaskId = context.getThisTaskId();
+        String componentName = context.getComponentId(thisGlobalTaskId);
+        List<Integer> globalTaskIds = context.getComponentTasks(componentName);
+        int index = 0;
+        for (Integer id : globalTaskIds) {
+            if (id == thisGlobalTaskId) {
+                return index;
+            }
+            index++;
+        }
+        throw new IllegalStateException("task " + thisGlobalTaskId + " is not in the task list of component " + componentName);
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.rmResourceFetcher = new RMResourceFetcher(crawlConfig.endPointConfig.RMBasePaths);
+        this.partitionId = calculatePartitionId(context);
+        // sanity verify 0<=partitionId<=numTotalPartitions-1
+        if (partitionId < 0 || partitionId >= crawlConfig.controlConfig.numTotalPartitions) {
+            throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
+                    partitionId + " and numTotalPartitions " + crawlConfig.controlConfig.numTotalPartitions);
+        }
+        Class<? extends JobIdPartitioner> partitionerCls = crawlConfig.controlConfig.partitionerCls;
+        try {
+            this.jobFilter = new JobIdFilterByPartition(partitionerCls.newInstance(),
+                    crawlConfig.controlConfig.numTotalPartitions, partitionId);
+        } catch (Exception e) {
+            LOG.error("failing instantiating job partitioner class " + partitionerCls.getCanonicalName());
+            throw new IllegalStateException(e);
+        }
+        this.collector = collector;
+        this.runningJobManager = new RunningJobManager(crawlConfig.zkStateConfig.zkQuorum,
+                crawlConfig.zkStateConfig.zkSessionTimeoutMs,
+                crawlConfig.zkStateConfig.zkRetryTimes,
+                crawlConfig.zkStateConfig.zkRetryInterval,
+                crawlConfig.zkStateConfig.zkRoot);
+        this.lastFinishAppTime = this.runningJobManager.recoverLastFinishedTime(partitionId);
+        if (this.lastFinishAppTime == 0L) {
+            this.lastFinishAppTime = Calendar.getInstance().getTimeInMillis() - 24 * 60 * 60000L; // one day ago
+            this.runningJobManager.updateLastFinishTime(partitionId, this.lastFinishAppTime);
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        LOG.info("start to fetch job list");
+        try {
+            List<AppInfo> apps = rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
+            handleApps(apps, true);
+
+            long fetchTime = Calendar.getInstance().getTimeInMillis();
+            if (fetchTime - this.lastFinishAppTime > 60000L) {
+                apps = rmResourceFetcher.getResource(Constants.ResourceType.COMPLETE_MR_JOB, Long.toString(this.lastFinishAppTime));
+                handleApps(apps, false);
+                this.lastFinishAppTime = fetchTime;
+                this.runningJobManager.updateLastFinishTime(partitionId, fetchTime);
+            }
+        } catch (Exception e) {
+            LOG.warn("exception found {}", e);
+        } finally {
+            //TODO: make the fetch interval configurable
+            Utils.sleep(60);
+        }
+    }
+
+    private void handleApps(List<AppInfo> apps, boolean isRunning) {
+        List<MRJob> mrJobs = new ArrayList<>();
+        //fetch job config
+        if (isRunning) {
+            for (AppInfo appInfo : apps) {
+                if (!jobFilter.accept(appInfo.getId())) {
+                    continue;
+                }
+
+                String jobURL = appInfo.getTrackingUrl() + Constants.MR_JOBS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+                InputStream is = null;
+                try {
+                    is = InputStreamUtils.getInputStream(jobURL, null, Constants.CompressionType.NONE);
+                    LOG.info("fetch mr job from {}", jobURL);
+                    mrJobs = OBJ_MAPPER.readValue(is, MRJobsWrapper.class).getJobs().getJob();
+                } catch (Exception e) {
+                    LOG.warn("fetch mr job from {} failed, {}", jobURL, e);
+                    continue;
+                } finally {
+                    Utils.closeInputStream(is);
+                }
+
+                if (fetchRunningConfig(appInfo, mrJobs)) {
+                    continue;
+                }
+            }
+        }
+
+        if (!isRunning) {
+            for (AppInfo appInfo : apps) {
+                if (!jobFilter.accept(appInfo.getId())) {
+                    continue;
+                }
+                MRJob mrJob = new MRJob();
+                mrJob.setId(appInfo.getId().replace("application_", "job_"));
+                mrJobs.add(mrJob);
+                fetchFinishedConfig(appInfo, mrJobs);
+            }
+        }
+    }
+
+    private boolean fetchRunningConfig(AppInfo appInfo, List<MRJob> mrJobs) {
+        InputStream is = null;
+        for (MRJob mrJob : mrJobs) {
+            String confURL = appInfo.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + mrJob.getId() + "/" + Constants.MR_CONF_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+            try {
+                LOG.info("fetch job conf from {}", confURL);
+                final URLConnection connection = URLConnectionUtils.getConnection(confURL);
+                connection.setRequestProperty(XML_HTTP_HEADER, XML_FORMAT);
+                connection.setConnectTimeout(CONNECTION_TIMEOUT);
+                connection.setReadTimeout(READ_TIMEOUT);
+                is = connection.getInputStream();
+                Map<String, String> hiveQueryLog = new HashMap<>();
+                DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+                DocumentBuilder db = dbf.newDocumentBuilder();
+                Document dt = db.parse(is);
+                Element element = dt.getDocumentElement();
+                NodeList propertyList = element.getElementsByTagName("property");
+                int length = propertyList.getLength();
+                for (int i = 0; i < length; i++) {
+                    Node property = propertyList.item(i);
+                    String key = property.getChildNodes().item(0).getTextContent();
+                    String value = property.getChildNodes().item(1).getTextContent();
+                    hiveQueryLog.put(key, value);
+                }
+
+                if (hiveQueryLog.containsKey(Constants.HIVE_QUERY_STRING)) {
+                    collector.emit(new ValuesArray(appInfo.getUser(), mrJob.getId(), Constants.ResourceType.JOB_CONFIGURATION, hiveQueryLog), mrJob.getId());
+                }
+            } catch (Exception e) {
+                LOG.warn("fetch job conf from {} failed, {}", confURL, e);
+                e.printStackTrace();
+                return false;
+            } finally {
+                Utils.closeInputStream(is);
+            }
+        }
+        return true;
+    }
+
+    private boolean fetchFinishedConfig(AppInfo appInfo, List<MRJob> mrJobs) {
+        InputStream is = null;
+        for (MRJob mrJob : mrJobs) {
+            String urlString = crawlConfig.endPointConfig.HSBasePath + "jobhistory/conf/" + mrJob.getId() + "?" + Constants.ANONYMOUS_PARAMETER;
+            try {
+                LOG.info("fetch job conf from {}", urlString);
+                is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.NONE);
+                final org.jsoup.nodes.Document doc = Jsoup.parse(is, "UTF-8", urlString);
+                org.jsoup.select.Elements elements = doc.select("table[id=conf]").select("tbody").select("tr");
+                Map<String, String> hiveQueryLog = new HashMap<>();
+                Iterator<org.jsoup.nodes.Element> iter = elements.iterator();
+                while (iter.hasNext()) {
+                    org.jsoup.nodes.Element element = iter.next();
+                    org.jsoup.select.Elements tds = element.children();
+                    String key = tds.get(0).text();
+                    String value = tds.get(1).text();
+                    hiveQueryLog.put(key, value);
+                }
+                if (hiveQueryLog.containsKey(Constants.HIVE_QUERY_STRING)) {
+                    collector.emit(new ValuesArray(appInfo.getUser(), mrJob.getId(), Constants.ResourceType.JOB_CONFIGURATION, hiveQueryLog), mrJob.getId());
+                }
+            } catch (Exception e) {
+                LOG.warn("fetch job conf from {} failed, {}", urlString, e);
+                e.printStackTrace();
+                return false;
+            } finally {
+                Utils.closeInputStream(is);
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user", "jobId", "type", "config"));
+    }
+
+    @Override
+    public void ack(Object msgId) {
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        //process fail over later
+    }
+}
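
A hedged sketch of the per-partition job-id filtering the spout sets up in open(); the partition count and sample application id are illustrative, and it is assumed that JobIdFilter and JobIdFilterByPartition live in org.apache.eagle.jpm.util, as the wildcard import above suggests.

    import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
    import org.apache.eagle.jpm.util.JobIdFilter;
    import org.apache.eagle.jpm.util.JobIdFilterByPartition;

    public class JobIdFilterSketch {
        public static void main(String[] args) {
            int numTotalPartitions = 4; // illustrative partition count
            int partitionId = 0;        // the partition handled by this task

            JobIdFilter filter =
                    new JobIdFilterByPartition(new DefaultJobIdPartitioner(), numTotalPartitions, partitionId);

            // accept() decides whether this spout instance should handle the given application/job id
            String sampleAppId = "application_1471240000000_0001"; // made-up id
            System.out.println(sampleAppId + " handled here: " + filter.accept(sampleAppId));
        }
    }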

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index f60d463..71f5949 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -17,13 +17,11 @@
 package org.apache.eagle.security.hive.jobrunning;
 
 import backtype.storm.topology.base.BaseRichSpout;
-import org.apache.eagle.job.DefaultJobPartitionerImpl;
-import org.apache.eagle.job.JobPartitioner;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ControlConfig;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.RunningJobEndpointConfig;
+import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
+import org.apache.eagle.security.hive.config.RunningJobCrawlConfig;
+import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.ControlConfig;
+import org.apache.eagle.security.hive.config.RunningJobCrawlConfig.RunningJobEndpointConfig;
 import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
-import org.apache.eagle.jobrunning.storm.JobRunningSpout;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,15 +65,15 @@ public class HiveJobRunningSourcedStormSpoutProvider {
 		RunningJobCrawlConfig crawlConfig = new RunningJobCrawlConfig(endPointConfig, controlConfig, zkStateConfig);
 
 		try{
-			controlConfig.partitionerCls = (Class<? extends JobPartitioner>)Class.forName(config.getString("dataSourceConfig.partitionerCls"));
+			controlConfig.partitionerCls = (Class<? extends DefaultJobIdPartitioner>)Class.forName(config.getString("dataSourceConfig.partitionerCls"));
 		}
 		catch(Exception ex){
 			LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls"));
 			//throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls"));
-            controlConfig.partitionerCls = DefaultJobPartitionerImpl.class;
+            controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
         }
 
-		JobRunningSpout spout = new JobRunningSpout(crawlConfig);
+		HiveJobFetchSpout spout = new HiveJobFetchSpout(crawlConfig);
 		return spout;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryMonitoringApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryMonitoringApplication.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryMonitoringApplication.java
index f19c9a9..5938314 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryMonitoringApplication.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryMonitoringApplication.java
@@ -56,7 +56,7 @@ public class HiveQueryMonitoringApplication extends StormApplication {
         builder.setSpout("ingest", spout, numOfSpoutTasks);
         JobFilterBolt bolt = new JobFilterBolt();
         BoltDeclarer boltDeclarer = builder.setBolt("filterBolt", bolt, numOfFilterTasks);
-        boltDeclarer.fieldsGrouping("ingest", new Fields("f1"));
+        boltDeclarer.fieldsGrouping("ingest", new Fields("jobId"));
 
         HiveQueryParserBolt parserBolt = new HiveQueryParserBolt();
         BoltDeclarer parserBoltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java
index 3f0b95b..3a7c24f 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java
@@ -22,10 +22,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
-import org.apache.eagle.jobrunning.common.JobConstants;
-import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
-import org.apache.eagle.jobrunning.storm.JobRunningContentFilter;
-import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl;
+import org.apache.eagle.jpm.util.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,12 +54,12 @@ public class JobFilterBolt extends BaseRichBolt {
     public void execute(Tuple input) {
 		String user = input.getString(0);
         String jobId = input.getString(1);
-        ResourceType type = (ResourceType)input.getValue(2);
-        if (type.equals(ResourceType.JOB_CONFIGURATION)) {
+        Constants.ResourceType type = (Constants.ResourceType)input.getValue(2);
+        if (type.equals(Constants.ResourceType.JOB_CONFIGURATION)) {
             Map<String, String> configs = (Map<String, String>)input.getValue(3);
             if (filter.acceptJobConf(configs)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("Got a hive job, jobID: " + jobId + ", query: " + configs.get(JobConstants.HIVE_QUERY_STRING));
+                    LOG.debug("Got a hive job, jobID: " + jobId + ", query: " + configs.get(Constants.HIVE_QUERY_STRING));
                 } else {
                     LOG.info("Got a hive job, jobID: " + jobId);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilter.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilter.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilter.java
new file mode 100644
index 0000000..90592b5
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.jobrunning;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Defines what content in the running-job stream should be streamed onward.
+ */
+public interface JobRunningContentFilter extends Serializable {
+	boolean acceptJobConf(Map<String, String> config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilterImpl.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilterImpl.java
new file mode 100644
index 0000000..99160f9
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobRunningContentFilterImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.jobrunning;
+
+import java.util.Map;
+
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * Defines what content in the running-job stream should be streamed onward.
+ */
+public class JobRunningContentFilterImpl implements JobRunningContentFilter {
+	private static final long serialVersionUID = 1L;
+	
+	@Override
+	public boolean acceptJobConf(Map<String, String> config) {
+		if (config.containsKey(Constants.HIVE_QUERY_STRING)) {
+			return true;
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-security/eagle-security-hive/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/resources/application.conf b/eagle-security/eagle-security-hive/src/main/resources/application.conf
index f21b4a0..116242c 100644
--- a/eagle-security/eagle-security-hive/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hive/src/main/resources/application.conf
@@ -30,7 +30,7 @@
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 2000,
-    "RMEndPoints" : "http://server.eagle.apache.org:8088/",
+    "RMEndPoints" : "http://server.eagle.apache.org:8088",
     "HSEndPoint" : "http://server.eagle.apache.org:19888/",
     "partitionerCls" : "org.apache.eagle.job.DefaultJobPartitionerImpl",
   },


