eagle-commits mailing list archives

From yonzhang2...@apache.org
Subject [3/8] incubator-eagle git commit: EAGLE-276 eagle support for MR & Spark job history monitoring
Date Tue, 05 Jul 2016 18:07:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml
new file mode 100644
index 0000000..4f50350
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.eagle</groupId>
+    <artifactId>eagle-jpm-parent</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-jpm-spark-history</artifactId>
+  <name>eagle-jpm-spark-history</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-job-common</artifactId>
+          <version>${project.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.wso2.orbit.com.lmax</groupId>
+                  <artifactId>disruptor</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-jpm-util</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-jpm-entity</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+          <groupId>jline</groupId>
+          <artifactId>jline</artifactId>
+          <version>2.12</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-recipes</artifactId>
+          <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-app</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+  </dependencies>
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptor>src/assembly/eagle-jpm-spark-history-assembly.xml</descriptor>
+          <finalName>eagle-jpm-spark-history-${project.version}</finalName>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <tarLongFileMode>posix</tarLongFileMode>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml b/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml
new file mode 100644
index 0000000..66133a0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/assembly/eagle-jpm-spark-history-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>assembly</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>false</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <excludes>
+                <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+                <exclude>asm:asm</exclude>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.outputDirectory}/</directory>
+            <outputDirectory>/</outputDirectory>
+            <excludes>
+                <exclude>*.yaml</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
new file mode 100644
index 0000000..4282a64
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.config;
+
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.io.Serializable;
+
+public class SparkHistoryCrawlConfig implements Serializable {
+    public ZKStateConfig zkStateConfig;
+    public JobHistoryEndpointConfig jobHistoryConfig;
+    public HDFSConfig hdfsConfig;
+    public BasicInfo info;
+    public EagleInfo eagleInfo;
+    public StormConfig stormConfig;
+
+    private Config config;
+    public Config getConfig() {
+        return config;
+    }
+
+    public SparkHistoryCrawlConfig() {
+        this.config = ConfigFactory.load();
+
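+        // ZooKeeper connection and retry settings for persisting crawler state.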
+        this.zkStateConfig = new ZKStateConfig();
+        this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+        this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+
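+        // Spark history server endpoint/credentials and YARN resource manager URLs.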
+        this.jobHistoryConfig = new JobHistoryEndpointConfig();
+        jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url");
+        jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username");
+        jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.pwd");
+        jobHistoryConfig.rms = config.getStringList("dataSourceConfig.rm.url").toArray(new String[0]);
+
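+        // HDFS endpoint holding the Spark event logs, plus Kerberos principal/keytab.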
+        this.hdfsConfig = new HDFSConfig();
+        this.hdfsConfig.baseDir = config.getString("dataSourceConfig.hdfs.baseDir");
+        this.hdfsConfig.endpoint = config.getString("dataSourceConfig.hdfs.endPoint");
+        this.hdfsConfig.principal = config.getString("dataSourceConfig.hdfs.principal");
+        this.hdfsConfig.keytab = config.getString("dataSourceConfig.hdfs.keytab");
+
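+        // Site identity plus extra job-conf keys to capture from each application.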
+        this.info = new BasicInfo();
+        info.site = String.format("%s-%s", config.getString("basic.cluster"), config.getString("basic.datacenter"));
+        info.jobConf = config.getStringList("basic.jobConf.additional.info").toArray(new String[0]);
+
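+        // Eagle service endpoint that receives the parsed entities.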
+        this.eagleInfo = new EagleInfo();
+        this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
+        this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
+
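+        // Storm topology sizing and timing parameters.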
+        this.stormConfig = new StormConfig();
+        this.stormConfig.mode = config.getString("storm.mode");
+        this.stormConfig.topologyName = config.getString("storm.name");
+        this.stormConfig.workerNo = config.getInt("storm.workerNo");
+        this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
+        this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
+        this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
+    }
+
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+    }
+
+    public static class JobHistoryEndpointConfig implements Serializable {
+        public String[] rms;
+        public String historyServerUrl;
+        public String historyServerUserName;
+        public String historyServerUserPwd;
+    }
+
+    public static class HDFSConfig implements Serializable {
+        public String endpoint;
+        public String baseDir;
+        public String principal;
+        public String keytab;
+    }
+
+    public static class BasicInfo implements Serializable {
+        public String site;
+        public String[] jobConf;
+    }
+
+    public static class StormConfig implements Serializable {
+        public String mode;
+        public int workerNo;
+        public int timeoutSec;
+        public String topologyName;
+        public int spoutPending;
+        public int spoutCrawlInterval;
+    }
+
+    public static class EagleInfo implements Serializable {
+        public String host;
+        public int port;
+    }
+}
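For reference, the constructor above expects an application.conf of roughly the following shape; the key paths come from the getString/getInt calls in this class, while every value below is illustrative:

    dataSourceConfig {
      zkQuorum = "localhost:2181"
      zkRetryInterval = 1000
      zkRetryTimes = 3
      zkSessionTimeoutMs = 15000
      zkRoot = "/sparkJobHistory"
      spark.history.server.url = "http://localhost:18080"
      spark.history.server.username = ""
      spark.history.server.pwd = ""
      rm.url = ["http://localhost:8088"]
      hdfs.baseDir = "/spark-history"
      hdfs.endPoint = "hdfs://localhost:8020"
      hdfs.principal = ""
      hdfs.keytab = ""
    }
    basic {
      cluster = "sandbox"
      datacenter = "dc1"
      jobConf.additional.info = []
    }
    eagleProps.eagle.service {
      host = "localhost"
      port = 9099
    }
    storm {
      mode = "local"
      name = "sparkHistoryJob"
      workerNo = 2
      messageTimeoutSec = 300
      pendingSpout = 1000
      spoutCrawlInterval = 300
    }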

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java
new file mode 100644
index 0000000..9c720c8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
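+/**
+ * Event names as they appear in the "Event" field of each JSON line of a
+ * Spark event log, mirroring Spark's SparkListener event class names.
+ */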
+public enum EventType {
+    SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart,
+    SparkListenerExecutorAdded, SparkListenerJobStart, SparkListenerStageSubmitted, SparkListenerTaskStart,
+    SparkListenerBlockManagerRemoved, SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd,
+    SparkListenerApplicationEnd, SparkListenerExecutorRemoved
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
new file mode 100644
index 0000000..1f76a2f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+public interface JHFInputStreamReader {
+    void read(InputStream is) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
new file mode 100644
index 0000000..3fbc769
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+public interface JHFParserBase {
+    /**
+     * Parses the given job history input stream. Implementations must close
+     * the stream before returning.
+     * @param is the input stream to parse
+     * @throws Exception if reading or parsing fails
+     */
+    void parse(InputStream is) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
new file mode 100644
index 0000000..db5d432
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.entity.*;
+import org.apache.eagle.jpm.util.JSONUtil;
+import org.apache.eagle.jpm.util.JobNameNormalization;
+import org.apache.eagle.jpm.util.SparkEntityConstant;
+import org.apache.eagle.jpm.util.SparkJobTagName;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class JHFSparkEventReader {
+
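+    // Buffered entities are flushed to the Eagle service once this many accumulate.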
+    public static final int FLUSH_LIMIT = 500;
+    private static final Logger logger = LoggerFactory.getLogger(JHFSparkEventReader.class);
+
+    private Map<String, SparkExecutor> executors;
+    private SparkApp app;
+    private Map<Integer, SparkJob> jobs;
+    private Map<String, SparkStage> stages;
+    private Map<Integer, Set<String>> jobStageMap;
+    private Map<Integer, SparkTask> tasks;
+    private EagleServiceClientImpl client;
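+    // Per stage attempt: task index -> whether any attempt of that task has succeeded.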
+    private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
+
+    private List<TaggedLogAPIEntity> createEntities;
+
+    private Config conf;
+
+    public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
+        app = new SparkApp();
+        app.setTags(new HashMap<>(baseTags));
+        app.setYarnState(info.getState());
+        app.setYarnStatus(info.getFinalStatus());
+        createEntities = new ArrayList<>();
+        jobs = new HashMap<>();
+        stages = new HashMap<>();
+        jobStageMap = new HashMap<>();
+        tasks = new HashMap<>();
+        executors = new HashMap<>();
+        stageTaskStatusMap = new HashMap<>();
+        conf = ConfigFactory.load();
+        this.initiateClient();
+    }
+
+    public void read(JSONObject eventObj) throws Exception {
+        String eventType = (String) eventObj.get("Event");
+        if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
+            handleAppStarted(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
+            handleEnvironmentSet(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) {
+            handleExecutorAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) {
+            handleBlockManagerAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) {
+            handleJobStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) {
+            handleStageSubmit(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) {
+            handleTaskStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) {
+            handleTaskEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) {
+            handleStageComplete(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) {
+            handleJobEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) {
+            handleExecutorRemoved(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) {
+            handleAppEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) {
+            //nothing to do now
+        } else {
+            logger.info("Unhandled event type: " + eventType);
+        }
+    }
+
+    private void handleEnvironmentSet(JSONObject event) {
+        app.setConfig(new JobConfig());
+        JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
+
+        // Config.getStringList returns an immutable list, so copy it before appending defaults.
+        List<String> jobConfs = new ArrayList<>(conf.getStringList("basic.jobConf.additional.info"));
+        String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
+                "spark.driver.memory", "ebay.job.name", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
+                "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
+        jobConfs.addAll(Arrays.asList(props));
+        for (String prop : jobConfs) {
+            if (sparkProps.containsKey(prop)) {
+                app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
+            }
+        }
+    }
+
+    private Object getConfigVal(JobConfig config, String configName, String type) {
+        if (config.getConfig().containsKey(configName)) {
+            Object val = config.getConfig().get(configName);
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return Integer.parseInt((String) val);
+            } else {
+                return val;
+            }
+        } else {
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return conf.getInt("spark.defaultVal." + configName);
+            } else {
+                return conf.getString("spark.defaultVal." + configName);
+            }
+        }
+    }
+
+    private boolean isClientMode(JobConfig config) {
+        // Null-safe: "spark.master" may be absent from the captured config.
+        return "yarn-client".equalsIgnoreCase(config.getConfig().get("spark.master"));
+    }
+
+    private void handleAppStarted(JSONObject event) {
+        // Tag entities created before the application start event (e.g. executors) with the application identity.
+        List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+        entities.addAll(this.executors.values());
+        entities.add(this.app);
+
+        for (TaggedLogAPIEntity entity : entities) {
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtil.getString(event, "App ID"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtil.getString(event, "App Name"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), JSONUtil.getString(event, "App Attempt ID"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtil.getString(event, "App Name"), this.app.getConfig().getConfig().get("ebay.job.name")));
+            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtil.getString(event, "User"));
+        }
+
+        this.app.setStartTime(JSONUtil.getLong(event, "Timestamp"));
+        this.app.setTimestamp(JSONUtil.getLong(event, "Timestamp"));
+    }
+
+    private void handleExecutorAdd(JSONObject event) throws Exception {
+        String executorID = (String) event.get("Executor ID");
+        this.initiateExecutor(executorID, JSONUtil.getLong(event, "Timestamp"));
+        // The "Executor Info" payload (host, cores, log urls) is not consumed yet.
+    }
+
+    private void handleBlockManagerAdd(JSONObject event) throws Exception {
+        long maxMemory = JSONUtil.getLong(event, "Maximum Memory");
+        long timestamp = JSONUtil.getLong(event, "Timestamp");
+        JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager ID");
+        String executorID = JSONUtil.getString(blockInfo, "Executor ID");
+        String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, "Host"), JSONUtil.getLong(blockInfo, "Port"));
+
+        SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
+        executor.setMaxMemory(maxMemory);
+        executor.setHostPort(hostport);
+    }
+
+    private void handleTaskStart(JSONObject event) {
+        this.initializeTask(event);
+    }
+
+    private void handleTaskEnd(JSONObject event) {
+        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
+        Integer taskId = JSONUtil.getInt(taskInfo, "Task ID");
+        SparkTask task = tasks.get(taskId);
+        if (task == null) {
+            return;
+        }
+
+        task.setFailed(JSONUtil.getBoolean(taskInfo, "Failed"));
+        JSONObject taskMetrics = JSONUtil.getJSONObject(event, "Task Metrics");
+        if (null != taskMetrics) {
+            task.setExecutorDeserializeTime(JSONUtil.getLong(taskMetrics, "Executor Deserialize Time"));
+            task.setExecutorRunTime(JSONUtil.getLong(taskMetrics, "Executor Run Time"));
+            task.setJvmGcTime(JSONUtil.getLong(taskMetrics, "JVM GC Time"));
+            task.setResultSize(JSONUtil.getLong(taskMetrics, "Result Size"));
+            task.setResultSerializationTime(JSONUtil.getLong(taskMetrics, "Result Serialization Time"));
+            task.setMemoryBytesSpilled(JSONUtil.getLong(taskMetrics, "Memory Bytes Spilled"));
+            task.setDiskBytesSpilled(JSONUtil.getLong(taskMetrics, "Disk Bytes Spilled"));
+
+            JSONObject inputMetrics = JSONUtil.getJSONObject(taskMetrics, "Input Metrics");
+            if (null != inputMetrics) {
+                task.setInputBytes(JSONUtil.getLong(inputMetrics, "Bytes Read"));
+                task.setInputRecords(JSONUtil.getLong(inputMetrics, "Records Read"));
+            }
+
+            JSONObject outputMetrics = JSONUtil.getJSONObject(taskMetrics, "Output Metrics");
+            if (null != outputMetrics) {
+                task.setOutputBytes(JSONUtil.getLong(outputMetrics, "Bytes Written"));
+                task.setOutputRecords(JSONUtil.getLong(outputMetrics, "Records Written"));
+            }
+
+            JSONObject shuffleWriteMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Write Metrics");
+            if (null != shuffleWriteMetrics) {
+                task.setShuffleWriteBytes(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
+                task.setShuffleWriteRecords(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
+            }
+
+            JSONObject shuffleReadMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Read Metrics");
+            if (null != shuffleReadMetrics) {
+                task.setShuffleReadLocalBytes(JSONUtil.getLong(shuffleReadMetrics, "Local Bytes Read"));
+                task.setShuffleReadRemoteBytes(JSONUtil.getLong(shuffleReadMetrics, "Remote Bytes Read"));
+                task.setShuffleReadRecords(JSONUtil.getLong(shuffleReadMetrics, "Total Records Read"));
+            }
+        } else {
+            // A successful task without metrics stays in the map; clearReader flushes
+            // whatever remains once the whole log has been read.
+            if (!task.isFailed()) {
+                return;
+            }
+        }
+
+        aggregateToStage(task);
+        aggregateToExecutor(task);
+        tasks.remove(taskId);
+        this.flushEntities(task, false);
+    }
+
+    private SparkTask initializeTask(JSONObject event) {
+        SparkTask task = new SparkTask();
+        task.setTags(new HashMap<>(this.app.getTags()));
+        task.setTimestamp(app.getTimestamp());
+
+        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), JSONUtil.getLong(event, "Stage ID").toString());
+        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), JSONUtil.getLong(event, "Stage Attempt ID").toString());
+
+        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
+        task.setTaskId(JSONUtil.getInt(taskInfo, "Task ID"));
+        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), JSONUtil.getInt(taskInfo, "Index").toString());
+        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), JSONUtil.getInt(taskInfo, "Attempt").toString());
+        task.setLaunchTime(JSONUtil.getLong(taskInfo, "Launch Time"));
+        task.setExecutorId(JSONUtil.getString(taskInfo, "Executor ID"));
+        task.setHost(JSONUtil.getString(taskInfo, "Host"));
+        task.setTaskLocality(JSONUtil.getString(taskInfo, "Locality"));
+        task.setSpeculative(JSONUtil.getBoolean(taskInfo, "Speculative"));
+
+        tasks.put(task.getTaskId(), task);
+        return task;
+    }
+
+    private void handleJobStart(JSONObject event) {
+        SparkJob job = new SparkJob();
+        job.setTags(new HashMap<>(this.app.getTags()));
+        job.setTimestamp(app.getTimestamp());
+
+        Integer jobId = JSONUtil.getInt(event, "Job ID");
+        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
+        job.setSubmissionTime(JSONUtil.getLong(event, "Submission Time"));
+
+        //for complete application, no active stages/tasks
+        job.setNumActiveStages(0);
+        job.setNumActiveTasks(0);
+
+        this.jobs.put(jobId, job);
+        this.jobStageMap.put(jobId, new HashSet<>());
+
+        JSONArray stages = JSONUtil.getJSONArray(event, "Stage Infos");
+        job.setNumStages(stages.size());
+        for (int i = 0; i < stages.size(); i++) {
+            JSONObject stageInfo = (JSONObject) stages.get(i);
+            Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+            Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+            String stageName = JSONUtil.getString(stageInfo, "Stage Name");
+            int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
+            this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
+        }
+    }
+
+    private void handleStageSubmit(JSONObject event) {
+        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
+        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
+
+        if (!stages.containsKey(key)) {
+            // A later attempt of a stage first declared in the job start event.
+            String baseAttempt = this.generateStageKey(stageId.toString(), "0");
+            if (stages.containsKey(baseAttempt)) {
+                SparkStage stage = stages.get(baseAttempt);
+                String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
+
+                // "Stage Name" and "Number of Tasks" are fields of "Stage Info".
+                String stageName = JSONUtil.getString(stageInfo, "Stage Name");
+                int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
+                this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
+            }
+        }
+    }
+
+    private void handleStageComplete(JSONObject event) {
+        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
+        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        SparkStage stage = stages.get(key);
+        stage.setSubmitTime(JSONUtil.getLong(stageInfo, "Submission Time"));
+        stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time"));
+
+        if (stageInfo.containsKey("Failure Reason")) {
+            stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString());
+        } else {
+            stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString());
+        }
+    }
+
+    private void handleExecutorRemoved(JSONObject event) {
+        String executorID = JSONUtil.getString(event, "Executor ID");
+        SparkExecutor executor = executors.get(executorID);
+        if (executor != null) {
+            executor.setEndTime(JSONUtil.getLong(event, "Timestamp"));
+        }
+    }
+
+    private void handleJobEnd(JSONObject event) {
+        Integer jobId = JSONUtil.getInt(event, "Job ID");
+        SparkJob job = jobs.get(jobId);
+        job.setCompletionTime(JSONUtil.getLong(event, "Completion Time"));
+        JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result");
+        String result = JSONUtil.getString(jobResult, "Result");
+        if (result.equalsIgnoreCase("JobSucceeded")) {
+            job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString());
+        } else {
+            job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString());
+        }
+    }
+
+    private void handleAppEnd(JSONObject event) {
+        long endTime = JSONUtil.getLong(event, "Timestamp");
+        app.setEndTime(endTime);
+    }
+
+    public void clearReader() throws Exception {
+        // Flush tasks that never received a task end event or task metrics, marking them failed.
+        for (SparkTask task : tasks.values()) {
+            logger.info("Task {} has no task end event or task metrics; marking it failed.", task.getTaskId());
+            task.setFailed(true);
+            aggregateToStage(task);
+            aggregateToExecutor(task);
+            this.flushEntities(task, false);
+        }
+
+        List<SparkStage> needStoreStages = new ArrayList<>();
+        for (SparkStage stage : this.stages.values()) {
+            Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
+                SparkJob job = this.jobs.get(jobId);
+                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
+                job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
+            } else {
+                this.aggregateToJob(stage);
+                this.aggregateStageToApp(stage);
+                needStoreStages.add(stage);
+            }
+            String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+            String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
+        }
+
+        this.flushEntities(needStoreStages, false);
+        for (SparkJob job : jobs.values()) {
+            this.aggregateJobToApp(job);
+        }
+        this.flushEntities(jobs.values(), false);
+
+        app.setExecutors(executors.size());
+        long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
+        long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig())
+                ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
+                : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
+        int executorCores = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
+        int driverCores = this.isClientMode(app.getConfig())
+                ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
+                : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
+        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
+        long driverMemoryOverhead = this.isClientMode(app.getConfig())
+                ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
+                : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
+
+        app.setExecMemoryBytes(executorMemory);
+        app.setDriveMemoryBytes(driverMemory);
+        app.setExecutorCores(executorCores);
+        app.setDriverCores(driverCores);
+        app.setExecutorMemoryOverhead(executorMemoryOverhead);
+        app.setDriverMemoryOverhead(driverMemoryOverhead);
+
+        for (SparkExecutor executor : executors.values()) {
+            String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
+            if (executorID.equalsIgnoreCase("driver")) {
+                executor.setExecMemoryBytes(driverMemory);
+                executor.setCores(driverCores);
+                executor.setMemoryOverhead(driverMemoryOverhead);
+            } else {
+                executor.setExecMemoryBytes(executorMemory);
+                executor.setCores(executorCores);
+                executor.setMemoryOverhead(executorMemoryOverhead);
+            }
+            if (executor.getEndTime() == 0) {
+                executor.setEndTime(app.getEndTime());
+            }
+            this.aggregateExecutorToApp(executor);
+        }
+        this.flushEntities(executors.values(), false);
+        // Mirrors Spark's own (admittedly tricky) accounting for skipped tasks.
+        app.setSkippedTasks(app.getCompleteTasks());
+        this.flushEntities(app, true);
+    }
+
+    private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
+        long result = 0L;
+        if (config.getConfig().containsKey(fieldName)) {
+            // The configured overhead is normally a bare MB value; try it with an "m"
+            // suffix first, then fall back to the raw value in case a unit was given.
+            result = this.parseExecutorMemory(config.getConfig().get(fieldName) + "m");
+            if (result == 0L) {
+                result = this.parseExecutorMemory(config.getConfig().get(fieldName));
+            }
+        }
+
+        if (result == 0L) {
+            // Default: the larger of the configured minimum and a percentage of the heap.
+            result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
+                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
+        }
+        return result;
+    }
+
+    private void aggregateExecutorToApp(SparkExecutor executor) {
+        app.setTotalExecutorTime(app.getTotalExecutorTime() + (executor.getEndTime() - executor.getStartTime()));
+    }
+
+    private void aggregateJobToApp(SparkJob job) {
+        //aggregate job level metrics
+        app.setNumJobs(app.getNumJobs() + 1);
+        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
+        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
+        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
+        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
+        app.setTotalStages(app.getTotalStages() + job.getNumStages());
+        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
+        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
+    }
+
+    private void aggregateStageToApp(SparkStage stage) {
+        //aggregate task level metrics
+        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
+        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
+        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
+        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
+        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
+        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
+        app.setResultSize(app.getResultSize() + stage.getResultSize());
+        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
+        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
+        app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
+        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
+        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
+        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
+        app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
+        app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
+    }
+
+    private void aggregateToStage(SparkTask task) {
+        String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+        String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+        String key = this.generateStageKey(stageId, stageAttemptId);
+        SparkStage stage = stages.get(key);
+
+        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
+        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
+        stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
+        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
+        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
+        stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
+        stage.setResultSize(stage.getResultSize() + task.getResultSize());
+        stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
+        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
+        stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
+        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
+        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
+        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
+        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
+        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
+
+        boolean success = !task.isFailed();
+
+        Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
+        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
+            // A previous attempt of this task index already reported; reconcile the combined status.
+            boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
+            success = previousResult || success;
+            if (previousResult != success) {
+                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+                stageTaskStatusMap.get(key).put(taskIndex, success);
+            }
+        } else {
+            if (success) {
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+            } else {
+                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
+            }
+            stageTaskStatusMap.get(key).put(taskIndex, success);
+        }
+    }
+
+    private void aggregateToExecutor(SparkTask task) {
+        String executorId = task.getExecutorId();
+        SparkExecutor executor = executors.get(executorId);
+
+        if (null != executor) {
+            executor.setTotalTasks(executor.getTotalTasks() + 1);
+            if (task.isFailed()) {
+                executor.setFailedTasks(executor.getFailedTasks() + 1);
+            } else {
+                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
+            }
+            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
+            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
+            executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
+            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
+        }
+    }
+
+    private void aggregateToJob(SparkStage stage) {
+        Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+        SparkJob job = jobs.get(jobId);
+        job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
+        job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
+        job.setNumTask(job.getNumTask() + stage.getNumTasks());
+
+        if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) {
+            //if multiple attempts succeed, just count one
+            if (!hasStagePriorAttemptSuccess(stage)) {
+                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
+            }
+
+        } else {
+            job.setNumFailedStages(job.getNumFailedStages() + 1);
+        }
+    }
+
+    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
+        Integer stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
+        for (int i = 0; i < stageAttemptId; i++) {
+            SparkStage previousStage = stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), String.valueOf(i)));
+            if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private String generateStageKey(String stageId, String stageAttemptId) {
+        return String.format("%s-%s", stageId, stageAttemptId);
+    }
+
+    private void initiateStage(Integer jobId, Integer stageId, Integer stageAttemptId, String name, int numTasks) {
+        SparkStage stage = new SparkStage();
+        stage.setTags(new HashMap<>(this.app.getTags()));
+        stage.setTimestamp(app.getTimestamp());
+        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
+        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stageId.toString());
+        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stageAttemptId.toString());
+        stage.setName(name);
+        stage.setNumActiveTasks(0);
+        stage.setNumTasks(numTasks);
+        String pool = this.app.getConfig().getConfig().get("spark.scheduler.pool");
+        stage.setSchedulingPool(pool == null ? "default" : pool);
+
+        String stageKey = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        stages.put(stageKey, stage);
+        this.jobStageMap.get(jobId).add(stageKey);
+    }
+
+    private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception {
+        if (!executors.containsKey(executorID)) {
+            SparkExecutor executor = new SparkExecutor();
+            executor.setTags(new HashMap<>(this.app.getTags()));
+            executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
+            executor.setStartTime(startTime);
+            executor.setTimestamp(app.getTimestamp());
+
+            this.executors.put(executorID, executor);
+        }
+
+        return this.executors.get(executorID);
+    }
+
+    private String getNormalizedName(String jobName, String assignedName) {
+        if (null != assignedName) {
+            return assignedName;
+        } else {
+            return JobNameNormalization.getInstance().normalize(jobName);
+        }
+    }
+
+    private long parseExecutorMemory(String memory) {
+        // Convert a Spark memory string such as "4g" or "512m" into bytes.
+        if (memory.endsWith("g") || memory.endsWith("G")) {
+            int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * executorGB;
+        } else if (memory.endsWith("m") || memory.endsWith("M")) {
+            int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * executorMB;
+        } else if (memory.endsWith("k") || memory.endsWith("K")) {
+            int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * executorKB;
+        } else if (memory.endsWith("t") || memory.endsWith("T")) {
+            int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * executorTB;
+        } else if (memory.endsWith("p") || memory.endsWith("P")) {
+            int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
+        }
+        logger.warn("Cannot parse memory info {}", memory);
+        return 0L;
+    }
+
+    private void flushEntities(Object entity, boolean forceFlush) {
+        this.flushEntities(Arrays.asList(entity), forceFlush);
+    }
+
+    private void flushEntities(Collection entities, boolean forceFlush) {
+        this.createEntities.addAll(entities);
+
+        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
+            try {
+                this.doFlush(this.createEntities);
+                this.createEntities.clear();
+            } catch (Exception e) {
+                logger.error("Fail to flush entities", e);
+            }
+        }
+    }
+
+    private EagleServiceBaseClient initiateClient() {
+        String host = conf.getString("eagleProps.eagle.service.host");
+        int port = conf.getInt("eagleProps.eagle.service.port");
+        String userName = conf.getString("eagleProps.eagle.service.userName");
+        String pwd = conf.getString("eagleProps.eagle.service.pwd");
+        client = new EagleServiceClientImpl(host, port, userName, pwd);
+        int timeout = conf.getInt("eagleProps.eagle.service.read_timeout");
+        client.getJerseyClient().setReadTimeout(timeout * 1000);
+
+        return client;
+    }
+
+    private void doFlush(List entities) throws Exception {
+        logger.info("Start flushing {} entities", entities.size());
+        client.create(entities);
+        logger.info("Finished flushing {} entities", entities.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
new file mode 100644
index 0000000..75ce508
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class JHFSparkParser implements JHFParserBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
+
+    private final JHFSparkEventReader eventReader;
+
+    public JHFSparkParser(JHFSparkEventReader reader) {
+        this.eventReader = reader;
+    }
+
+    @Override
+    public void parse(InputStream is) throws Exception {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            JSONParser parser = new JSONParser();
+            String line;
+            // Spark event logs carry one JSON event per line; a malformed line is
+            // logged and skipped rather than failing the whole file.
+            while ((line = reader.readLine()) != null) {
+                try {
+                    JSONObject eventObj = (JSONObject) parser.parse(line);
+                    this.eventReader.read(eventObj);
+                } catch (Exception e) {
+                    logger.error(String.format("Failed to parse line: %s", line), e);
+                }
+            }
+            this.eventReader.clearReader();
+        }
+    }
+}
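JHFSparkParser treats a Spark event log as newline-delimited JSON: each line becomes one event handed to the JHFSparkEventReader. A minimal usage sketch against a local event log file (the tag value and file argument are illustrative, not from the patch):

    // Sketch: drive the parser against a local Spark event log.
    public class ParseLocalEventLog {
        public static void main(String[] args) throws Exception {
            java.util.Map<String, String> baseTags = new java.util.HashMap<>();
            baseTags.put(SparkJobTagName.SITE.toString(), "sandbox"); // illustrative site
            JHFParserBase parser = new JHFSparkParser(
                    new JHFSparkEventReader(baseTags, new SparkApplicationInfo()));
            try (java.io.InputStream is = new java.io.FileInputStream(args[0])) {
                parser.parse(is); // one JSON listener event per line
            }
        }
    }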

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
new file mode 100644
index 0000000..c206b71
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
@@ -0,0 +1,69 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+public class SparkApplicationInfo {
+
+    private String state;
+    private String finalStatus;
+    private String queue;
+    private String name;
+    private String user;
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public String getFinalStatus() {
+        return finalStatus;
+    }
+
+    public void setFinalStatus(String finalStatus) {
+        this.finalStatus = finalStatus;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java
new file mode 100644
index 0000000..38c0a04
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import org.apache.eagle.jpm.util.SparkJobTagName;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkHistoryFileInputStreamReaderImpl implements JHFInputStreamReader {
+
+    private String site;
+    private SparkApplicationInfo app;
+
+
+    public SparkHistoryFileInputStreamReaderImpl(String site, SparkApplicationInfo app) {
+        this.site = site;
+        this.app = app;
+    }
+
+    @Override
+    public void read(InputStream is) throws Exception {
+        Map<String, String> baseTags = new HashMap<>();
+        baseTags.put(SparkJobTagName.SITE.toString(), site);
+        baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
+        JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
+        parser.parse(is);
+    }
+
+    public static void main(String[] args) throws Exception {
+        // Local smoke test: parses a Spark event log file straight from disk.
+        SparkHistoryFileInputStreamReaderImpl impl = new SparkHistoryFileInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
+        impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
new file mode 100644
index 0000000..60e126e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -0,0 +1,262 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.status;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class JobHistoryZKStateManager {
+    public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+    private String zkRoot;
+    private CuratorFramework _curator;
+    private static final String START_TIMESTAMP = "lastAppTime";
+
+    private CuratorFramework newCurator(SparkHistoryCrawlConfig config) throws Exception {
+        return CuratorFrameworkFactory.newClient(
+                config.zkStateConfig.zkQuorum,
+                config.zkStateConfig.zkSessionTimeoutMs,
+                15000, // connection timeout in milliseconds
+                new RetryNTimes(config.zkStateConfig.zkRetryTimes, config.zkStateConfig.zkRetryInterval)
+        );
+    }
+
+    public JobHistoryZKStateManager(SparkHistoryCrawlConfig config) {
+        this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
+
+        try {
+            _curator = newCurator(config);
+            _curator.start();
+        } catch (Exception e) {
+            LOG.error("Failed to connect to ZooKeeper", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void close() {
+        _curator.close();
+        _curator = null;
+    }
+
+    public List<String> loadApplications(int limit){
+        String jobPath = zkRoot + "/jobs";
+        List<String> apps = new ArrayList<>();
+        InterProcessLock lock = new InterProcessReadWriteLock(_curator,jobPath).writeLock();
+        try{
+            lock.acquire();
+            Iterator<String> iter =  _curator.getChildren().forPath(jobPath).iterator();
+            while(iter.hasNext()) {
+                String appId = iter.next();
+                String path = jobPath + "/" + appId;
+                if(_curator.checkExists().forPath(path) != null){
+                    if(new String(_curator.getData().forPath(path)).equals(ZKStateConstant.AppStatus.INIT.toString())){
+                        apps.add(appId);
+                    }
+                }
+                if(apps.size() == limit){
+                    break;
+                }
+            }
+            return apps;
+        }catch(Exception e){
+            LOG.error("fail to read unprocessed jobs", e);
+            throw new RuntimeException(e);
+        }finally {
+            try{
+                lock.release();
+            }catch(Exception e){
+                LOG.error("fail to release lock", e);
+            }
+
+        }
+    }
+
+    public void resetApplications() {
+        String jobPath = zkRoot + "/jobs";
+        InterProcessLock lock = new InterProcessReadWriteLock(_curator, jobPath).writeLock();
+        try {
+            lock.acquire();
+            // Reset every tracked application back to INIT so it gets re-processed.
+            for (String appId : _curator.getChildren().forPath(jobPath)) {
+                String path = jobPath + "/" + appId;
+                try {
+                    if (_curator.checkExists().forPath(path) != null) {
+                        String status = new String(_curator.getData().forPath(path));
+                        if (!ZKStateConstant.AppStatus.INIT.toString().equals(status)) {
+                            _curator.setData().forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("Failed to reset application status", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to read unprocessed jobs", e);
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                lock.release();
+            } catch (Exception e) {
+                LOG.error("Failed to release lock", e);
+            }
+        }
+    }
+
+    public SparkApplicationInfo getApplicationInfo(String appId) {
+        String appPath = zkRoot + "/jobs/" + appId + "/info";
+        try {
+            SparkApplicationInfo info = new SparkApplicationInfo();
+            if (_curator.checkExists().forPath(appPath) != null) {
+                // Payload format is "queue/state/finalStatus/user/name"; "/" is a safe
+                // separator because the name is sanitized in addFinishedApplication.
+                String[] appStatus = new String(_curator.getData().forPath(appPath)).split("/");
+                info.setQueue(appStatus[0]);
+                info.setState(appStatus[1]);
+                info.setFinalStatus(appStatus[2]);
+                if (appStatus.length > 3) {
+                    info.setUser(appStatus[3]);
+                    info.setName(appStatus[4]);
+                }
+            }
+            return info;
+        } catch (Exception e) {
+            LOG.error("Failed to read application attempt info", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public long readLastFinishedTimestamp() {
+        String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
+        try {
+            if (_curator.checkExists().forPath(lastTimeStampPath) == null) {
+                return 0L;
+            } else {
+                return Long.parseLong(new String(_curator.getData().forPath(lastTimeStampPath)));
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to read last finished Spark job timestamp", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean hasApplication(String appId) {
+        String path = zkRoot + "/jobs/" + appId;
+        try {
+            return _curator.checkExists().forPath(path) != null;
+        } catch (Exception e) {
+            LOG.error("Failed to check whether application exists", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void addFinishedApplication(String appId, String queue, String yarnState, String yarnStatus, String user, String name) {
+        String path = zkRoot + "/jobs/" + appId;
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                _curator.delete().deletingChildrenIfNeeded().forPath(path);
+            }
+
+            // Sanitize the app name: "/" would break the znode payload format, and
+            // overly long names are truncated to 50 characters.
+            name = name.replace("/", "_");
+            if (name.length() > 50) {
+                name = name.substring(0, 50);
+            }
+
+            CuratorTransactionBridge result = _curator.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+            result = result.and().create().withMode(CreateMode.PERSISTENT).forPath(path + "/info", String.format("%s/%s/%s/%s/%s", queue, yarnState, yarnStatus, user, name).getBytes("UTF-8"));
+            result.and().commit();
+        } catch (Exception e) {
+            LOG.error("Failed to add finished application", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    public void updateLastUpdateTime(Long updateTime) {
+        String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
+        try {
+            if (_curator.checkExists().forPath(lastTimeStampPath) == null) {
+                _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+            } else {
+                // Only move the watermark forward.
+                long originalEndTime = this.readLastFinishedTimestamp();
+                if (originalEndTime < updateTime) {
+                    _curator.setData().forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to update last finished time", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void updateApplicationStatus(String appId, ZKStateConstant.AppStatus status) {
+        String path = zkRoot + "/jobs/" + appId;
+        InterProcessLock lock = new InterProcessReadWriteLock(_curator, zkRoot + "/jobs").readLock();
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                if (status.equals(ZKStateConstant.AppStatus.FINISHED)) {
+                    // Finished applications are removed from the queue entirely.
+                    lock.acquire();
+                    _curator.delete().deletingChildrenIfNeeded().forPath(path);
+                } else {
+                    _curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
+                }
+            } else {
+                LOG.error(String.format("Failed to update application status: path %s does not exist", path));
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to update application status", e);
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                if (lock.isAcquiredInThisProcess()) {
+                    lock.release();
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to release lock", e);
+            }
+        }
+    }
+}
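Taken together, the paths above imply this layout under {zkRoot}/{site}: a lastAppTime watermark node, plus one jobs/{appId} node per application whose payload is an AppStatus string and whose info child stores "queue/state/finalStatus/user/name". A smoke-test sketch of one bookkeeping cycle, assuming the application config resolves a reachable ZooKeeper quorum (the application id and metadata below are illustrative):

    public class ZkStateSmokeTest {
        public static void main(String[] args) throws Exception {
            JobHistoryZKStateManager zk = new JobHistoryZKStateManager(new SparkHistoryCrawlConfig());
            String appId = "application_1459803563374_535667"; // illustrative id
            if (!zk.hasApplication(appId)) {
                zk.addFinishedApplication(appId, "default", "FINISHED", "SUCCEEDED", "someUser", "someJob");
            }
            for (String id : zk.loadApplications(10)) { // apps still in INIT state
                zk.updateApplicationStatus(id, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
            }
            zk.close();
        }
    }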

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
new file mode 100644
index 0000000..40efa50
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
@@ -0,0 +1,27 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.status;
+
+public class ZKStateConstant {
+
+    public enum AppStatus{
+        INIT, SENT_FOR_PARSE, FINISHED, FAILED
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
new file mode 100644
index 0000000..9c231aa
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
@@ -0,0 +1,152 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+public class FinishedSparkJobSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(FinishedSparkJobSpout.class);
+    private SpoutOutputCollector collector;
+    private JobHistoryZKStateManager zkState;
+    private SparkHistoryCrawlConfig config;
+    private ResourceFetcher rmFetch;
+    private long lastFinishAppTime = 0;
+    private Map<String, Integer> failTimes;
+
+    private static final int FAIL_MAX_TIMES = 5;
+
+    public FinishedSparkJobSpout(SparkHistoryCrawlConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
+        this.failTimes = new HashMap<>();
+        this.collector = spoutOutputCollector;
+        this.zkState = new JobHistoryZKStateManager(config);
+        this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
+        zkState.resetApplications();
+    }
+
+
+    @Override
+    public void nextTuple() {
+        LOG.info("Start to run tuple");
+        try {
+            long fetchTime = Calendar.getInstance().getTimeInMillis();
+            if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
+                List apps = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, new Long(lastFinishAppTime).toString());
+                List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
+                LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
+                for (AppInfo app: appInfos) {
+                    String appId = app.getId();
+                    if (!zkState.hasApplication(appId)) {
+                        zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
+                    }
+                }
+                this.lastFinishAppTime = fetchTime;
+                zkState.updateLastUpdateTime(fetchTime);
+            }
+
+            List<String> appIds = zkState.loadApplications(10);
+            for (String appId: appIds) {
+                collector.emit(new Values(appId), appId);
+                LOG.info("emit " + appId);
+                zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+            }
+            LOG.info("{} apps sent.", appIds.size());
+
+            if (appIds.isEmpty()) {
+                this.takeRest(60);
+            }
+        } catch (Exception e) {
+            LOG.error("Fail to run next tuple", e);
+           // this.takeRest(10);
+        }
+
+    }
+
+    private void takeRest(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the worker can shut down promptly.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("appId"));
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        String appId = (String) msgId;
+        int failTimes = 0;
+        if (this.failTimes.containsKey(appId)) {
+            failTimes = this.failTimes.get(appId);
+        }
+        failTimes++;
+        if (failTimes >= FAIL_MAX_TIMES) {
+            // Give up after FAIL_MAX_TIMES attempts and drop the application.
+            this.failTimes.remove(appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.error(String.format("Application %s has failed %d times, dropping it.", appId, FAIL_MAX_TIMES));
+        } else {
+            // Re-emit the tuple for another parsing attempt.
+            this.failTimes.put(appId, failTimes);
+            collector.emit(new Values(appId), appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        this.failTimes.remove(msgId);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        zkState.close();
+    }
+}
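The spout anchors every tuple with its application id (collector.emit(new Values(appId), appId)), so Storm hands that id back to ack() and fail(), and the retry bookkeeping reduces to a plain map capped at FAIL_MAX_TIMES attempts. A standalone sketch of that accounting (the id is illustrative):

    public class RetryAccounting {
        public static void main(String[] args) {
            java.util.Map<String, Integer> failTimes = new java.util.HashMap<>();
            String appId = "application_1459803563374_535667"; // illustrative id
            int maxTimes = 5; // FAIL_MAX_TIMES in the patch
            for (int i = 0; i < 6; i++) {
                int count = failTimes.containsKey(appId) ? failTimes.get(appId) : 0;
                count++;
                if (count >= maxTimes) {
                    failTimes.remove(appId); // give up: the app is dropped
                    System.out.println("dropped after " + count + " failures");
                    break;
                }
                failTimes.put(appId, count); // otherwise re-emit for another attempt
            }
        }
    }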

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
new file mode 100644
index 0000000..bd0eb85
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
@@ -0,0 +1,81 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+
+public class SparkHistoryTopology {
+
+    private SparkHistoryCrawlConfig shConfig;
+
+    public SparkHistoryTopology(SparkHistoryCrawlConfig config) {
+        this.shConfig = config;
+    }
+
+    public TopologyBuilder getBuilder() {
+        TopologyBuilder builder = new TopologyBuilder();
+        String spoutName = "sparkHistoryJobSpout";
+        String boltName = "sparkHistoryJobBolt";
+        com.typesafe.config.Config config = this.shConfig.getConfig();
+        builder.setSpout(spoutName,
+                new FinishedSparkJobSpout(shConfig),
+                config.getInt("storm.parallelismConfig." + spoutName)
+        ).setNumTasks(config.getInt("storm.tasks." + spoutName));
+
+        builder.setBolt(boltName,
+                new SparkJobParseBolt(shConfig),
+                config.getInt("storm.parallelismConfig." + boltName)
+        ).setNumTasks(config.getInt("storm.tasks." + boltName)).shuffleGrouping(spoutName);
+        return builder;
+    }
+
+
+    public static void main(String[] args) {
+        try {
+            SparkHistoryCrawlConfig crawlConfig = new SparkHistoryCrawlConfig();
+
+            Config conf = new Config();
+            conf.setNumWorkers(crawlConfig.stormConfig.workerNo);
+            conf.setMessageTimeoutSecs(crawlConfig.stormConfig.timeoutSec);
+            //conf.setMaxSpoutPending(crawlConfig.stormConfig.spoutPending);
+            //conf.put(Config.TOPOLOGY_DEBUG, true);
+
+
+            if (crawlConfig.stormConfig.mode.equals("local")) {
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(
+                        crawlConfig.stormConfig.topologyName,
+                        conf,
+                        new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
+            } else {
+                StormSubmitter.submitTopology(
+                        crawlConfig.stormConfig.topologyName,
+                        conf,
+                        new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
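For reference, these are the Typesafe Config lookups the classes in this patch perform, gathered into one runnable sketch; the key names come from the code, while the printed values depend on whatever application.conf is actually on the classpath:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class ShowExpectedKeys {
        public static void main(String[] args) {
            Config config = ConfigFactory.load(); // e.g. application.conf on the classpath
            // Storm wiring (see SparkHistoryTopology.getBuilder):
            int spoutParallelism = config.getInt("storm.parallelismConfig.sparkHistoryJobSpout");
            int spoutTasks = config.getInt("storm.tasks.sparkHistoryJobSpout");
            int boltParallelism = config.getInt("storm.parallelismConfig.sparkHistoryJobBolt");
            int boltTasks = config.getInt("storm.tasks.sparkHistoryJobBolt");
            // Eagle service client (see initiateClient earlier in this patch):
            String host = config.getString("eagleProps.eagle.service.host");
            int port = config.getInt("eagleProps.eagle.service.port");
            int readTimeoutSec = config.getInt("eagleProps.eagle.service.read_timeout");
            System.out.printf("spout %d/%d, bolt %d/%d, service %s:%d (read timeout %ds)%n",
                    spoutParallelism, spoutTasks, boltParallelism, boltTasks, host, port, readTimeoutSec);
        }
    }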

