eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject [6/8] incubator-eagle git commit: [EAGLE-422] eagle support for mr & spark running job monitoring
Date Tue, 09 Aug 2016 05:25:34 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JPMEntityRepository.java
new file mode 100644
index 0000000..6bb11ad
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JPMEntityRepository.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running.entities;
+
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCountersSerDeser;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class JPMEntityRepository extends EntityRepository {
+    public JPMEntityRepository() {
+        entitySet.add(JobExecutionAPIEntity.class);
+        entitySet.add(TaskExecutionAPIEntity.class);
+        entitySet.add(TaskAttemptExecutionAPIEntity.class);
+
+        serDeserMap.put(JobCounters.class, new JobCountersSerDeser());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobConfig.java
new file mode 100644
index 0000000..20f9cf4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobConfig.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running.entities;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * String-to-String map holding a job's configuration properties
+ * (presumably Hadoop job-conf key/value pairs; confirm against the producer).
+ *
+ * <p>{@link HashMap} is already {@link Serializable}; an explicit {@code serialVersionUID} is
+ * declared so the serialized form does not depend on a compiler-computed default.
+ */
+public class JobConfig extends HashMap<String, String> implements Serializable {
+    private static final long serialVersionUID = 1L;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobExecutionAPIEntity.java
new file mode 100644
index 0000000..5fe7d5a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/JobExecutionAPIEntity.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagleMRRunningJobs")
+@ColumnFamily("f")
+@Prefix("jobs")
+@Service(Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
+        @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+})
+@Tags({"site", "jobId", "jobName", "jobDefId", "jobType", "user", "queue"})
+public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long startTime;
+    @Column("b")
+    private long endTime;
+    @Column("c")
+    private long elapsedTime;
+    @Column("d")
+    private String status;
+    @Column("e")
+    private int mapsTotal;
+    @Column("f")
+    private int mapsCompleted;
+    @Column("g")
+    private int reducesTotal;
+    @Column("h")
+    private int reducesCompleted;
+    @Column("i")
+    private double mapProgress;
+    @Column("j")
+    private double reduceProgress;
+    @Column("k")
+    private int mapsPending;
+    @Column("l")
+    private int mapsRunning;
+    @Column("m")
+    private int reducesPending;
+    @Column("n")
+    private int reducesRunning;
+    @Column("o")
+    private int newReduceAttempts;
+    @Column("p")
+    private int runningReduceAttempts;
+    @Column("q")
+    private int failedReduceAttempts;
+    @Column("r")
+    private int killedReduceAttempts;
+    @Column("s")
+    private int successfulReduceAttempts;
+    @Column("t")
+    private int newMapAttempts;
+    @Column("u")
+    private int runningMapAttempts;
+    @Column("v")
+    private int failedMapAttempts;
+    @Column("w")
+    private int killedMapAttempts;
+    @Column("x")
+    private int successfulMapAttempts;
+    @Column("y")
+    private AppInfo appInfo;
+    @Column("z")
+    private JobCounters jobCounters;
+    @Column("aa")
+    private JobConfig jobConfig;
+    @Column("ab")
+    private long allocatedMB;
+    @Column("ac")
+    private int allocatedVCores;
+    @Column("ad")
+    private int runningContainers;
+    @Column("ae")
+    private int dataLocalMaps;
+    @Column("af")
+    private double dataLocalMapsPercentage;
+    @Column("ag")
+    private int rackLocalMaps;
+    @Column("ah")
+    private double rackLocalMapsPercentage;
+    @Column("ai")
+    private int totalLaunchedMaps;
+    @Column("aj")
+    private long submissionTime;
+
+    public JobConfig getJobConfig() {
+        return jobConfig;
+    }
+
+    public void setJobConfig(JobConfig jobConfig) {
+        this.jobConfig = jobConfig;
+        valueChanged("jobConfig");
+    }
+
+    public JobCounters getJobCounters() {
+        return jobCounters;
+    }
+
+    public void setJobCounters(JobCounters jobCounters) {
+        this.jobCounters = jobCounters;
+        valueChanged("jobCounters");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+        valueChanged("elapsedTime");
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    public int getMapsTotal() {
+        return mapsTotal;
+    }
+
+    public void setMapsTotal(int mapsTotal) {
+        this.mapsTotal = mapsTotal;
+        valueChanged("mapsTotal");
+    }
+
+    public int getMapsCompleted() {
+        return mapsCompleted;
+    }
+
+    public void setMapsCompleted(int mapsCompleted) {
+        this.mapsCompleted = mapsCompleted;
+        valueChanged("mapsCompleted");
+    }
+
+    public int getReducesTotal() {
+        return reducesTotal;
+    }
+
+    public void setReducesTotal(int reducesTotal) {
+        this.reducesTotal = reducesTotal;
+        valueChanged("reducesTotal");
+    }
+
+    public int getReducesCompleted() {
+        return reducesCompleted;
+    }
+
+    public void setReducesCompleted(int reducesCompleted) {
+        this.reducesCompleted = reducesCompleted;
+        valueChanged("reducesCompleted");
+    }
+
+    public double getMapProgress() {
+        return mapProgress;
+    }
+
+    public void setMapProgress(double mapProgress) {
+        this.mapProgress = mapProgress;
+        valueChanged("mapProgress");
+    }
+
+    public double getReduceProgress() {
+        return reduceProgress;
+    }
+
+    public void setReduceProgress(double reduceProgress) {
+        this.reduceProgress = reduceProgress;
+        valueChanged("reduceProgress");
+    }
+
+    public int getMapsPending() {
+        return mapsPending;
+    }
+
+    public void setMapsPending(int mapsPending) {
+        this.mapsPending = mapsPending;
+        valueChanged("mapsPending");
+    }
+
+    public int getMapsRunning() {
+        return mapsRunning;
+    }
+
+    public void setMapsRunning(int mapsRunning) {
+        this.mapsRunning = mapsRunning;
+        valueChanged("mapsRunning");
+    }
+
+    public int getReducesPending() {
+        return reducesPending;
+    }
+
+    public void setReducesPending(int reducesPending) {
+        this.reducesPending = reducesPending;
+        valueChanged("reducesPending");
+    }
+
+    public int getReducesRunning() {
+        return reducesRunning;
+    }
+
+    public void setReducesRunning(int reducesRunning) {
+        this.reducesRunning = reducesRunning;
+        valueChanged("reducesRunning");
+    }
+
+    public int getNewReduceAttempts() {
+        return newReduceAttempts;
+    }
+
+    public void setNewReduceAttempts(int newReduceAttempts) {
+        this.newReduceAttempts = newReduceAttempts;
+        valueChanged("newReduceAttempts");
+    }
+
+    public int getRunningReduceAttempts() {
+        return runningReduceAttempts;
+    }
+
+    public void setRunningReduceAttempts(int runningReduceAttempts) {
+        this.runningReduceAttempts = runningReduceAttempts;
+        valueChanged("runningReduceAttempts");
+    }
+
+    public int getFailedReduceAttempts() {
+        return failedReduceAttempts;
+    }
+
+    public void setFailedReduceAttempts(int failedReduceAttempts) {
+        this.failedReduceAttempts = failedReduceAttempts;
+        valueChanged("failedReduceAttempts");
+    }
+
+    public int getKilledReduceAttempts() {
+        return killedReduceAttempts;
+    }
+
+    public void setKilledReduceAttempts(int killedReduceAttempts) {
+        this.killedReduceAttempts = killedReduceAttempts;
+        valueChanged("killedReduceAttempts");
+    }
+
+    public int getSuccessfulReduceAttempts() {
+        return successfulReduceAttempts;
+    }
+
+    public void setSuccessfulReduceAttempts(int successfulReduceAttempts) {
+        this.successfulReduceAttempts = successfulReduceAttempts;
+        valueChanged("successfulReduceAttempts");
+    }
+
+    public int getNewMapAttempts() {
+        return newMapAttempts;
+    }
+
+    public void setNewMapAttempts(int newMapAttempts) {
+        this.newMapAttempts = newMapAttempts;
+        valueChanged("newMapAttempts");
+    }
+
+    public int getRunningMapAttempts() {
+        return runningMapAttempts;
+    }
+
+    public void setRunningMapAttempts(int runningMapAttempts) {
+        this.runningMapAttempts = runningMapAttempts;
+        valueChanged("runningMapAttempts");
+    }
+
+    public int getFailedMapAttempts() {
+        return failedMapAttempts;
+    }
+
+    public void setFailedMapAttempts(int failedMapAttempts) {
+        this.failedMapAttempts = failedMapAttempts;
+        valueChanged("failedMapAttempts");
+    }
+
+    public int getKilledMapAttempts() {
+        return killedMapAttempts;
+    }
+
+    public void setKilledMapAttempts(int killedMapAttempts) {
+        this.killedMapAttempts = killedMapAttempts;
+        valueChanged("killedMapAttempts");
+    }
+
+    public int getSuccessfulMapAttempts() {
+        return successfulMapAttempts;
+    }
+
+    public void setSuccessfulMapAttempts(int successfulMapAttempts) {
+        this.successfulMapAttempts = successfulMapAttempts;
+        valueChanged("successfulMapAttempts");
+    }
+
+    public AppInfo getAppInfo() {
+        return appInfo;
+    }
+
+    public void setAppInfo(AppInfo appInfo) {
+        this.appInfo = appInfo;
+        valueChanged("appInfo");
+    }
+
+    public long getAllocatedMB() {
+        return allocatedMB;
+    }
+
+    public void setAllocatedMB(long allocatedMB) {
+        this.allocatedMB = allocatedMB;
+        valueChanged("allocatedMB");
+    }
+
+    public int getAllocatedVCores() {
+        return allocatedVCores;
+    }
+
+    public void setAllocatedVCores(int allocatedVCores) {
+        this.allocatedVCores = allocatedVCores;
+        valueChanged("allocatedVCores");
+    }
+
+    public int getRunningContainers() {
+        return runningContainers;
+    }
+
+    public void setRunningContainers(int runningContainers) {
+        this.runningContainers = runningContainers;
+        valueChanged("runningContainers");
+    }
+
+    public int getDataLocalMaps() {
+        return dataLocalMaps;
+    }
+
+    public void setDataLocalMaps(int dataLocalMaps) {
+        this.dataLocalMaps = dataLocalMaps;
+        valueChanged("dataLocalMaps");
+    }
+
+    public double getDataLocalMapsPercentage() {
+        return dataLocalMapsPercentage;
+    }
+
+    public void setDataLocalMapsPercentage(double dataLocalMapsPercentage) {
+        this.dataLocalMapsPercentage = dataLocalMapsPercentage;
+        valueChanged("dataLocalMapsPercentage");
+    }
+
+    public int getRackLocalMaps() {
+        return rackLocalMaps;
+    }
+
+    public void setRackLocalMaps(int rackLocalMaps) {
+        this.rackLocalMaps = rackLocalMaps;
+        valueChanged("rackLocalMaps");
+    }
+
+    public double getRackLocalMapsPercentage() {
+        return rackLocalMapsPercentage;
+    }
+
+    public void setRackLocalMapsPercentage(double rackLocalMapsPercentage) {
+        this.rackLocalMapsPercentage = rackLocalMapsPercentage;
+        valueChanged("rackLocalMapsPercentage");
+    }
+
+    public int getTotalLaunchedMaps() {
+        return totalLaunchedMaps;
+    }
+
+    public void setTotalLaunchedMaps(int totalLaunchedMaps) {
+        this.totalLaunchedMaps = totalLaunchedMaps;
+        valueChanged("totalLaunchedMaps");
+    }
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        valueChanged("submissionTime");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskAttemptExecutionAPIEntity.java
new file mode 100644
index 0000000..c74563b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskAttemptExecutionAPIEntity.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Execution snapshot of a single MapReduce task attempt.
+ *
+ * <p>Persisted to table {@code eagleMRRunningTasks} (column family {@code f}) under row prefix
+ * {@code tasks_exec_attempt} as a time-series entity partitioned by {@code site}. Tag names are
+ * lowerCamelCase, so the job-name tag is {@code jobName}, matching
+ * {@code JobExecutionAPIEntity}'s tag declaration. Each setter stores the new value and then
+ * reports the property name through {@code valueChanged}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagleMRRunningTasks")
+@ColumnFamily("f")
+@Prefix("tasks_exec_attempt")
+@Service(Constants.JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Tags({"site", "jobId", "jobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue", "host", "rack"})
+public class TaskAttemptExecutionAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long startTime;
+    @Column("b")
+    private long finishTime;
+    @Column("c")
+    private long elapsedTime;
+    @Column("d")
+    private double progress;
+    /** Attempt identifier as stored in column "e". */
+    @Column("e")
+    private String id;
+    @Column("f")
+    private String status;
+    @Column("g")
+    private String diagnostics;
+    @Column("h")
+    private String type;
+    @Column("i")
+    private String assignedContainerId;
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getFinishTime() {
+        return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+        valueChanged("finishTime");
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+        valueChanged("elapsedTime");
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+        valueChanged("progress");
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+        valueChanged("id");
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    public String getDiagnostics() {
+        return diagnostics;
+    }
+
+    public void setDiagnostics(String diagnostics) {
+        this.diagnostics = diagnostics;
+        valueChanged("diagnostics");
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+        valueChanged("type");
+    }
+
+    public String getAssignedContainerId() {
+        return assignedContainerId;
+    }
+
+    public void setAssignedContainerId(String assignedContainerId) {
+        this.assignedContainerId = assignedContainerId;
+        valueChanged("assignedContainerId");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskExecutionAPIEntity.java
new file mode 100644
index 0000000..7677f45
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/entities/TaskExecutionAPIEntity.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Execution snapshot of a single MapReduce task.
+ *
+ * <p>Persisted to table {@code eagleMRRunningTasks} (column family {@code f}) under row prefix
+ * {@code tasks_exec} as a time-series entity partitioned by {@code site}. Tag names are
+ * lowerCamelCase, so the job-name tag is {@code jobName}, matching
+ * {@code JobExecutionAPIEntity}'s tag declaration. Each setter stores the new value and then
+ * reports the property name through {@code valueChanged}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagleMRRunningTasks")
+@ColumnFamily("f")
+@Prefix("tasks_exec")
+@Service(Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Tags({"site", "jobId", "jobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue"})
+public class TaskExecutionAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long startTime;
+    @Column("b")
+    private long finishTime;
+    @Column("c")
+    private long elapsedTime;
+    @Column("d")
+    private double progress;
+    @Column("e")
+    private String status;
+    @Column("f")
+    private String successfulAttempt;
+    @Column("g")
+    private String statusDesc;
+    @Column("h")
+    private JobCounters jobCounters;
+    @Column("i")
+    private String host;
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getFinishTime() {
+        return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+        valueChanged("finishTime");
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+        valueChanged("elapsedTime");
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+        valueChanged("progress");
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    public String getSuccessfulAttempt() {
+        return successfulAttempt;
+    }
+
+    public void setSuccessfulAttempt(String successfulAttempt) {
+        this.successfulAttempt = successfulAttempt;
+        valueChanged("successfulAttempt");
+    }
+
+    public String getStatusDesc() {
+        return statusDesc;
+    }
+
+    public void setStatusDesc(String statusDesc) {
+        this.statusDesc = statusDesc;
+        valueChanged("statusDesc");
+    }
+
+    public JobCounters getJobCounters() {
+        return jobCounters;
+    }
+
+    public void setJobCounters(JobCounters jobCounters) {
+        this.jobCounters = jobCounters;
+        valueChanged("jobCounters");
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+        valueChanged("host");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
new file mode 100644
index 0000000..aae0dd8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.parser;
+
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.running.entities.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
+import org.apache.eagle.jpm.mr.running.parser.metrics.TaskExecutionMetricsCreationListener;
+import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Buffers MapReduce running-job entities, together with the metric entities derived from job and
+ * task entities, and writes them to the Eagle service in batches.
+ *
+ * <p>The buffer is an unsynchronized {@link ArrayList}; do not share an instance across threads
+ * without external locking.
+ */
+public class MRJobEntityCreationHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(MRJobEntityCreationHandler.class);
+
+    private final List<TaggedLogAPIEntity> entities = new ArrayList<>();
+    private final MRRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+    private final JobExecutionMetricsCreationListener jobMetricsListener;
+    private final TaskExecutionMetricsCreationListener taskMetricsListener;
+
+    /**
+     * @param eagleServiceConfig connection settings (host, port, credentials, read timeout) and
+     *                           the {@code maxFlushNum} batch threshold
+     */
+    public MRJobEntityCreationHandler(MRRunningConfigManager.EagleServiceConfig eagleServiceConfig) {
+        this.eagleServiceConfig = eagleServiceConfig;
+        this.jobMetricsListener = new JobExecutionMetricsCreationListener();
+        this.taskMetricsListener = new TaskExecutionMetricsCreationListener();
+    }
+
+    /**
+     * Buffers an entity. For task and job execution entities, the generated metric entities are
+     * buffered too. Once the buffer reaches {@code maxFlushNum} entries, a flush is triggered.
+     *
+     * @param entity the entity to buffer
+     */
+    public void add(TaggedLogAPIEntity entity) {
+        entities.add(entity);
+        if (entity instanceof TaskExecutionAPIEntity) {
+            entities.addAll(taskMetricsListener.generateMetrics((TaskExecutionAPIEntity) entity));
+        } else if (entity instanceof JobExecutionAPIEntity) {
+            entities.addAll(jobMetricsListener.generateMetrics((JobExecutionAPIEntity) entity));
+        }
+        if (entities.size() >= eagleServiceConfig.maxFlushNum) {
+            this.flush();
+        }
+    }
+
+    /**
+     * Sends all buffered entities to the Eagle service through a freshly created client.
+     *
+     * <p>The buffer is cleared only after {@code client.create} returns without throwing. On
+     * failure the entities stay buffered and are sent again by the next flush.
+     *
+     * @return {@code true} if there was nothing to send or the send did not throw,
+     *         {@code false} if an exception was raised while sending
+     */
+    public boolean flush() {
+        // Nothing buffered: report success without opening a client connection.
+        if (entities.isEmpty()) {
+            return true;
+        }
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        try {
+            LOG.info("start to flush mr job entities, size {}", entities.size());
+            client.create(entities);
+            LOG.info("finish flushing mr job entities, size {}", entities.size());
+            entities.clear();
+        } catch (Exception e) {
+            // Pass the exception as the trailing argument with no "{}" placeholder. A placeholder
+            // would consume it, and SLF4J would then print only toString() instead of the stack trace.
+            LOG.warn("exception found when flush entities", e);
+            return false;
+        } finally {
+            closeQuietly(client);
+        }
+
+        return true;
+    }
+
+    /**
+     * Destroys the underlying Jersey client and then closes the Eagle service client. Failures are
+     * logged rather than thrown so they cannot mask the flush result.
+     */
+    private static void closeQuietly(IEagleServiceClient client) {
+        try {
+            client.getJerseyClient().destroy();
+        } catch (Exception e) {
+            LOG.warn("failed to destroy jersey client", e);
+        }
+        try {
+            client.close();
+        } catch (Exception e) {
+            // Best effort: the flush has already succeeded or failed at this point.
+            LOG.debug("failed to close eagle service client", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
new file mode 100644
index 0000000..9f993a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.parser;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.entities.JobConfig;
+import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.running.entities.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.running.entities.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.connection.URLConnectionUtils;
+import org.apache.eagle.jpm.util.resourceFetch.model.*;
+import org.apache.eagle.jpm.util.resourceFetch.model.JobCounters;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.InputStream;
+import java.net.URLConnection;
+import java.util.*;
+import java.util.function.Function;
+
+public class MRJobParser implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(MRJobParser.class);
+
+    public enum ParserStatus {
+        RUNNING,
+        FINISHED,
+        APP_FINISHED
+    }
+    private AppInfo app;
+    private static final int MAX_RETRY_TIMES = 2;
+    private MRJobEntityCreationHandler mrJobEntityCreationHandler;
+    //<jobId, JobExecutionAPIEntity>
+    private Map<String, JobExecutionAPIEntity> mrJobEntityMap;
+    private Map<String, JobConfig> mrJobConfigs;
+    private static final String XML_HTTP_HEADER = "Accept";
+    private static final String XML_FORMAT = "application/xml";
+    private static final int CONNECTION_TIMEOUT = 10000;
+    private static final int READ_TIMEOUT = 10000;
+    private final Object lock = new Object();
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+    private Map<String, String> commonTags = new HashMap<>();
+    private MRRunningJobManager runningJobManager;
+    private ParserStatus parserStatus;
+    private ResourceFetcher rmResourceFetcher;
+    private boolean first;
+    private Set<String> finishedTaskIds;
+    private List<String> configKeys;
+    private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    public MRJobParser(MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+                       MRRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+                       AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
+                       MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
+                       List<String> configKeys) {
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.app = app;
+        this.mrJobEntityMap = new HashMap<>();
+        this.mrJobEntityMap = mrJobMap;
+        if (this.mrJobEntityMap == null) {
+            this.mrJobEntityMap = new HashMap<>();
+        }
+        this.mrJobConfigs = new HashMap<>();
+
+        this.mrJobEntityCreationHandler = new MRJobEntityCreationHandler(eagleServiceConfig);
+
+        this.commonTags.put(MRJobTagName.SITE.toString(), jobExtractorConfig.site);
+        this.commonTags.put(MRJobTagName.USER.toString(), app.getUser());
+        this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue());
+        this.runningJobManager = runningJobManager;
+        this.parserStatus  = ParserStatus.FINISHED;
+        this.rmResourceFetcher = rmResourceFetcher;
+        this.first = true;
+        this.finishedTaskIds = new HashSet<>();
+        this.configKeys = configKeys;
+    }
+
+    public ParserStatus status() {
+        return this.parserStatus;
+    }
+
+    public void setStatus(ParserStatus status) {
+        this.parserStatus = status;
+    }
+
+    private void finishMRJob(String mrJobId) {
+        JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId);
+        jobExecutionAPIEntity.setStatus(Constants.AppState.FINISHED.toString());
+        mrJobConfigs.remove(mrJobId);
+        if (mrJobConfigs.size() == 0) {
+            this.parserStatus = ParserStatus.APP_FINISHED;
+        }
+        LOG.info("mr job {} has been finished", mrJobId);
+    }
+
+    private void fetchMRRunningInfo() throws Exception {
+        for (int i = 0; i < MAX_RETRY_TIMES; i++) {
+            if (fetchMRJobs()) {
+                break;
+            } else if (i >= MAX_RETRY_TIMES - 1) {
+                //check whether the app has finished. if we test that we can connect rm, then we consider the jobs have finished
+                //if we get here either because of cannot connect rm or the jobs have finished
+                rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
+                mrJobEntityMap.keySet().forEach(this::finishMRJob);
+                return;
+            }
+        }
+
+        List<Function<String, Boolean>> functions = new ArrayList<>();
+        functions.add(fetchJobConfig);
+        functions.add(fetchJobCounters);
+        if (!this.first) { //do not fetch these info below for the first time
+            functions.add(fetchTasks);
+        }
+
+        this.first = false;
+        for (String jobId : mrJobEntityMap.keySet()) {
+            for (Function<String, Boolean> function : functions) {
+                int i = 0;
+                for (; i < MAX_RETRY_TIMES; i++) {
+                    if (function.apply(jobId)) {
+                        break;
+                    }
+                }
+                if (i >= MAX_RETRY_TIMES) {
+                    //may caused by rm unreachable
+                    rmResourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
+                    finishMRJob(jobId);
+                    break;
+                }
+            }
+        }
+    }
+
+    private boolean fetchMRJobs() {
+        String jobURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        List<MRJob> mrJobs = null;
+        try {
+            is = InputStreamUtils.getInputStream(jobURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch mr job from {}", jobURL);
+            mrJobs = OBJ_MAPPER.readValue(is, MRJobsWrapper.class).getJobs().getJob();
+        } catch (Exception e) {
+            LOG.warn("fetch mr job from {} failed, {}", jobURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        for (MRJob mrJob : mrJobs) {
+            String id = mrJob.getId();
+            if (!mrJobEntityMap.containsKey(id)) {
+                mrJobEntityMap.put(id, new JobExecutionAPIEntity());
+            }
+
+            JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(id);
+            jobExecutionAPIEntity.setTags(new HashMap<>(commonTags));
+            jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_ID.toString(), id);
+            jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), mrJob.getName());
+            jobExecutionAPIEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), mrJob.getName());
+            jobExecutionAPIEntity.setTimestamp(app.getStartedTime());
+            jobExecutionAPIEntity.setSubmissionTime(app.getStartedTime());
+            jobExecutionAPIEntity.setStartTime(mrJob.getStartTime());
+            jobExecutionAPIEntity.setElapsedTime(mrJob.getElapsedTime());
+            jobExecutionAPIEntity.setStatus(mrJob.getState());
+            jobExecutionAPIEntity.setMapsTotal(mrJob.getMapsTotal());
+            jobExecutionAPIEntity.setMapsCompleted(mrJob.getMapsCompleted());
+            jobExecutionAPIEntity.setReducesTotal(mrJob.getReducesTotal());
+            jobExecutionAPIEntity.setReducesCompleted(mrJob.getReducesCompleted());
+            jobExecutionAPIEntity.setMapProgress(mrJob.getMapProgress());
+            jobExecutionAPIEntity.setReduceProgress(mrJob.getReduceProgress());
+            jobExecutionAPIEntity.setMapsPending(mrJob.getMapsPending());
+            jobExecutionAPIEntity.setMapsRunning(mrJob.getMapsRunning());
+            jobExecutionAPIEntity.setReducesPending(mrJob.getReducesPending());
+            jobExecutionAPIEntity.setReducesRunning(mrJob.getReducesRunning());
+            jobExecutionAPIEntity.setNewReduceAttempts(mrJob.getNewReduceAttempts());
+            jobExecutionAPIEntity.setRunningReduceAttempts(mrJob.getRunningReduceAttempts());
+            jobExecutionAPIEntity.setFailedReduceAttempts(mrJob.getFailedReduceAttempts());
+            jobExecutionAPIEntity.setKilledReduceAttempts(mrJob.getKilledReduceAttempts());
+            jobExecutionAPIEntity.setSuccessfulReduceAttempts(mrJob.getSuccessfulReduceAttempts());
+            jobExecutionAPIEntity.setNewMapAttempts(mrJob.getNewMapAttempts());
+            jobExecutionAPIEntity.setRunningMapAttempts(mrJob.getRunningMapAttempts());
+            jobExecutionAPIEntity.setFailedMapAttempts(mrJob.getFailedMapAttempts());
+            jobExecutionAPIEntity.setKilledMapAttempts(mrJob.getKilledMapAttempts());
+            jobExecutionAPIEntity.setSuccessfulMapAttempts(mrJob.getSuccessfulMapAttempts());
+            jobExecutionAPIEntity.setAppInfo(app);
+            jobExecutionAPIEntity.setAllocatedMB(app.getAllocatedMB());
+            jobExecutionAPIEntity.setAllocatedVCores(app.getAllocatedVCores());
+            jobExecutionAPIEntity.setRunningContainers(app.getRunningContainers());
+            runningJobManager.update(app.getId(), id, jobExecutionAPIEntity);
+        }
+
+        return true;
+    }
+
+    private Function<String, Boolean> fetchJobCounters = jobId -> {
+        String jobCounterURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_JOB_COUNTERS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        JobCounters jobCounters = null;
+        try {
+            is = InputStreamUtils.getInputStream(jobCounterURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch mr job counter from {}", jobCounterURL);
+            jobCounters = OBJ_MAPPER.readValue(is, JobCountersWrapper.class).getJobCounters();
+        } catch (Exception e) {
+            LOG.warn("fetch mr job counter from {} failed, {}", jobCounterURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        if (jobCounters.getCounterGroup() == null) return true;
+        JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(jobId);
+        org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
+        Map<String, Map<String, Long>> groups = new HashMap<>();
+
+        for (JobCounterGroup jobCounterGroup : jobCounters.getCounterGroup()) {
+            String counterGroupName = jobCounterGroup.getCounterGroupName();
+            if (!groups.containsKey(counterGroupName)) {
+                groups.put(counterGroupName, new HashMap<>());
+            }
+
+            Map<String, Long> counterValues = groups.get(counterGroupName);
+            List<JobCounterItem> items = jobCounterGroup.getCounter();
+            if (items == null) continue;
+            for (JobCounterItem item : items) {
+                String key = item.getName();
+                counterValues.put(key, item.getTotalCounterValue());
+                if (counterGroupName.equals(Constants.JOB_COUNTER)) {
+                    if (key.equals(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
+                        jobExecutionAPIEntity.setDataLocalMaps((int)item.getTotalCounterValue());
+                    } else if (key.equals(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
+                        jobExecutionAPIEntity.setRackLocalMaps((int)item.getTotalCounterValue());
+                    } else if (key.equals(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
+                        jobExecutionAPIEntity.setTotalLaunchedMaps((int)item.getTotalCounterValue());
+                    }
+                }
+            }
+        }
+
+        jobCounter.setCounters(groups);
+        jobExecutionAPIEntity.setJobCounters(jobCounter);
+        if (jobExecutionAPIEntity.getTotalLaunchedMaps() > 0) {
+            jobExecutionAPIEntity.setDataLocalMapsPercentage(jobExecutionAPIEntity.getDataLocalMaps() * 1.0 / jobExecutionAPIEntity.getTotalLaunchedMaps());
+            jobExecutionAPIEntity.setRackLocalMapsPercentage(jobExecutionAPIEntity.getRackLocalMaps() * 1.0 / jobExecutionAPIEntity.getTotalLaunchedMaps());
+        }
+        return true;
+    };
+
+    private Function<Pair<String, String>, org.apache.eagle.jpm.util.jobcounter.JobCounters> fetchTaskCounters = jobAndTaskId -> {
+        org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
+        String jobId = jobAndTaskId.getLeft();
+        String taskId = jobAndTaskId.getRight();
+        String taskCounterURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "/" + taskId + "/" + Constants.MR_JOB_COUNTERS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        TaskCounters taskCounters = null;
+        try {
+            is = InputStreamUtils.getInputStream(taskCounterURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch mr task counters from {}", taskCounterURL);
+            taskCounters = OBJ_MAPPER.readValue(is, TaskCountersWrapper.class).getJobTaskCounters();
+        } catch (Exception e) {
+            LOG.warn("fetch mr task counters from {} failed, {}", taskCounterURL, e);
+            return null;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        if (taskCounters.getTaskCounterGroup() == null) return jobCounter;
+        Map<String, Map<String, Long>> groups = new HashMap<>();
+
+        for (TaskCounterGroup taskCounterGroup : taskCounters.getTaskCounterGroup()) {
+            if (!groups.containsKey(taskCounterGroup.getCounterGroupName())) {
+                groups.put(taskCounterGroup.getCounterGroupName(), new HashMap<>());
+            }
+
+            Map<String, Long> counterValues = groups.get(taskCounterGroup.getCounterGroupName());
+            List<TaskCounterItem> items = taskCounterGroup.getCounter();
+            if (items == null) continue;
+            for (TaskCounterItem item : items) {
+                counterValues.put(item.getName(), item.getValue());
+            }
+        }
+
+        jobCounter.setCounters(groups);
+
+        return jobCounter;
+    };
+
+    private Function<Pair<String, String>, TaskAttemptExecutionAPIEntity> fetchTaskAttempt = jobAndTaskId -> {
+        String jobId = jobAndTaskId.getLeft();
+        String taskId = jobAndTaskId.getRight();
+        String taskAttemptURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "/" + taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        List<MRTaskAttempt> taskAttempts = null;
+        try {
+            is = InputStreamUtils.getInputStream(taskAttemptURL, null, Constants.CompressionType.GZIP);
+            LOG.info("fetch mr task attempt from {}", taskAttemptURL);
+            taskAttempts = OBJ_MAPPER.readValue(is, MRTaskAttemptWrapper.class).getTaskAttempts().getTaskAttempt();
+        } catch (Exception e) {
+            LOG.warn("fetch mr task attempt from {} failed, {}", taskAttemptURL, e);
+            return null;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+        Comparator<MRTaskAttempt> byStartTime = (e1, e2) -> -1 * Long.compare(e1.getStartTime(), e2.getStartTime());
+        Iterator<MRTaskAttempt> taskAttemptIterator = taskAttempts.stream().sorted(byStartTime).iterator();
+        while (taskAttemptIterator.hasNext()) {
+            MRTaskAttempt mrTaskAttempt = taskAttemptIterator.next();
+            TaskAttemptExecutionAPIEntity taskAttemptExecutionAPIEntity = new TaskAttemptExecutionAPIEntity();
+            taskAttemptExecutionAPIEntity.setTags(new HashMap<>(mrJobEntityMap.get(jobId).getTags()));
+            taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.TASK_TYPE.toString(), mrTaskAttempt.getType());
+            taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.TASK_ID.toString(), taskId);
+            taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.RACK.toString(), mrTaskAttempt.getRack());
+            String nodeHttpAddress = mrTaskAttempt.getNodeHttpAddress();
+            String host = nodeHttpAddress.substring(0, nodeHttpAddress.indexOf(':'));
+            taskAttemptExecutionAPIEntity.getTags().put(MRJobTagName.HOSTNAME.toString(), host);
+
+            taskAttemptExecutionAPIEntity.setTimestamp(mrTaskAttempt.getStartTime());
+            taskAttemptExecutionAPIEntity.setStartTime(mrTaskAttempt.getStartTime());
+            taskAttemptExecutionAPIEntity.setFinishTime(mrTaskAttempt.getFinishTime());
+            taskAttemptExecutionAPIEntity.setElapsedTime(mrTaskAttempt.getElapsedTime());
+            taskAttemptExecutionAPIEntity.setProgress(mrTaskAttempt.getProgress());
+            taskAttemptExecutionAPIEntity.setId(mrTaskAttempt.getId());
+            taskAttemptExecutionAPIEntity.setStatus(mrTaskAttempt.getState());
+            taskAttemptExecutionAPIEntity.setDiagnostics(mrTaskAttempt.getDiagnostics());
+            taskAttemptExecutionAPIEntity.setType(mrTaskAttempt.getType());
+            taskAttemptExecutionAPIEntity.setAssignedContainerId(mrTaskAttempt.getAssignedContainerId());
+            this.mrJobEntityCreationHandler.add(taskAttemptExecutionAPIEntity);
+            return taskAttemptExecutionAPIEntity;
+        }
+        return null;
+    };
+
+    private Set<String> calcFetchCounterAndAttemptTaskId(List<MRTask> tasks) {
+        Set<String> needFetchAttemptTasks = new HashSet<>();
+        //1, sort by elapsedTime
+        Comparator<MRTask> byElapsedTimeIncrease = (e1, e2) -> Long.compare(e1.getElapsedTime(), e2.getElapsedTime());
+        Comparator<MRTask> byElapsedTimeDecrease = (e1, e2) -> -1 * Long.compare(e1.getElapsedTime(), e2.getElapsedTime());
+        //2, get finished bottom n
+        Iterator<MRTask> taskIteratorIncrease = tasks.stream()
+                .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+                .sorted(byElapsedTimeIncrease).iterator();
+        int i = 0;
+        while (taskIteratorIncrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+            MRTask mrTask = taskIteratorIncrease.next();
+            if (mrTask.getElapsedTime() > 0) {
+                i++;
+                needFetchAttemptTasks.add(mrTask.getId());
+            }
+        }
+        //3, fetch finished top n
+        Iterator<MRTask> taskIteratorDecrease = tasks.stream()
+                .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+                .sorted(byElapsedTimeDecrease).iterator();
+        i = 0;
+        while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+            MRTask mrTask = taskIteratorDecrease.next();
+            if (mrTask.getElapsedTime() > 0) {
+                i++;
+                needFetchAttemptTasks.add(mrTask.getId());
+            }
+        }
+        //4, fetch running top n
+        taskIteratorDecrease = tasks.stream()
+                .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
+                .sorted(byElapsedTimeDecrease).iterator();
+        i = 0;
+        while (taskIteratorDecrease.hasNext() && i < jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+            MRTask mrTask = taskIteratorDecrease.next();
+            if (mrTask.getElapsedTime() > 0) {
+                i++;
+                needFetchAttemptTasks.add(mrTask.getId());
+            }
+        }
+        return needFetchAttemptTasks;
+    }
+
+    private Function<String, Boolean> fetchTasks = jobId -> {
+        String taskURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        List<MRTask> tasks = null;
+        try {
+            is = InputStreamUtils.getInputStream(taskURL, null, Constants.CompressionType.NONE);
+            LOG.info("fetch mr task from {}", taskURL);
+            tasks = OBJ_MAPPER.readValue(is, MRTasksWrapper.class).getTasks().getTask();
+        } catch (Exception e) {
+            LOG.warn("fetch mr task from {} failed, {}", taskURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        Set<String> needFetchAttemptTasks = calcFetchCounterAndAttemptTaskId(tasks);
+        for (MRTask task : tasks) {
+            if (this.finishedTaskIds.contains(task.getId()) && !needFetchAttemptTasks.contains(task.getId())) {
+                continue;
+            }
+            TaskExecutionAPIEntity taskExecutionAPIEntity = new TaskExecutionAPIEntity();
+            taskExecutionAPIEntity.setTags(new HashMap<>(mrJobEntityMap.get(jobId).getTags()));
+            taskExecutionAPIEntity.getTags().put(MRJobTagName.TASK_TYPE.toString(), task.getType());
+            taskExecutionAPIEntity.getTags().put(MRJobTagName.TASK_ID.toString(), task.getId());
+
+            taskExecutionAPIEntity.setTimestamp(app.getStartedTime());
+            taskExecutionAPIEntity.setStartTime(task.getStartTime());
+            taskExecutionAPIEntity.setFinishTime(task.getFinishTime());
+            taskExecutionAPIEntity.setElapsedTime(task.getElapsedTime());
+            taskExecutionAPIEntity.setProgress(task.getProgress());
+            taskExecutionAPIEntity.setStatus(task.getState());
+            taskExecutionAPIEntity.setSuccessfulAttempt(task.getSuccessfulAttempt());
+            taskExecutionAPIEntity.setStatusDesc(task.getStatus());
+
+            if (needFetchAttemptTasks.contains(task.getId())) {
+                taskExecutionAPIEntity.setJobCounters(fetchTaskCounters.apply(Pair.of(jobId, task.getId())));
+                if (taskExecutionAPIEntity.getJobCounters() == null) {
+                    return false;
+                }
+
+                TaskAttemptExecutionAPIEntity taskAttemptExecutionAPIEntity = fetchTaskAttempt.apply(Pair.of(jobId, task.getId()));
+                if (taskAttemptExecutionAPIEntity != null) {
+                    taskExecutionAPIEntity.setHost(taskAttemptExecutionAPIEntity.getTags().get(MRJobTagName.HOSTNAME.toString()));
+                }
+            }
+
+            mrJobEntityCreationHandler.add(taskExecutionAPIEntity);
+
+            if (task.getState().equals(Constants.TaskState.SUCCEEDED.toString()) ||
+                    task.getState().equals(Constants.TaskState.FAILED.toString()) ||
+                    task.getState().equals(Constants.TaskState.KILLED.toString()) ||
+                    task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
+                //LOG.info("mr job {} task {} has finished", jobId, task.getId());
+                this.finishedTaskIds.add(task.getId());
+            }
+        }
+        return true;
+    };
+
+    private Function<String, Boolean> fetchJobConfig = jobId -> {
+        if (mrJobConfigs.containsKey(jobId)) {
+            mrJobEntityMap.get(jobId).setJobConfig(mrJobConfigs.get(jobId));
+            return true;
+        }
+        String confURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_CONF_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        InputStream is = null;
+        try {
+            LOG.info("fetch job conf from {}", confURL);
+            final URLConnection connection = URLConnectionUtils.getConnection(confURL);
+            connection.setRequestProperty(XML_HTTP_HEADER, XML_FORMAT);
+            connection.setConnectTimeout(CONNECTION_TIMEOUT);
+            connection.setReadTimeout(READ_TIMEOUT);
+            is = connection.getInputStream();
+            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+            DocumentBuilder db = dbf.newDocumentBuilder();
+            Document dt = db.parse(is);
+            Element element = dt.getDocumentElement();
+            JobConfig config = new JobConfig();
+            NodeList propertyList = element.getElementsByTagName("property");
+            int length = propertyList.getLength();
+            for (int i = 0; i < length; i++) {
+                Node property = propertyList.item(i);
+                String key = property.getChildNodes().item(0).getTextContent();
+                String value = property.getChildNodes().item(1).getTextContent();
+                if (this.configKeys.contains(key)) {
+                    config.put(key, value);
+                }
+
+                if (!this.configKeys.isEmpty() && key.equals(this.configKeys.get(0))) {
+                    mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOD_DEF_ID.toString(), value);
+                }
+            }
+            mrJobEntityMap.get(jobId).setJobConfig(config);
+            mrJobConfigs.put(jobId, config);
+
+            mrJobEntityCreationHandler.add(mrJobEntityMap.get(jobId));
+        } catch (Exception e) {
+            LOG.warn("fetch job conf from {} failed, {}", confURL, e);
+            return false;
+        } finally {
+            Utils.closeInputStream(is);
+        }
+
+        return true;
+    };
+
+    @Override
+    public void run() {
+        synchronized (this.lock) {
+            if (this.parserStatus == ParserStatus.APP_FINISHED) {
+                return;
+            }
+
+            this.parserStatus = ParserStatus.RUNNING;
+            LOG.info("start to process yarn application " + app.getId());
+            try {
+                fetchMRRunningInfo();
+            } catch (Exception e) {
+                LOG.warn("exception found when process application {}, {}", app.getId(), e);
+                e.printStackTrace();
+            } finally {
+                for (String jobId : mrJobEntityMap.keySet()) {
+                    mrJobEntityCreationHandler.add(mrJobEntityMap.get(jobId));
+                }
+                if (mrJobEntityCreationHandler.flush()) { //force flush
+                    //we must flush entities before delete from zk in case of missing finish state of jobs
+                    //delete from zk if needed
+                    mrJobEntityMap.keySet()
+                            .stream()
+                            .filter(
+                                    jobId -> mrJobEntityMap.get(jobId).getStatus().equals(Constants.AppState.FINISHED.toString()) ||
+                                            mrJobEntityMap.get(jobId).getStatus().equals(Constants.AppState.FAILED.toString()))
+                            .forEach(
+                                    jobId -> this.runningJobManager.delete(app.getId(), jobId));
+                }
+
+                LOG.info("finish process yarn application " + app.getId());
+            }
+            if (this.parserStatus == ParserStatus.RUNNING) {
+                this.parserStatus = ParserStatus.FINISHED;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
new file mode 100644
index 0000000..8634b6a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.running.parser.metrics;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for listeners that derive {@link GenericMetricEntity} instances from an entity.
+ *
+ * @param <E> the entity type metrics are derived from
+ */
+public abstract class AbstractMetricsCreationListener<E extends TaggedLogAPIEntity> {
+
+    /**
+     * Derives the metric entities for {@code entity}.
+     *
+     * @param entity the source entity
+     * @return the generated metric entities
+     */
+    public abstract List<GenericMetricEntity> generateMetrics(E entity);
+
+    /**
+     * Maps a field name to the metric name that is stored as the metric prefix.
+     *
+     * @param field the field the metric is about
+     * @return the metric name
+     */
+    protected abstract String buildMetricName(String field);
+
+    /**
+     * Builds a metric entity holding the single value {@code value}.
+     *
+     * <p>The prefix comes from {@link #buildMetricName(String)}. {@code tags} is handed to
+     * {@code setTags} as-is; no copy is made here.
+     *
+     * @param timestamp metric timestamp
+     * @param field     field name passed to {@link #buildMetricName(String)}
+     * @param value     the metric value
+     * @param tags      tags for the metric
+     * @return the populated metric entity
+     */
+    protected GenericMetricEntity metricWrapper(Long timestamp, String field, double value, Map<String, String> tags) {
+        final String prefix = buildMetricName(field);
+        final double[] values = {value};
+        final GenericMetricEntity metric = new GenericMetricEntity();
+        metric.setTimestamp(timestamp);
+        metric.setTags(tags);
+        metric.setPrefix(prefix);
+        metric.setValue(values);
+        return metric;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
new file mode 100644
index 0000000..d7b84cc
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.running.parser.metrics;
+
+import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generates job-level metrics for a running MR job.
+ */
+public class JobExecutionMetricsCreationListener extends AbstractMetricsCreationListener<JobExecutionAPIEntity> {
+
+    /**
+     * Builds the job-level metrics for one job.
+     *
+     * <p>The metrics are allocated MB, allocated vcores and running containers, plus one metric per job
+     * counter. All metrics share one timestamp and the entity's tags.
+     *
+     * @param entity the running job; {@code null} yields an empty list
+     * @return the generated metrics, never {@code null}
+     */
+    @Override
+    public List<GenericMetricEntity> generateMetrics(JobExecutionAPIEntity entity) {
+        List<GenericMetricEntity> metrics = new ArrayList<>();
+        if (entity == null) {
+            return metrics;
+        }
+        Long currentTime = System.currentTimeMillis();
+        Map<String, String> tags = entity.getTags();
+        metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_MB, entity.getAllocatedMB(), tags));
+        metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_VCORES, entity.getAllocatedVCores(), tags));
+        metrics.add(metricWrapper(currentTime, Constants.RUNNING_CONTAINERS, entity.getRunningContainers(), tags));
+
+        org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounters = entity.getJobCounters();
+        if (jobCounters == null || jobCounters.getCounters() == null) {
+            return metrics;
+        }
+        for (Map<String, Long> counterGroup : jobCounters.getCounters().values()) {
+            if (counterGroup == null) {
+                continue;
+            }
+            for (Map.Entry<String, Long> counter : counterGroup.entrySet()) {
+                Long value = counter.getValue();
+                if (value == null) {
+                    // unboxing a missing value would throw and drop every metric of this job
+                    continue;
+                }
+                // Locale.ROOT: under e.g. a Turkish default locale "MAP_INPUT_RECORDS" would become "map_ınput_records"
+                String metricName = counter.getKey().toLowerCase(java.util.Locale.ROOT);
+                metrics.add(metricWrapper(currentTime, metricName, value, tags));
+            }
+        }
+        return metrics;
+    }
+
+    /**
+     * Formats {@code field} as a job-level metric name using {@code Constants.metricFormat}.
+     */
+    @Override
+    public String buildMetricName(String field) {
+        return String.format(Constants.metricFormat, Constants.JOB_LEVEL, field);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
new file mode 100644
index 0000000..6d9525e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.running.parser.metrics;
+
+import org.apache.eagle.jpm.mr.running.entities.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generates task-level metrics; currently only the task's elapsed time.
+ */
+public class TaskExecutionMetricsCreationListener extends AbstractMetricsCreationListener<TaskExecutionAPIEntity> {
+
+    /**
+     * Emits the task's elapsed time as one task-level metric, stamped with the current wall-clock time.
+     *
+     * @param entity the task; {@code null} yields an empty list
+     * @return a list holding zero or one metric
+     */
+    @Override
+    public List<GenericMetricEntity> generateMetrics(TaskExecutionAPIEntity entity) {
+        if (entity == null) {
+            return new ArrayList<>();
+        }
+        final Long now = System.currentTimeMillis();
+        final Map<String, String> taskTags = entity.getTags();
+        final List<GenericMetricEntity> result = new ArrayList<>();
+        result.add(metricWrapper(now, Constants.TASK_EXECUTION_TIME, entity.getElapsedTime(), taskTags));
+        return result;
+    }
+
+    /**
+     * Formats {@code field} as a task-level metric name using {@code Constants.metricFormat}.
+     */
+    @Override
+    public String buildMetricName(String field) {
+        return String.format(Constants.metricFormat, Constants.TASK_LEVEL, field);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
new file mode 100644
index 0000000..352696b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.recover;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * Persists and restores the state of running MR jobs in zookeeper by delegating to {@link RunningJobManager}.
+ * It converts the stored (tags, AppInfo) pairs into {@link JobExecutionAPIEntity} objects.
+ */
+public class MRRunningJobManager implements Serializable {
+    // NOTE(review): this class is Serializable but RunningJobManager may not be; the spout and bolt in this
+    // module keep their MRRunningJobManager in transient fields and build it in open()/prepare() — confirm
+    // before serializing an instance.
+    private RunningJobManager runningJobManager;
+
+    /**
+     * @param config zookeeper connection settings (quorum, session timeout, retries, root path)
+     */
+    public MRRunningJobManager(MRRunningConfigManager.ZKStateConfig config) {
+        this.runningJobManager = new RunningJobManager(config.zkQuorum,
+                config.zkSessionTimeoutMs, config.zkRetryTimes, config.zkRetryInterval, config.zkRoot);
+    }
+
+    /**
+     * Reads back the jobs stored for one yarn application.
+     *
+     * @param appId the yarn application id
+     * @return {@code <jobId, JobExecutionAPIEntity>}
+     * @throws Exception whatever {@link RunningJobManager#recoverYarnApp(String)} throws
+     */
+    public Map<String, JobExecutionAPIEntity> recoverYarnApp(String appId) throws Exception {
+        Map<String, Pair<Map<String, String>, AppInfo>> jobs = this.runningJobManager.recoverYarnApp(appId);
+        return toJobEntities(jobs);
+    }
+
+    /**
+     * Reads back every stored yarn application and its jobs.
+     * The zookeeper path looks like /apps/mr/running/yarnAppId/jobId/.
+     *
+     * @return {@code <yarnAppId, <jobId, JobExecutionAPIEntity>>}
+     */
+    public Map<String, Map<String, JobExecutionAPIEntity>> recover() {
+        Map<String, Map<String, JobExecutionAPIEntity>> result = new HashMap<>();
+        Map<String, Map<String, Pair<Map<String, String>, AppInfo>>> apps = this.runningJobManager.recover();
+        for (Map.Entry<String, Map<String, Pair<Map<String, String>, AppInfo>>> app : apps.entrySet()) {
+            result.put(app.getKey(), toJobEntities(app.getValue()));
+        }
+        return result;
+    }
+
+    /**
+     * Stores the tags and AppInfo of one job under the given yarn application.
+     *
+     * @return the result of {@link RunningJobManager#update}
+     */
+    public boolean update(String yarnAppId, String jobId, JobExecutionAPIEntity entity) {
+        return this.runningJobManager.update(yarnAppId, jobId, entity.getTags(), entity.getAppInfo());
+    }
+
+    /**
+     * Removes the stored state of one job.
+     */
+    public void delete(String yarnAppId, String jobId) {
+        this.runningJobManager.delete(yarnAppId, jobId);
+    }
+
+    // Converts every stored (tags, AppInfo) pair of one yarn app into a job entity, keyed by job id.
+    private static Map<String, JobExecutionAPIEntity> toJobEntities(Map<String, Pair<Map<String, String>, AppInfo>> jobs) {
+        Map<String, JobExecutionAPIEntity> entities = new HashMap<>();
+        for (Map.Entry<String, Pair<Map<String, String>, AppInfo>> job : jobs.entrySet()) {
+            entities.put(job.getKey(), toJobEntity(job.getValue()));
+        }
+        return entities;
+    }
+
+    // Builds one job entity from its stored tags and AppInfo; the timestamp is the app's start time.
+    private static JobExecutionAPIEntity toJobEntity(Pair<Map<String, String>, AppInfo> job) {
+        JobExecutionAPIEntity entity = new JobExecutionAPIEntity();
+        entity.setTags(job.getLeft());
+        entity.setAppInfo(job.getRight());
+        entity.setTimestamp(job.getRight().getStartedTime());
+        return entity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
new file mode 100644
index 0000000..1703b25
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.storm;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Spout that periodically lists MR yarn applications and emits one tuple per application.
+ * Each tuple is (appId, appInfo, mrJobEntity).
+ *
+ * <p>The first call recovers the previously running apps from zookeeper. Later calls fetch the running
+ * apps from the resource manager. They also emit apps that disappeared since the previous round, with
+ * their state set to FINISHED and their jobs recovered from zookeeper.
+ */
+public class MRRunningJobFetchSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobFetchSpout.class);
+    private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private MRRunningConfigManager.EndpointConfig endpointConfig;
+    private MRRunningConfigManager.ZKStateConfig zkStateConfig;
+    private ResourceFetcher resourceFetcher;
+    private SpoutOutputCollector collector;
+    // false until the running apps have been recovered from zookeeper once
+    private boolean init;
+    private transient MRRunningJobManager runningJobManager;
+    // yarn app ids seen running in the previous round; an id missing from the next
+    // resource manager snapshot is treated as a finished app
+    private Set<String> runningYarnApps;
+
+    public MRRunningJobFetchSpout(MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+                                  MRRunningConfigManager.EndpointConfig endpointConfig,
+                                  MRRunningConfigManager.ZKStateConfig zkStateConfig) {
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.endpointConfig = endpointConfig;
+        this.zkStateConfig = zkStateConfig;
+        this.init = false;
+        this.runningYarnApps = new HashSet<>();
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+        collector = spoutOutputCollector;
+        this.runningJobManager = new MRRunningJobManager(zkStateConfig);
+    }
+
+    /**
+     * Runs one fetch round and then sleeps for {@code fetchRunningJobInterval}.
+     */
+    @Override
+    public void nextTuple() {
+        LOG.info("Start to fetch mr running jobs");
+        try {
+            //<yarnAppId, <jobId, JobExecutionAPIEntity>> for the apps whose jobs were read back from zookeeper
+            Map<String, Map<String, JobExecutionAPIEntity>> mrApps;
+            List<AppInfo> apps;
+            if (!this.init) {
+                mrApps = recoverRunningApps();
+                apps = collectRecoveredApps(mrApps);
+                LOG.info("recover {} mr yarn apps from zookeeper", apps.size());
+                this.init = true;
+            } else {
+                List<AppInfo> fetched = resourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
+                if (fetched == null) {
+                    // keep runningYarnApps untouched, otherwise every tracked app would look finished
+                    LOG.warn("resource manager returned no app list, skip this round");
+                    return;
+                }
+                // copy: finished apps are appended below and the fetched list may not be modifiable
+                apps = new ArrayList<>(fetched);
+                LOG.info("get {} apps from resource manager", apps.size());
+
+                // build the running set from the resource manager snapshot only, before any finished
+                // app is appended, so it is correct even when nothing was tracked before
+                Set<String> running = new HashSet<>();
+                for (AppInfo appInfo : apps) {
+                    running.add(appInfo.getId());
+                }
+                mrApps = new HashMap<>();
+                for (String appId : this.runningYarnApps) {
+                    if (!running.contains(appId)) {
+                        addFinishedApp(appId, apps, mrApps);
+                    }
+                }
+                this.runningYarnApps = running;
+                LOG.info("get {} total apps(contains finished)", apps.size());
+            }
+
+            for (AppInfo appInfo : apps) {
+                LOG.info("emit mr yarn application {}", appInfo.getId());
+                //emit (appId, AppInfo, Map<String, JobExecutionAPIEntity>); the map is null when nothing was recovered
+                collector.emit(new Values(appInfo.getId(), appInfo, mrApps.get(appInfo.getId())));
+            }
+        } catch (Exception e) {
+            LOG.warn("fail to fetch mr running jobs", e);
+        } finally {
+            Utils.sleep(jobExtractorConfig.fetchRunningJobInterval);
+        }
+    }
+
+    // Picks one AppInfo per recovered yarn app (apps with no jobs are skipped) and starts tracking its id.
+    private List<AppInfo> collectRecoveredApps(Map<String, Map<String, JobExecutionAPIEntity>> mrApps) {
+        List<AppInfo> apps = new ArrayList<>();
+        for (Map.Entry<String, Map<String, JobExecutionAPIEntity>> app : mrApps.entrySet()) {
+            Map<String, JobExecutionAPIEntity> jobs = app.getValue();
+            if (!jobs.isEmpty()) {
+                // the AppInfo stored with the first job stands for the whole yarn app
+                apps.add(jobs.values().iterator().next().getAppInfo());
+                this.runningYarnApps.add(app.getKey());
+            }
+        }
+        return apps;
+    }
+
+    // Recovers the jobs of a yarn app that is no longer running, marks its AppInfo FINISHED and
+    // appends it to apps/mrApps. An app whose zookeeper node is already gone is only logged.
+    private void addFinishedApp(String appId, List<AppInfo> apps,
+                                Map<String, Map<String, JobExecutionAPIEntity>> mrApps) throws Exception {
+        try {
+            Map<String, JobExecutionAPIEntity> jobs = this.runningJobManager.recoverYarnApp(appId);
+            if (!jobs.isEmpty()) {
+                mrApps.put(appId, jobs);
+                AppInfo appInfo = jobs.values().iterator().next().getAppInfo();
+                appInfo.setState(Constants.AppState.FINISHED.toString());
+                apps.add(appInfo);
+            }
+        } catch (KeeperException.NoNodeException e) {
+            LOG.warn("yarn app {} has finished", appId, e);
+        }
+    }
+
+    private Map<String, Map<String, JobExecutionAPIEntity>> recoverRunningApps() {
+        //we need read from zookeeper, path looks like /apps/mr/running/yarnAppId/jobId/
+        //content of path /apps/mr/running/yarnAppId/jobId is JobExecutionAPIEntity
+        //as we know, a yarn application may contains many mr jobs
+        //so, the returned results is a Map-Map
+        //<yarnAppId, <jobId, JobExecutionAPIEntity>>
+        return this.runningJobManager.recover();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("appId", "appInfo", "mrJobEntity"));
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        // tuples are emitted without message ids, so nothing is tracked
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        // tuples are emitted without message ids, so nothing is tracked
+    }
+
+    @Override
+    public void close() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
new file mode 100644
index 0000000..92dfbe3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.running.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
+import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class MRRunningJobParseBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(MRRunningJobParseBolt.class);
+
+    private MRRunningConfigManager.EndpointConfig endpointConfig;
+    private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+    private MRRunningConfigManager.ZKStateConfig zkStateConfig;
+    private ExecutorService executorService;
+    private Map<String, MRJobParser> runningMRParsers;
+    private transient MRRunningJobManager runningJobManager;
+    private MRRunningConfigManager.EagleServiceConfig eagleServiceConfig;
+    private ResourceFetcher resourceFetcher;
+    private List<String> configKeys;
+    public MRRunningJobParseBolt(MRRunningConfigManager.EagleServiceConfig eagleServiceConfig,
+                                 MRRunningConfigManager.EndpointConfig endpointConfig,
+                                 MRRunningConfigManager.JobExtractorConfig jobExtractorConfig,
+                                 MRRunningConfigManager.ZKStateConfig zkStateConfig,
+                                 List<String> configKeys) {
+        this.eagleServiceConfig = eagleServiceConfig;
+        this.endpointConfig = endpointConfig;
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.runningMRParsers = new HashMap<>();
+        this.zkStateConfig = zkStateConfig;
+        this.configKeys = configKeys;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.executorService = Executors.newFixedThreadPool(jobExtractorConfig.parseJobThreadPoolSize);
+
+        this.runningJobManager = new MRRunningJobManager(zkStateConfig);
+        this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        AppInfo appInfo = (AppInfo)tuple.getValue(1);
+        Map<String, JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>)tuple.getValue(2);
+
+        LOG.info("get mr yarn application " + appInfo.getId());
+
+        MRJobParser applicationParser;
+        if (!runningMRParsers.containsKey(appInfo.getId())) {
+            applicationParser = new MRJobParser(jobExtractorConfig, eagleServiceConfig,
+                    appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys);
+            runningMRParsers.put(appInfo.getId(), applicationParser);
+            LOG.info("create application parser for {}", appInfo.getId());
+        } else {
+            applicationParser = runningMRParsers.get(appInfo.getId());
+        }
+
+        Set<String> runningParserIds = new HashSet<>(runningMRParsers.keySet());
+        runningParserIds.stream()
+                .filter(appId -> runningMRParsers.get(appId).status() == MRJobParser.ParserStatus.APP_FINISHED)
+                .forEach(appId -> {
+                    runningMRParsers.remove(appId);
+                    LOG.info("remove parser {}", appId);
+                });
+
+        if (appInfo.getState().equals(Constants.AppState.FINISHED.toString()) ||
+                applicationParser.status() == MRJobParser.ParserStatus.FINISHED) {
+            applicationParser.setStatus(MRJobParser.ParserStatus.RUNNING);
+            executorService.execute(applicationParser);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+    }
+}


Mime
View raw message