eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [3/3] incubator-eagle git commit: EAGLE-465 Hive security monitoring refactor Hive security monitoring refactor with jpm
Date Mon, 15 Aug 2016 18:18:58 GMT
EAGLE-465 Hive security monitoring refactor
Hive security monitoring refactor with jpm

https://issues.apache.org/jira/browse/EAGLE-465

Author: @wujinhu <jinhuwu@ebay.com>
Reviewer: @yonzhang <yonzhang2012@apache.org>
Closes: #349


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/18ae3bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/18ae3bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/18ae3bbb

Branch: refs/heads/develop
Commit: 18ae3bbbbdbfbfa6b2462670d7c55c7322d81fc1
Parents: ecabbe4
Author: yonzhang <yonzhang2012@gmail.com>
Authored: Mon Aug 15 11:17:57 2016 -0700
Committer: yonzhang <yonzhang2012@gmail.com>
Committed: Mon Aug 15 11:17:57 2016 -0700

----------------------------------------------------------------------
 .../eagle-data-process/eagle-job-common/pom.xml |  31 --
 .../eagle/job/DefaultJobPartitionerImpl.java    |  26 --
 .../java/org/apache/eagle/job/JobFilter.java    |  21 --
 .../apache/eagle/job/JobFilterByPartition.java  |  37 --
 .../org/apache/eagle/job/JobPartitioner.java    |  21 --
 .../eagle-storm-jobrunning-spout/pom.xml        |  65 ----
 .../DefaultRunningJobInputStreamCallback.java   |  58 ---
 .../jobrunning/callback/RunningJobCallback.java |  37 --
 .../callback/RunningJobMessageId.java           |  56 ---
 .../eagle/jobrunning/common/JobConstants.java   |  53 ---
 .../config/RunningJobCrawlConfig.java           |  53 ---
 .../jobrunning/counter/CounterGroupKey.java     |  30 --
 .../eagle/jobrunning/counter/CounterKey.java    |  28 --
 .../eagle/jobrunning/counter/JobCounters.java   |  38 --
 .../counter/parser/JobCountersParser.java       |  26 --
 .../counter/parser/JobCountersParserImpl.java   |  44 ---
 .../jobrunning/crawler/ConfigWorkTask.java      |  66 ----
 .../eagle/jobrunning/crawler/JobContext.java    |  55 ---
 .../jobrunning/crawler/RMResourceFetcher.java   | 272 --------------
 .../jobrunning/crawler/ResourceFetcher.java     |  27 --
 .../jobrunning/crawler/RunningJobCrawler.java   |  28 --
 .../crawler/RunningJobCrawlerImpl.java          | 356 -------------------
 .../eagle/jobrunning/crawler/XmlHelper.java     |  55 ---
 .../eagle/jobrunning/ha/HAURLSelector.java      |  31 --
 .../eagle/jobrunning/ha/HAURLSelectorImpl.java  | 100 ------
 .../jobrunning/job/conf/JobConfParser.java      |  26 --
 .../jobrunning/job/conf/JobConfParserImpl.java  |  42 ---
 .../storm/JobRunningContentFilter.java          |  27 --
 .../storm/JobRunningContentFilterImpl.java      |  36 --
 .../eagle/jobrunning/storm/JobRunningSpout.java | 198 -----------
 .../JobRunningSpoutCollectorInterceptor.java    |  47 ---
 ...JobCompleteCounterServiceURLBuilderImpl.java |  29 --
 .../JobCompleteDetailServiceURLBuilderImpl.java |  31 --
 ...JobCompletedConfigServiceURLBuilderImpl.java |  30 --
 .../url/JobCountersServiceURLBuilderImpl.java   |  33 --
 .../url/JobDetailServiceURLBuilderImpl.java     |  28 --
 .../url/JobListServiceURLBuilderImpl.java       |  37 --
 .../JobRunningConfigServiceURLBuilderImpl.java  |  33 --
 .../url/JobStatusServiceURLBuilderImpl.java     |  29 --
 .../eagle/jobrunning/url/ServiceURLBuilder.java |  21 --
 .../eagle/jobrunning/util/InputStreamUtils.java |  66 ----
 .../apache/eagle/jobrunning/util/JobUtils.java  |  43 ---
 .../jobrunning/util/URLConnectionUtils.java     | 108 ------
 .../apache/eagle/jobrunning/yarn/model/App.java | 145 --------
 .../eagle/jobrunning/yarn/model/AppInfo.java    | 146 --------
 .../eagle/jobrunning/yarn/model/AppWrapper.java |  35 --
 .../jobrunning/yarn/model/Applications.java     |  38 --
 .../jobrunning/yarn/model/AppsWrapper.java      |  36 --
 .../eagle/jobrunning/yarn/model/Counter.java    |  51 ---
 .../jobrunning/yarn/model/CounterGroup.java     |  39 --
 .../yarn/model/JobCompleteWrapper.java          |  34 --
 .../jobrunning/yarn/model/JobCounters.java      |  39 --
 .../yarn/model/JobCountersWrapper.java          |  32 --
 .../jobrunning/yarn/model/JobDetailInfo.java    | 243 -------------
 .../eagle/jobrunning/yarn/model/Jobs.java       |  38 --
 .../jobrunning/yarn/model/JobsWrapper.java      |  37 --
 .../jobrunning/zkres/JobRunningZKStateLCM.java  |  35 --
 .../zkres/JobRunningZKStateManager.java         | 210 -----------
 .../src/main/resources/log4j.properties         |  34 --
 .../job/conf/TestJobConfParserImpl.java         |  43 ---
 .../src/test/resources/hive-jobrunning.conf     |  54 ---
 .../src/test/resources/jobconf.html             | 230 ------------
 .../src/test/resources/log4j.properties         |  34 --
 eagle-core/eagle-data-process/pom.xml           |   2 -
 .../jpm/mr/history/common/JHFConfigManager.java |   6 +-
 .../history/crawler/JHFCrawlerDriverImpl.java   |   2 +-
 .../history/storm/DefaultJobIdPartitioner.java  |  28 --
 .../jpm/mr/history/storm/JobHistorySpout.java   |   5 +-
 .../eagle/jpm/mr/history/storm/JobIdFilter.java |  23 --
 .../history/storm/JobIdFilterByPartition.java   |  40 ---
 .../jpm/mr/history/storm/JobIdPartitioner.java  |  23 --
 .../src/main/resources/application.conf         |   2 +-
 eagle-jpm/eagle-jpm-mr-running/pom.xml          |   5 -
 eagle-jpm/eagle-jpm-spark-history/pom.xml       |  11 -
 eagle-jpm/eagle-jpm-spark-running/pom.xml       |   5 -
 .../org/apache/eagle/jpm/util/Constants.java    |   5 +-
 .../eagle/jpm/util/DefaultJobIdPartitioner.java |  28 ++
 .../org/apache/eagle/jpm/util/JobIdFilter.java  |  23 ++
 .../eagle/jpm/util/JobIdFilterByPartition.java  |  40 +++
 .../apache/eagle/jpm/util/JobIdPartitioner.java |  23 ++
 .../jpm/util/jobrecover/RunningJobManager.java  |  27 ++
 .../util/resourceFetch/RMResourceFetcher.java   |  50 ++-
 eagle-security/eagle-security-hive/pom.xml      |  14 +-
 .../hive/config/RunningJobCrawlConfig.java      |  53 +++
 .../hive/jobrunning/HiveJobFetchSpout.java      | 265 ++++++++++++++
 ...HiveJobRunningSourcedStormSpoutProvider.java |  16 +-
 .../HiveQueryMonitoringApplication.java         |   2 +-
 .../security/hive/jobrunning/JobFilterBolt.java |  11 +-
 .../jobrunning/JobRunningContentFilter.java     |  27 ++
 .../jobrunning/JobRunningContentFilterImpl.java |  36 ++
 .../src/main/resources/application.conf         |   2 +-
 91 files changed, 577 insertions(+), 4228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-job-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-job-common/pom.xml b/eagle-core/eagle-data-process/eagle-job-common/pom.xml
deleted file mode 100644
index c3862d5..0000000
--- a/eagle-core/eagle-data-process/eagle-job-common/pom.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.eagle</groupId>
-    <artifactId>eagle-data-process-parent</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <artifactId>eagle-job-common</artifactId>
-  <name>eagle-job-common</name>
-  <url>http://maven.apache.org</url>
-  <dependencies>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/DefaultJobPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/DefaultJobPartitionerImpl.java b/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/DefaultJobPartitionerImpl.java
deleted file mode 100644
index 7c4953f..0000000
--- a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/DefaultJobPartitionerImpl.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.job;
-
-public class DefaultJobPartitionerImpl implements JobPartitioner {
-	@Override
-	public int partition(int numTotalParts, String key) {
-		int hash = key.hashCode();
-		hash = Math.abs(hash);
-		return hash % numTotalParts;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilter.java b/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilter.java
deleted file mode 100644
index 4a072f0..0000000
--- a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilter.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.job;
-
-public interface JobFilter {
-	public boolean accept(String key);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilterByPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilterByPartition.java b/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilterByPartition.java
deleted file mode 100644
index 8373766..0000000
--- a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobFilterByPartition.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.job;
-
-public class JobFilterByPartition implements JobFilter {
-	private JobPartitioner partitioner;
-	private int numTotalPartitions;
-	private int partitionId;
-	
-	public JobFilterByPartition(JobPartitioner partitioner, int numTotalPartitions, int partitionId){
-		this.partitioner = partitioner;
-		this.numTotalPartitions = numTotalPartitions;
-		this.partitionId = partitionId;
-	}
-
-	@Override
-	public boolean accept(String key) {
-		int part = partitioner.partition(numTotalPartitions, key);
-		if(part == partitionId)
-			return true;
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobPartitioner.java b/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobPartitioner.java
deleted file mode 100644
index 4f4d74a..0000000
--- a/eagle-core/eagle-data-process/eagle-job-common/src/main/java/org/apache/eagle/job/JobPartitioner.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.job;
-
-public interface JobPartitioner {
-	int partition(int numTotalParts, String key);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/pom.xml b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/pom.xml
deleted file mode 100644
index 21bb25f..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.eagle</groupId>
-    <artifactId>eagle-data-process-parent</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <artifactId>eagle-storm-jobrunning-spout</artifactId>
-  <name>eagle-storm-jobrunning-spout</name>
-  <url>http://maven.apache.org</url>
-  <dependencies>
-  	<dependency>
-  		<groupId>org.slf4j</groupId>
-  		<artifactId>slf4j-api</artifactId>
-  	</dependency>
-  	<dependency>
-  		<groupId>org.apache.eagle</groupId>
-  		<artifactId>eagle-stream-process-api</artifactId>
-        <version>${project.version}</version>
-  	</dependency>
-      <dependency>
-          <groupId>org.apache.eagle</groupId>
-          <artifactId>eagle-stream-process-base</artifactId>
-          <version>${project.version}</version>
-      </dependency>
-  	<dependency>
-  		<groupId>org.apache.eagle</groupId>
-  		<artifactId>eagle-job-common</artifactId>
-  		<version>${project.version}</version>
-  	</dependency>  	  	
-  	<dependency>
-		<groupId>org.jsoup</groupId>
-		<artifactId>jsoup</artifactId>
-	</dependency>
-  	<dependency>
-  		<groupId>org.apache.storm</groupId>
-  		<artifactId>storm-core</artifactId>
-  		<exclusions>
-      		<exclusion>
-      			<groupId>ch.qos.logback</groupId>
-        		<artifactId>logback-classic</artifactId>
-      		</exclusion>
-      	</exclusions> 
-  	</dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
deleted file mode 100644
index fb77bc7..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/DefaultRunningJobInputStreamCallback.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.callback;
-
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.dataproc.core.EagleOutputCollector;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
-import org.apache.eagle.jobrunning.crawler.JobContext;
-
-public class DefaultRunningJobInputStreamCallback implements RunningJobCallback{
-	
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(DefaultRunningJobInputStreamCallback.class);
-	
-	private EagleOutputCollector eagleCollector;
-	
-	public DefaultRunningJobInputStreamCallback(EagleOutputCollector eagleCollector){
-		this.eagleCollector = eagleCollector;
-	}
-
-	@Override
-	public void onJobRunningInformation(JobContext context, ResourceType type, List<Object> objects) {
-		String jobId = context.jobId;
-		LOG.info(jobId + " information fetched , type: " + type);
-		if (type.equals(ResourceType.JOB_CONFIGURATION)) {
-			@SuppressWarnings("unchecked")
-			Map<String, String> config = (Map<String, String>) objects.get(0);
-			// the fist field is fixed as messageId
-			RunningJobMessageId messageId = new RunningJobMessageId(jobId, type, context.fetchedTime);
-			eagleCollector.collect(new ValuesArray(messageId, context.user, jobId, type, config));
-		}
-		else if (type.equals(ResourceType.JOB_RUNNING_INFO) || type.equals(ResourceType.JOB_COMPLETE_INFO)) {
-			// Here timestamp is meaningless, set to null
-			RunningJobMessageId messageId = new RunningJobMessageId(jobId, type, null);
-			eagleCollector.collect(new ValuesArray(messageId, context.user, jobId, type, objects));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
deleted file mode 100644
index f36e8a5..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobCallback.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.callback;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.eagle.jobrunning.crawler.JobContext;
-import org.apache.eagle.jobrunning.common.JobConstants;
-
-/**
- * callback when running job info is ready
- */
-public interface RunningJobCallback extends Serializable{
-		
-	/**
-	 * this is called when running job resource is ready
-	 * @param jobContext
-	 * @param type
-	 * @param objects
-	 */
-	void onJobRunningInformation(JobContext jobContext, JobConstants.ResourceType type, List<Object> objects);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
deleted file mode 100644
index 08b58f2..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/callback/RunningJobMessageId.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.callback;
-
-import com.google.common.base.Objects;
-import org.apache.eagle.jobrunning.common.JobConstants;
-
-public class RunningJobMessageId {
-	public String jobID;
-	public JobConstants.ResourceType type;
-	// If type = JOB_RUNNING_INFO, timestamp = fetchedTime, otherwise timestamp is meaningless, set to null
-	public Long timestamp;
-	
-	public RunningJobMessageId(String jobID, JobConstants.ResourceType type, Long timestamp) {
-		this.jobID = jobID;
-		this.type = type;
-		this.timestamp = timestamp;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final RunningJobMessageId other = (RunningJobMessageId) obj;
-        return Objects.equal(this.jobID, other.jobID) 
-        	  && Objects.equal(this.type, other.type)
-        	  && Objects.equal(this.timestamp, other.timestamp);
-	}
-	
-	@Override
-	public int hashCode() {
-		return Objects.hashCode(jobID.hashCode(), type.hashCode(), timestamp.hashCode());
-	}
-	
-	@Override
-	public String toString() {
-		return "jobID=" + jobID 
-			 + ", type=" + type.name() 
-			 + ", timestamp= " + timestamp;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
deleted file mode 100644
index 6a1d7c8..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/common/JobConstants.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.common;
-
-public class JobConstants {
-	public static final String APPLICATION_PREFIX = "application";
-	public static final String JOB_PREFIX = "job";
-	public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING";
-	public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED";	
-	public static final String V2_APPS_URL = "ws/v1/cluster/apps";
-	public static final String ANONYMOUS_PARAMETER = "anonymous=true";
-	
-	public static final String V2_PROXY_PREFIX_URL = "proxy/";
-	public static final String V2_APP_DETAIL_URL = "/ws/v1/mapreduce/jobs";
-	public static final String V2_MR_APPMASTER_PREFIX = "/ws/v1/mapreduce/jobs/";
-	public static final String V2_CONF_URL = "/conf";
-	public static final String V2_COMPLETE_APPS_URL = "ws/v1/cluster/apps/";
-	public static final String V2_MR_COUNTERS_URL = "/counters";
-	
-
-	public static final String HIVE_QUERY_STRING = "hive.query.string";
-	public static final String JOB_STATE_RUNNING = "RUNNING";
-	
-	public enum YarnApplicationType {
-		MAPREDUCE, UNKNOWN
-	}
-	
-	public enum CompressionType {
-		GZIP, NONE
-	}
-	
-	public enum ResourceType {
-		JOB_RUNNING_INFO, JOB_COMPLETE_INFO, JOB_CONFIGURATION, JOB_LIST
-	}
-	
-	public enum JobState {
-		RUNNING, COMPLETED, ALL
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
deleted file mode 100644
index 79a8928..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.config;
-
-import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
-import org.apache.eagle.job.JobPartitioner;
-
-import java.io.Serializable;
-
-public class RunningJobCrawlConfig implements Serializable{
-	private static final long serialVersionUID = 1L;
-	public RunningJobEndpointConfig endPointConfig;
-	public ControlConfig controlConfig;
-	public ZKStateConfig zkStateConfig;
-
-	public RunningJobCrawlConfig(RunningJobEndpointConfig endPointConfig, ControlConfig controlConfig, ZKStateConfig zkStateConfig){
-		this.endPointConfig = endPointConfig;
-		this.controlConfig = controlConfig;
-		this.zkStateConfig = zkStateConfig;
-	}	
-    
-    public static class RunningJobEndpointConfig implements Serializable{
-		private static final long serialVersionUID = 1L;
-		public String[] RMBasePaths;
-		public String HSBasePath;
-    }
-    
-    public static class ControlConfig implements Serializable{
-		private static final long serialVersionUID = 1L;
-    	public boolean jobConfigEnabled; 
-    	public boolean jobInfoEnabled;
-    	public int zkCleanupTimeInday;
-    	public int completedJobOutofDateTimeInMin;
-    	public int sizeOfJobConfigQueue;
-    	public int sizeOfJobCompletedInfoQueue;
-        public Class<? extends JobPartitioner> partitionerCls;
-        public int numTotalPartitions = 1;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
deleted file mode 100644
index 146fbe8..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterGroupKey.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.counter;
-
-import java.util.List;
-
-public interface CounterGroupKey {
-
-	String getName();
-	String getDescription();
-	int getIndex();
-	int getCounterNumber();
-	List<CounterKey> listCounterKeys();
-	CounterKey getCounterKeyByName(String name);
-	CounterKey getCounterKeyByID(int index);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
deleted file mode 100644
index a39d7b4..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/CounterKey.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.counter;
-
-import java.util.List;
-
-public interface CounterKey {
-
-	List<String> getNames();
-	String getDescription();
-	int getIndex();
-	CounterGroupKey getGroupKey();
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
deleted file mode 100644
index ba44ac6..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/JobCounters.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.counter;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-
-public final class JobCounters {
-	
-	private Map<String, Map<String, Long>> counters = new TreeMap<String, Map<String, Long>>();
-
-	public Map<String, Map<String, Long>> getCounters() {
-		return counters;
-	}
-
-	public void setCounters(Map<String, Map<String, Long>> counters) {
-		this.counters = counters;
-	}
-	
-	public String toString(){
-		return counters.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
deleted file mode 100644
index d0f3ae9..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParser.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.counter.parser;
-
-import java.util.Map;
-
-import org.jsoup.nodes.Document;
-
-public interface JobCountersParser {
-	
-	Map<String, Long> parse(Document doc);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
deleted file mode 100644
index f098c54..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/counter/parser/JobCountersParserImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.counter.parser;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.jsoup.nodes.Document;
-import org.jsoup.nodes.Element;
-import org.jsoup.select.Elements;
-
-public class JobCountersParserImpl implements JobCountersParser {
-
-	@Override
-	public Map<String, Long> parse(Document doc) {
-		Elements elements = doc.select("a[href*=singlejobcounter]");
-		Iterator<Element> iter = elements.iterator();
-		Map<String, Long> counters = new HashMap<String, Long>();
-		while(iter.hasNext()) {
-			Element element = iter.next().parent();
-			String metricName = element.text();
-			long metricValue = Long.parseLong(element.nextElementSibling()
-								   .nextElementSibling().nextElementSibling().text()
-								    .replace(",", "").trim());
-			counters.put(metricName, metricValue);
-		}
-		return counters;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
deleted file mode 100644
index d9cf79c..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ConfigWorkTask.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.crawler;
-
-import java.util.List;
-
-import org.apache.eagle.jobrunning.util.JobUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.jobrunning.callback.RunningJobCallback;
-import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
-
-public class ConfigWorkTask implements Runnable {
-	
-	public JobContext context;
-	public ResourceFetcher fetcher;
-	public RunningJobCallback callback;
-	public RunningJobCrawler crawler;	
-	
-	private static final Logger LOG = LoggerFactory.getLogger(ConfigWorkTask.class);
-
-	public ConfigWorkTask(JobContext context, ResourceFetcher fetcher, RunningJobCallback callback, RunningJobCrawler crawler) {
-		this.context = context;
-		this.fetcher = fetcher;
-		this.callback = callback;
-		this.crawler = crawler;
-	}
-	
-	public void run() {
-		runConfigCrawlerWorkhread(context);
-	}
-
-	private void runConfigCrawlerWorkhread(JobContext context) {
-		LOG.info("Going to fetch job configuration information, jobId:" + context.jobId);
-		try {
-			List<Object> objs = fetcher.getResource(ResourceType.JOB_CONFIGURATION, JobUtils.getAppIDByJobID(context.jobId));
-			callback.onJobRunningInformation(context, ResourceType.JOB_CONFIGURATION, objs);
-		}
-		catch (Exception ex) {
-	        if (ex.getMessage().contains("Server returned HTTP response code: 500")) {
-	        	LOG.warn("The server returns 500 error, it's probably caused by job ACL setting, going to skip this job");
-	        	// the job remains in processing list, thus we will not do infructuous retry next round
-	        	// TODO need remove it from processing list when job finished to avoid memory leak       
-	        }
-	        else {
-	        	LOG.warn("Got an exception when fetching job config: ", ex);
-	        	crawler.removeFromProcessingList(ResourceType.JOB_CONFIGURATION, context);
-	        }
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
deleted file mode 100644
index 7599457..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/JobContext.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.crawler;
-
-public class JobContext {
-	public String jobId;
-	public String user;
-	public Long fetchedTime;
-	
-	public JobContext() {
-		
-	}
-	
-	public JobContext(JobContext context) {
-		this.jobId = new String(context.jobId);
-		this.user = new String(context.user);
-		this.fetchedTime = new Long(context.fetchedTime);
-	}
-	
-	public JobContext(String jobId, String user, Long fetchedTime) {
-		this.jobId = jobId;
-		this.user = user;
-		this.fetchedTime = fetchedTime;
-	}
-	
-	@Override
-	public int hashCode() {	
-		return jobId.hashCode() ;
-	}			
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof JobContext) {
-			JobContext context = (JobContext)obj;
-			if (this.jobId.equals(context.jobId)) {
-				return true;
-			}
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
deleted file mode 100644
index 7c41431..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RMResourceFetcher.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package org.apache.eagle.jobrunning.crawler;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URLConnection;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.ZipException;
-
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
-import org.apache.eagle.jobrunning.counter.parser.JobCountersParser;
-import org.apache.eagle.jobrunning.counter.parser.JobCountersParserImpl;
-import org.apache.eagle.jobrunning.ha.HAURLSelector;
-import org.apache.eagle.jobrunning.ha.HAURLSelectorImpl;
-import org.apache.eagle.jobrunning.job.conf.JobConfParser;
-import org.apache.eagle.jobrunning.job.conf.JobConfParserImpl;
-import org.apache.eagle.jobrunning.util.InputStreamUtils;
-import org.apache.eagle.jobrunning.util.JobUtils;
-import org.apache.eagle.jobrunning.util.URLConnectionUtils;
-import org.apache.commons.lang.time.DateUtils;
-import org.apache.eagle.jobrunning.common.JobConstants;
-import org.apache.eagle.jobrunning.yarn.model.AppWrapper;
-import org.apache.eagle.jobrunning.yarn.model.JobCompleteWrapper;
-import org.apache.eagle.jobrunning.yarn.model.JobCountersWrapper;
-import org.apache.eagle.jobrunning.yarn.model.JobsWrapper;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.jsoup.Jsoup;
-import org.jsoup.nodes.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.jobrunning.url.JobCompleteCounterServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.JobCompleteDetailServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.JobCompletedConfigServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.JobCountersServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.JobDetailServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.JobListServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.JobRunningConfigServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.JobStatusServiceURLBuilderImpl;
-import org.apache.eagle.jobrunning.url.ServiceURLBuilder;
-import org.apache.eagle.jobrunning.yarn.model.AppInfo;
-import org.apache.eagle.jobrunning.yarn.model.AppsWrapper;
-import org.apache.eagle.jobrunning.yarn.model.JobDetailInfo;
-
-public class RMResourceFetcher implements ResourceFetcher{
-	
-	private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
-	private final HAURLSelector selector;
-	private final String historyBaseUrl;
-	private final ServiceURLBuilder jobListServiceURLBuilder;
-	private final ServiceURLBuilder jobDetailServiceURLBuilder;
-	private final ServiceURLBuilder jobCounterServiceURLBuilder;
-	private final ServiceURLBuilder jobRunningConfigServiceURLBuilder;
-	private final ServiceURLBuilder jobCompleteDetailServiceURLBuilder;
-	private final ServiceURLBuilder jobCompleteCounterServiceURLBuilder;
-	private final ServiceURLBuilder jobCompletedConfigServiceURLBuilder;
-	private final ServiceURLBuilder jobStatusServiceURLBuilder;
-		
-	private static final int CONNECTION_TIMEOUT = 10000;
-	private static final int READ_TIMEOUT = 10000;
-	private static final String XML_HTTP_HEADER = "Accept";
-	private static final String XML_FORMAT = "application/xml";
-	
-	private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
-	
-	static {
-		OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
-	}
-	
-	public RMResourceFetcher(RunningJobCrawlConfig.RunningJobEndpointConfig config) {
-		this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
-		this.jobDetailServiceURLBuilder = new JobDetailServiceURLBuilderImpl();
-		this.jobCounterServiceURLBuilder = new JobCountersServiceURLBuilderImpl();
-		this.jobRunningConfigServiceURLBuilder = new JobRunningConfigServiceURLBuilderImpl();
-		this.jobCompleteDetailServiceURLBuilder = new JobCompleteDetailServiceURLBuilderImpl();
-		this.jobCompleteCounterServiceURLBuilder = new JobCompleteCounterServiceURLBuilderImpl();
-		this.jobCompletedConfigServiceURLBuilder = new JobCompletedConfigServiceURLBuilderImpl();
-		this.jobStatusServiceURLBuilder = new JobStatusServiceURLBuilderImpl();
-
-		this.selector = new HAURLSelectorImpl(config.RMBasePaths, jobListServiceURLBuilder, JobConstants.CompressionType.GZIP);
-		this.historyBaseUrl = config.HSBasePath;
-	}
-	
-	private void checkUrl() throws IOException {
-		if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), JobConstants.JobState.RUNNING.name()))) {
-			selector.reSelectUrl();
-		}
-	}
-	
-	private List<Object> doFetchApplicationsList(String state) throws Exception {		
-		List<AppInfo> result = null;
-		InputStream is = null;
-		try {
-			checkUrl();
-			final String urlString = jobListServiceURLBuilder.build(selector.getSelectedUrl(), state);
-			LOG.info("Going to call yarn api to fetch running job list: " + urlString);
-			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
-			final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
-			if (appWrapper != null && appWrapper.getApps() != null
-					&& appWrapper.getApps().getApp() != null) {
-				result = appWrapper.getApps().getApp();
-				return Arrays.asList((Object)result);
-			}
-			return null;
-		}
-		finally {
-			if (is != null) { try {is.close();} catch (Exception e){} }
-		}
-	}
-	
-	private List<Object> doFetchRunningJobInfo(String appID) throws Exception{
-		InputStream is = null;
-		InputStream is2 = null;
-		try {
-			final String urlString = jobDetailServiceURLBuilder.build(selector.getSelectedUrl(), appID);
-			LOG.info("Going to fetch job detail information for " + appID + " , url: " + urlString);
-			try {
-				is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
-			}
-			catch (ZipException ex) {
-				// Here if job already completed, it will be redirected to job history page and throw java.util.zip.ZipException
-				LOG.info(appID + " has finished, skip this job");
-				return null;
-			}
-			final JobsWrapper jobWrapper = OBJ_MAPPER.readValue(is, JobsWrapper.class);
-			JobDetailInfo jobDetail = null;
-			if (jobWrapper != null && jobWrapper.getJobs() != null && jobWrapper.getJobs().getJob() != null
-				&& jobWrapper.getJobs().getJob().size() > 0) {
-				jobDetail = jobWrapper.getJobs().getJob().get(0);
-			}
-			final String urlString2 = jobCounterServiceURLBuilder.build(selector.getSelectedUrl(), appID);
-			LOG.info("Going to fetch job counters for application " + appID + " , url: " + urlString2);
-			is2 = InputStreamUtils.getInputStream(urlString2, JobConstants.CompressionType.GZIP);
-			final JobCountersWrapper jobCounterWrapper = OBJ_MAPPER.readValue(is2,JobCountersWrapper.class);
-			
-			return Arrays.asList(jobDetail, jobCounterWrapper);
-		}
-		finally {
-			if (is != null) { try {is.close();} catch (Exception e){} }
-			if (is2 != null) { try {is2.close();} catch (Exception e){} }
-		}
-	}
-	
-	private List<Object> doFetchCompleteJobInfo(String appId) throws Exception{
-		InputStream is = null;
-		InputStream is2 = null;
-		try {
-			checkUrl();
-			String jobID = JobUtils.getJobIDByAppID(appId);
-			String urlString = jobCompleteDetailServiceURLBuilder.build(selector.getSelectedUrl(), jobID);
-			LOG.info("Going to fetch job completed information for " + jobID + " , url: " + urlString);
-			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
-			final JobCompleteWrapper jobWrapper = OBJ_MAPPER.readValue(is, JobCompleteWrapper.class);
-			
-			String urlString2 = jobCompleteCounterServiceURLBuilder.build(historyBaseUrl, jobID);
-			LOG.info("Going to fetch job completed counters for " + jobID + " , url: " + urlString2);
-			is2 = InputStreamUtils.getInputStream(urlString2, JobConstants.CompressionType.NONE, (int) (2 * DateUtils.MILLIS_PER_MINUTE));
-			final Document doc = Jsoup.parse(is2, StandardCharsets.UTF_8.name(), urlString2);
-			JobCountersParser parser = new JobCountersParserImpl();
-			Map<String, Long> counters = parser.parse(doc);
-			return Arrays.asList(jobWrapper, counters);
-		}
-		finally {
-			if (is != null) { try {is.close();} catch (Exception e){}  }
-			if (is2 != null) { try {is2.close();} catch (Exception e){}  }
-		}
-	}
-	
-	private List<Object> doFetchRunningJobConfiguration(String appID) throws Exception {
-		InputStream is = null;
-		try {
-			checkUrl();
-			String jobID = JobUtils.getJobIDByAppID(appID);
-			String urlString = jobRunningConfigServiceURLBuilder.build(selector.getSelectedUrl(), jobID);
-			LOG.info("Going to fetch job completed information for " + jobID + " , url: " + urlString);
-			final URLConnection connection = URLConnectionUtils.getConnection(urlString);
-			connection.setRequestProperty(XML_HTTP_HEADER, XML_FORMAT);
-			connection.setConnectTimeout(CONNECTION_TIMEOUT);
-			connection.setReadTimeout(READ_TIMEOUT);
-			is = connection.getInputStream();
-			Map<String, String> configs = XmlHelper.getConfigs(is);
-			return Arrays.asList((Object)configs);
-		}
-		finally {
-			if (is != null) { try {is.close();} catch (Exception e){}  }
-		}
-	}
-	
-	private List<Object> doFetchCompletedJobConfiguration(String appID) throws Exception {
-		InputStream is = null;
-		try {
-			String urlString = jobCompletedConfigServiceURLBuilder.build(historyBaseUrl, JobUtils.getJobIDByAppID(appID));
-			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.NONE);
-			final Document doc = Jsoup.parse(is, "UTF-8", urlString);
-			JobConfParser parser = new JobConfParserImpl();
-			Map<String, String> configs = parser.parse(doc);
-			return Arrays.asList((Object)configs);
-		}
-		finally {
-			if (is != null) { try {is.close();} catch (Exception e){}  }
-		}
-	}
-	
-	public boolean checkIfJobIsRunning(String appID) throws Exception{
-		InputStream is = null;
-		try {
-			checkUrl();
-			final String urlString = jobStatusServiceURLBuilder.build(selector.getSelectedUrl(), appID);
-			LOG.info("Going to call yarn api to fetch job status: " + urlString);
-			is = InputStreamUtils.getInputStream(urlString, JobConstants.CompressionType.GZIP);
-			final AppWrapper appWrapper = OBJ_MAPPER.readValue(is, AppWrapper.class);
-			if (appWrapper != null && appWrapper.getApp() != null) {
-				AppInfo result = appWrapper.getApp();
-				if (result.getState().equals(JobConstants.JOB_STATE_RUNNING)) {
-					return true;
-				}
-				return false;
-			}
-			else {
-				LOG.error("The status of " + appID + " is not available");
-				throw new IllegalStateException("The status of " + appID + " is not available");
-			}
-		}
-		finally {
-			if (is != null) { try {is.close();} catch (Exception e){}  }
-		}
-	}
-	
-	public List<Object> getResource(JobConstants.ResourceType resoureType, Object... parameter) throws Exception{
-		switch(resoureType) {
-			case JOB_LIST:
-				return doFetchApplicationsList((String)parameter[0]);
-			case JOB_RUNNING_INFO:
-				//parameter[0]= appId
-				return doFetchRunningJobInfo((String)parameter[0]);
-			case JOB_COMPLETE_INFO:
-				//parameter[0]= appId
-				return doFetchCompleteJobInfo((String)parameter[0]);
-			case JOB_CONFIGURATION:
-				//parameter[0]= appId
-				boolean isRunning = checkIfJobIsRunning((String)parameter[0]);
-				if (isRunning)
-					return doFetchRunningJobConfiguration((String)parameter[0]);
-				else
-					return doFetchCompletedJobConfiguration((String)parameter[0]); 
-			default:
-				throw new Exception("Not support ressourceType :" + resoureType);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
deleted file mode 100644
index 5a5150b..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/ResourceFetcher.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.crawler;
-
-import java.util.List;
-
-import org.apache.eagle.jobrunning.common.JobConstants;
-
-public interface ResourceFetcher {
-
-	List<Object> getResource(JobConstants.ResourceType resoureType, Object... parameter) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
deleted file mode 100644
index 103a2ff..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawler.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.crawler;
-
-import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
-
-public interface RunningJobCrawler {
-	
-	public void crawl() throws Exception;
-	
-	public void addIntoProcessingList(ResourceType type, JobContext context);
-	
-	public void removeFromProcessingList(ResourceType type, JobContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
deleted file mode 100644
index 4bfa614..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.crawler;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import org.apache.eagle.jobrunning.zkres.JobRunningZKStateManager;
-import org.apache.commons.lang.time.DateUtils;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.job.JobFilter;
-import org.apache.eagle.jobrunning.callback.RunningJobCallback;
-import org.apache.eagle.jobrunning.common.JobConstants.JobState;
-import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
-import org.apache.eagle.jobrunning.common.JobConstants.YarnApplicationType;
-import org.apache.eagle.jobrunning.util.JobUtils;
-import org.apache.eagle.jobrunning.yarn.model.AppInfo;
-import org.apache.eagle.common.DateTimeUtil;
-
-public class RunningJobCrawlerImpl implements RunningJobCrawler{
-
-	protected RunningJobCrawlConfig.RunningJobEndpointConfig endpointConfig;
-	protected RunningJobCrawlConfig.ControlConfig controlConfig;
-	protected JobFilter jobFilter;
-	private ResourceFetcher fetcher;
-	private JobRunningZKStateManager zkStateManager;
-	private Thread jobConfigProcessThread;
-	private Thread jobCompleteInfoProcessThread;
-	private Thread jobCompleteStatusCheckerThread;
-	private Thread zkCleanupThread;
-	private final RunningJobCallback callback;
-	private ReadWriteLock readWriteLock;
-	private Map<ResourceType, Map<String, JobContext>> processingJobMap = new ConcurrentHashMap<ResourceType, Map<String, JobContext>>();
-	
-	private BlockingQueue<JobContext> queueOfConfig;
-	private BlockingQueue<JobContext> queueOfCompleteJobInfo;
-	private static final int DEFAULT_CONFIG_THREAD_COUNT = 20;
-	private final long DELAY_TO_UPDATE_COMPLETION_JOB_INFO = 5 * DateUtils.MILLIS_PER_MINUTE;
-	private static final Logger LOG = LoggerFactory.getLogger(RunningJobCrawlerImpl.class);
-	
-	private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
-
-	static {
-		OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
-	}
-	
-	public RunningJobCrawlerImpl(RunningJobCrawlConfig config, JobRunningZKStateManager zkStateManager, 
-			RunningJobCallback callback, JobFilter jobFilter, ReadWriteLock readWriteLock) {
-		this.endpointConfig = config.endPointConfig;
-		this.controlConfig = config.controlConfig;
-		this.callback = callback;
-		this.fetcher = new RMResourceFetcher(endpointConfig);
-		this.jobFilter = jobFilter;
-		this.readWriteLock = readWriteLock;
-		if (config.controlConfig.jobInfoEnabled) {
-			jobCompleteInfoProcessThread = new Thread() {
-				@Override
-				public void run() {
-					startCompleteJobInfoProcessThread();
-				}
-			};
-			jobCompleteInfoProcessThread.setName("JobCompleteInfo-process-thread");
-			jobCompleteInfoProcessThread.setDaemon(true);
-			
-			jobCompleteStatusCheckerThread = new Thread() {
-				@Override
-				public void run() {
-					startCompleteStatusCheckerThread();
-				}
-			};
-			jobCompleteStatusCheckerThread.setName("JobComplete-statusChecker-thread");
-			jobCompleteStatusCheckerThread.setDaemon(true);
-		}
-
-		if (config.controlConfig.jobConfigEnabled) {
-			jobConfigProcessThread = new Thread() {
-				@Override
-				public void run() {
-					startJobConfigProcessThread();
-				}
-			};
-			jobConfigProcessThread.setName("JobConfig-process-thread");
-			jobConfigProcessThread.setDaemon(true);
-		}
-				
-		zkCleanupThread = new Thread() {
-			@Override
-			public void run() {
-				startzkCleanupThread();
-			}
-		};		
-		zkCleanupThread.setName("zk-cleanup-thread");
-		zkCleanupThread.setDaemon(true);
-		
-		this.zkStateManager = zkStateManager;
-		this.processingJobMap.put(ResourceType.JOB_CONFIGURATION, new ConcurrentHashMap<String, JobContext>());
-		this.processingJobMap.put(ResourceType.JOB_COMPLETE_INFO, new ConcurrentHashMap<String, JobContext>());
-		this.queueOfConfig = new ArrayBlockingQueue<JobContext>(controlConfig.sizeOfJobConfigQueue);
-		this.queueOfCompleteJobInfo = new ArrayBlockingQueue<JobContext>(controlConfig.sizeOfJobCompletedInfoQueue);
-	}
-	
-	private void startJobConfigProcessThread() {
-		int configThreadCount = DEFAULT_CONFIG_THREAD_COUNT;
-		LOG.info("Job Config crawler main thread started, pool size: " + DEFAULT_CONFIG_THREAD_COUNT);
-
-    	ThreadFactory factory = new ThreadFactory() {
-			private final AtomicInteger count = new AtomicInteger(0);
-
-			public Thread newThread(Runnable runnable) {
-				count.incrementAndGet();
-				Thread thread = Executors.defaultThreadFactory().newThread(runnable);
-				thread.setName("config-crawler-workthread-" + count.get());
-				return thread;
-			}
-		};
-		
-		ThreadPoolExecutor pool = new ThreadPoolExecutor(configThreadCount, configThreadCount, 0L,
-									  TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
-		
-		while (true) {
-			JobContext context;
-			try {
-				context = queueOfConfig.take();
-				LOG.info("queueOfConfig size: " + queueOfConfig.size());
-				Runnable configCrawlerThread = new ConfigWorkTask(new JobContext(context), fetcher, callback, this);
-				pool.execute(configCrawlerThread);
-			} catch (InterruptedException e) {
-				LOG.warn("Got an InterruptedException: " + e.getMessage());
-			} catch (RejectedExecutionException e2) {
-				LOG.warn("Got RejectedExecutionException: " + e2.getMessage());
-			}
-			catch (Throwable t) {
-				LOG.warn("Got an throwable t, " + t.getMessage());
-			}
-		}
-	}
-	
-	private void startCompleteJobInfoProcessThread() {
-		while(true) {
-			JobContext context = null;
-			try {
-				context = queueOfCompleteJobInfo.take();
-			} catch (InterruptedException ex) {
-			}
-			/** Delay an interval before fetch job complete info, for history url need some time to be accessible,
-			 *  The default interval is set as 5 min,
-			 *  Also need to consider if need multi thread to do this
-			 */
-			while (System.currentTimeMillis() < 
-			      context.fetchedTime + DELAY_TO_UPDATE_COMPLETION_JOB_INFO) {
-				try {
-					Thread.sleep(50);
-				} catch (InterruptedException e1) {
-				}
-			}
-			try {
-				List<Object> objs = fetcher.getResource(ResourceType.JOB_COMPLETE_INFO, JobUtils.getAppIDByJobID(context.jobId));
-				callback.onJobRunningInformation(context, ResourceType.JOB_COMPLETE_INFO, objs);
-			}
-			catch(Exception ex) {
-		        if (ex.getMessage().contains("Server returned HTTP response code: 500")) {
-		        	LOG.warn("The server returns 500 error, it's probably caused by job ACL setting, going to skip this job");
-		        	// the job remains in processing list, thus we will not do infructuous retry next round
-		        	// TODO need remove it from processing list when job finished to avoid memory leak 
-		        }
-		        else LOG.error("Got an exception when fetching resource ", ex);
-			}
-		}
-	}
-	
-	public void startCompleteStatusCheckerThread() {
-		while(true) {
-			List<Object> list;
-			try {
-				list = fetcher.getResource(ResourceType.JOB_LIST, JobState.COMPLETED.name());
-				if (list == null) {
-					LOG.warn("Current Completed Job List is Empty");
-					continue;
-				}
-				@SuppressWarnings("unchecked")
-				List<AppInfo> apps = (List<AppInfo>)list.get(0);
-				Set<JobContext> completedJobSet = new HashSet<JobContext>();
-				for (AppInfo app : apps) {
-					//Only fetch MapReduce job
-					if (!YarnApplicationType.MAPREDUCE.name().equals(app.getApplicationType()) 
-					|| !jobFilter.accept(app.getUser())) {
-						continue;
-					}
-					if (System.currentTimeMillis() - app.getFinishedTime() < controlConfig.completedJobOutofDateTimeInMin * DateUtils.MILLIS_PER_MINUTE) {
-						completedJobSet.add(new JobContext(JobUtils.getJobIDByAppID(app.getId()),app.getUser(), System.currentTimeMillis()));
-					}
-				}
-				
-				if (controlConfig.jobConfigEnabled) {
-					addIntoProcessingQueueAndList(completedJobSet, queueOfConfig, ResourceType.JOB_CONFIGURATION);
-				}
-
-				if (controlConfig.jobInfoEnabled) {
-					addIntoProcessingQueueAndList(completedJobSet, queueOfCompleteJobInfo, ResourceType.JOB_COMPLETE_INFO);
-				}
-			} catch (Throwable t) {
-				LOG.error("Got a throwable in fetching job completed list :", t);
-			}
-			try {
-				Thread.sleep(10 * 1000);
-			}catch(Exception ex){
-
-			}
-		}
-	}
-	
-	public void startzkCleanupThread() {
-		LOG.info("zk cleanup thread started");
-		while(true) {
-			try {
-				long thresholdTime = System.currentTimeMillis() - controlConfig.zkCleanupTimeInday * DateUtils.MILLIS_PER_DAY; 
-				String date = DateTimeUtil.format(thresholdTime, "yyyyMMdd");
-				zkStateManager.truncateJobBefore(ResourceType.JOB_CONFIGURATION, date);
-				zkStateManager.truncateJobBefore(ResourceType.JOB_COMPLETE_INFO, date);
-				Thread.sleep(30 * 60 * 1000);
-			}
-			catch (Throwable t) {
-				LOG.warn("Got an throwable, t: ", t);
-			}
-		}
-	}
-	
-	public void addIntoProcessingQueueAndList(Set<JobContext> jobSet, BlockingQueue<JobContext> queue, ResourceType type) {
-		try {
-			readWriteLock.writeLock().lock();
-			LOG.info("Write lock acquired");
-			List<String> processingList = zkStateManager.readProcessedJobs(type);
-			processingList.addAll(extractJobList(type));
-			for (JobContext context: jobSet) {
-				String jobId = context.jobId;
-				if (!processingList.contains(jobId)) {
-					addIntoProcessingList(type, context);
-					queue.add(context);
-				}
-			}
-		}
-		finally {
-			try {readWriteLock.writeLock().unlock(); LOG.info("Write lock released");}
-			catch (Throwable t) { LOG.error("Fail to release Write lock", t);}
-		}
-	}
-	
-	private List<String> extractJobList(ResourceType type) {
-		Map<String, JobContext> contexts = processingJobMap.get(type);
-		return Arrays.asList(contexts.keySet().toArray(new String[0]));
-	}
-	
-	@Override
-	public void crawl() throws Exception {
-		// bring up crawler threads when crawl method is invoked first time
-		if (jobConfigProcessThread != null && !jobConfigProcessThread.isAlive()) {
-			jobConfigProcessThread.start();
-		}
-		
-		if (jobCompleteInfoProcessThread != null && !jobCompleteInfoProcessThread.isAlive()) {
-			jobCompleteInfoProcessThread.start();
-		}		
- 
-		if (jobCompleteStatusCheckerThread != null && !jobCompleteStatusCheckerThread.isAlive()) {
-			jobCompleteStatusCheckerThread.start();
-		}
-		
-		if (!zkCleanupThread.isAlive()) {
-			zkCleanupThread.start();
-		}
-		
-		List<Object> list = fetcher.getResource(ResourceType.JOB_LIST, JobState.RUNNING.name());		
-		if (list == null) {
-			LOG.warn("Current Running Job List is Empty");
-			return;
-		}
-		
-		@SuppressWarnings("unchecked")
-		List<AppInfo> apps = (List<AppInfo>)list.get(0);
-		LOG.info("Current Running Job List size : " + apps.size());
-		Set<JobContext> currentRunningJobSet = new HashSet<JobContext>();
-		for (AppInfo app : apps) {
-			//Only fetch MapReduce job
-			if (!YarnApplicationType.MAPREDUCE.name().equals(app.getApplicationType()) 
-			|| !jobFilter.accept(app.getUser())) {
-				continue;
-			}
-			currentRunningJobSet.add(new JobContext(JobUtils.getJobIDByAppID(app.getId()), app.getUser(), System.currentTimeMillis()));
-		}
-		
-		if (controlConfig.jobConfigEnabled) {
-			addIntoProcessingQueueAndList(currentRunningJobSet, queueOfConfig, ResourceType.JOB_CONFIGURATION);
-		}
-		
-		if (controlConfig.jobInfoEnabled) {			
-			// fetch job detail & jobcounters	
-			for (JobContext context : currentRunningJobSet) {
-				try {
-					List<Object> objs = fetcher.getResource(ResourceType.JOB_RUNNING_INFO, JobUtils.getAppIDByJobID(context.jobId));
-					callback.onJobRunningInformation(context, ResourceType.JOB_RUNNING_INFO, objs);
-				}
-				catch (Exception ex) {
-			        if (ex.getMessage().contains("Server returned HTTP response code: 500")) {
-			        	LOG.warn("The server returns 500 error, it's probably caused by job ACL setting, going to skip this job");
-			        	// the job remains in processing list, thus we will not do infructuous retry next round
-			        	// TODO need remove it from processing list when job finished to avoid memory leak 
-			        }
-			        else LOG.error("Got an exception when fetching resource, jobId: " + context.jobId , ex);
-				}
-			}
-		}		
-	}
-
-	@Override
-	public void addIntoProcessingList(ResourceType type, JobContext context) {
-		processingJobMap.get(type).put(context.jobId, context);
-	}
-
-	@Override
-	public void removeFromProcessingList(ResourceType type, JobContext context) {
-		processingJobMap.get(type).remove(context.jobId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
deleted file mode 100644
index 9814f5b..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/crawler/XmlHelper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.crawler;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-public class XmlHelper {
-	
-	public static Map<String, String> getConfigs(InputStream is) throws IOException, SAXException, ParserConfigurationException
-	{		
-		DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
-		DocumentBuilder db = dbf.newDocumentBuilder();
-		
-		Document dt = db.parse(is);
-		Element element = dt.getDocumentElement();
-		Map<String, String> config = new TreeMap<String, String>();
-				
-		NodeList propertyList = element.getElementsByTagName("property");
-		int length = propertyList.getLength();
-		for(int i = 0; i < length; i++) {
-			Node property = propertyList.item(i);
-			String key = property.getChildNodes().item(0).getTextContent();
-			String value = property.getChildNodes().item(1).getTextContent();
-			config.put(key, value);
-		}
-		return config;		
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
deleted file mode 100644
index 2219868..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.ha;
-
-import java.io.IOException;
-
-/**
- * @since Aug 21, 2014
- */
-public interface HAURLSelector {
-	
-	boolean checkUrl(String url);
-		
-	void reSelectUrl() throws IOException;
-	
-	String getSelectedUrl();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/18ae3bbb/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
deleted file mode 100644
index e030cf3..0000000
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.jobrunning.ha;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-
-import org.apache.eagle.jobrunning.url.ServiceURLBuilder;
-import org.apache.eagle.jobrunning.util.InputStreamUtils;
-import org.apache.eagle.jobrunning.common.JobConstants;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HAURLSelectorImpl implements HAURLSelector {
-
-	private final String[] urls;
-	private volatile String selectedUrl;
-	private final ServiceURLBuilder builder;
-	
-	private volatile boolean reselectInProgress;
-	private final JobConstants.CompressionType compressionType;
-	private static final long MAX_RETRY_TIME = 3;
-	private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
-	
-	public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, JobConstants.CompressionType compressionType) {
-		this.urls = urls;
-		this.compressionType = compressionType;
-		this.builder = builder;
-	}
-	
-	public boolean checkUrl(String urlString) {
-		InputStream is = null;
-		try {
-			LOG.info("Getting input stream from url: " + urlString);
-			is = InputStreamUtils.getInputStream(urlString, compressionType);
-		} catch (Exception ex) {
-			LOG.error("Failed to get input stream from url: " + urlString);
-			return false;
-		} finally {
-			if (is != null) { try {	is.close(); } catch (IOException e) {/*Do nothing*/} }
-		}
-		return true;
-	}
-
-	@Override
-	public String getSelectedUrl() {
-		if (selectedUrl == null) {
-			selectedUrl = urls[0];
-		}
-		return selectedUrl;
-	}
-	
-	@Override
-	public void reSelectUrl() throws IOException {
-		if (reselectInProgress) return;
-		synchronized(this) {
-			if (reselectInProgress) return;
-			reselectInProgress = true;
-			try {
-				LOG.info("Going to reselect url");
-				for (int i = 0; i < urls.length; i++) {		
-					String urlToCheck = urls[i];
-					LOG.info("Going to try url :" + urlToCheck);
-					for (int time = 0; time < MAX_RETRY_TIME; time++) {
-						if (checkUrl(builder.build(urlToCheck, JobConstants.JobState.RUNNING.name()))) {
-							selectedUrl = urls[i];
-							LOG.info("Successfully switch to new url : " + selectedUrl);
-							return;
-						}
-						LOG.info("try url " + urlToCheck + "fail for " + (time+1) + " times, sleep 5 seconds before try again. ");
-						try {
-							Thread.sleep(5 * 1000);
-						}
-						catch (InterruptedException ex) { /* Do Nothing */}
-					}
-				}
-				throw new IOException("No alive url found: "+ StringUtils.join(";", Arrays.asList(this.urls)));
-			}
-			finally {
-				reselectInProgress = false;
-			}
-		}
-	}
-}



Mime
View raw message