eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject [3/4] incubator-eagle git commit: [EAGLE-467] Job list apis for querying jobs regardless of the status
Date Fri, 19 Aug 2016 09:46:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java
new file mode 100644
index 0000000..1445a24
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskFailureCountAPIEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.historyentity;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity bound by its annotations to table {@code eaglejpa_anomaly}, column family {@code f},
+ * prefix {@code taskfailurecount} and partition {@code site}, under the service name
+ * {@link Constants#JPA_TASK_FAILURE_COUNT_SERVICE_NAME}.
+ *
+ * <p>It declares three columns: a failure count ({@code a}), an error string ({@code b}) and a
+ * task status ({@code c}). Each setter stores the new value and then calls
+ * {@code _pcs.firePropertyChange(<field name>, null, null)}.
+ *
+ * <p>NOTE(review): {@code _pcs} is not declared in this class; presumably it is inherited from
+ * {@link JobBaseAPIEntity} or one of its ancestors - confirm.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_anomaly")
+@ColumnFamily("f")
+@Prefix("taskfailurecount")
+@Service(Constants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+public class TaskFailureCountAPIEntity extends JobBaseAPIEntity {
+    // Persisted columns; the @Column value is the qualifier each field is stored under.
+    @Column("a")
+    private int failureCount;
+    @Column("b")
+    private String error;
+    @Column("c")
+    private String taskStatus;
+
+
+    /** @return the stored task status, or {@code null} if none was set */
+    public String getTaskStatus() {
+        return taskStatus;
+    }
+
+    /** Stores {@code taskStatus} and fires a property change named {@code "taskStatus"}. */
+    public void setTaskStatus(String taskStatus) {
+        this.taskStatus = taskStatus;
+        _pcs.firePropertyChange("taskStatus", null, null);
+    }
+
+    /** @return the stored error string, or {@code null} if none was set */
+    public String getError() {
+        return error;
+    }
+
+    /** Stores {@code error} and fires a property change named {@code "error"}. */
+    public void setError(String error) {
+        this.error = error;
+        _pcs.firePropertyChange("error", null, null);
+    }
+
+    /** @return the stored failure count ({@code 0} until a value is set) */
+    public int getFailureCount() {
+        return failureCount;
+    }
+
+    /** Stores {@code failureCount} and fires a property change named {@code "failureCount"}. */
+    public void setFailureCount(int failureCount) {
+        this.failureCount = failureCount;
+        _pcs.firePropertyChange("failureCount", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java
new file mode 100644
index 0000000..8af853a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCountersSerDeser;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+/**
+ * Entity repository for the {@code mr.runningentity} package. Its constructor registers the
+ * three execution entity classes and the two custom column serializers of this package.
+ */
+public class JPMEntityRepository extends EntityRepository {
+    /** Fills the inherited {@code entitySet} and {@code serDeserMap}. */
+    public JPMEntityRepository() {
+        super();
+
+        // Entity classes served by this repository.
+        this.entitySet.add(JobExecutionAPIEntity.class);
+        this.entitySet.add(TaskExecutionAPIEntity.class);
+        this.entitySet.add(TaskAttemptExecutionAPIEntity.class);
+
+        // Serializers, keyed by the column type each one handles.
+        this.serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
+        this.serDeserMap.put(JobCounters.class, new JobCountersSerDeser());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfig.java
new file mode 100644
index 0000000..6d8145f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfig.java
@@ -0,0 +1,26 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * A {@code HashMap<String, String>} subtype that gives the {@code jobConfig} column of
+ * {@code JobExecutionAPIEntity} its own type, so that a dedicated serializer
+ * ({@code JobConfigSerDeser}) can be registered for it.
+ *
+ * <p>{@link HashMap} is already {@link Serializable}; the interface is restated on this class
+ * and kept so the declared supertypes stay unchanged.
+ */
+public class JobConfig extends HashMap<String, String> implements Serializable {
+    // Explicit stream id: without it the JVM derives one from the class shape, so an
+    // unrelated recompile could make previously serialized instances unreadable.
+    private static final long serialVersionUID = 1L;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfigSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfigSerDeser.java
new file mode 100644
index 0000000..409e84a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobConfigSerDeser.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+import org.apache.eagle.log.entity.meta.MapSerDeser;
+
+import java.util.Map;
+
+/**
+ * {@link EntitySerDeser} for {@link JobConfig} that hands the byte-level work to a shared
+ * {@link MapSerDeser} instance.
+ */
+public class JobConfigSerDeser implements EntitySerDeser<JobConfig> {
+    private static final MapSerDeser INSTANCE = new MapSerDeser();
+
+    /**
+     * Decodes {@code bytes} with the shared {@link MapSerDeser} and copies the resulting entries
+     * into a new {@link JobConfig}.
+     *
+     * @param bytes serialized form, passed to the delegate unchanged
+     * @return a new {@link JobConfig} holding the decoded entries, or {@code null} when the
+     *         delegate yields no map
+     */
+    @Override
+    public JobConfig deserialize(byte[] bytes) {
+        // The delegate's result was previously consumed as a raw Map; the unchecked conversion
+        // is confined to this one local. Entries are trusted to be String -> String.
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        Map<String, String> decoded = (Map) INSTANCE.deserialize(bytes);
+        if (decoded == null) {
+            // HashMap.putAll(null) throws NullPointerException; pass the absence through instead.
+            return null;
+        }
+        JobConfig config = new JobConfig();
+        config.putAll(decoded);
+        return config;
+    }
+
+    /**
+     * Encodes {@code jobConfig} by delegating to the shared {@link MapSerDeser}.
+     *
+     * @param jobConfig the configuration map to encode, passed to the delegate unchanged
+     * @return whatever the delegate returns for this map
+     */
+    @Override
+    public byte[] serialize(JobConfig jobConfig) {
+        return INSTANCE.serialize(jobConfig);
+    }
+
+    /** @return {@code JobConfig.class}, the type this serializer handles */
+    @Override
+    public Class<JobConfig> type() {
+        return JobConfig.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
new file mode 100644
index 0000000..653f1c9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
@@ -0,0 +1,437 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity bound by its annotations to table {@code eagleMRRunningJobs}, column family {@code f},
+ * prefix {@code jobs} and partition {@code site}, under the service name
+ * {@link Constants#JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME}.
+ *
+ * <p>Two indexes are declared: a unique one on {@code jobId} and a non-unique one on
+ * {@code jobDefId}. The declared tags are {@code site}, {@code jobId}, {@code jobName},
+ * {@code jobDefId}, {@code jobType}, {@code user} and {@code queue}.
+ *
+ * <p>The class is a plain property holder: every getter returns its field unchanged and every
+ * setter assigns the field and then calls {@code valueChanged(<field name>)}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagleMRRunningJobs")
+@ColumnFamily("f")
+@Prefix("jobs")
+@Service(Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
+        @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+})
+@Tags({"site", "jobId", "jobName", "jobDefId", "jobType", "user", "queue"})
+public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
+    // Persisted columns. Qualifiers run "a" through "z" and then continue with the two-letter
+    // names "aa" through "aj"; a new column should take the next unused qualifier.
+    @Column("a")
+    private long startTime;
+    @Column("b")
+    private long endTime;
+    @Column("c")
+    private long durationTime;
+    @Column("d")
+    private String currentState;
+    @Column("e")
+    private int numTotalMaps;
+    @Column("f")
+    private int mapsCompleted;
+    @Column("g")
+    private int numTotalReduces;
+    @Column("h")
+    private int reducesCompleted;
+    @Column("i")
+    private double mapProgress;
+    @Column("j")
+    private double reduceProgress;
+    @Column("k")
+    private int mapsPending;
+    @Column("l")
+    private int mapsRunning;
+    @Column("m")
+    private int reducesPending;
+    @Column("n")
+    private int reducesRunning;
+    @Column("o")
+    private int newReduceAttempts;
+    @Column("p")
+    private int runningReduceAttempts;
+    @Column("q")
+    private int failedReduceAttempts;
+    @Column("r")
+    private int killedReduceAttempts;
+    @Column("s")
+    private int successfulReduceAttempts;
+    @Column("t")
+    private int newMapAttempts;
+    @Column("u")
+    private int runningMapAttempts;
+    @Column("v")
+    private int failedMapAttempts;
+    @Column("w")
+    private int killedMapAttempts;
+    @Column("x")
+    private int successfulMapAttempts;
+    @Column("y")
+    private AppInfo appInfo;
+    @Column("z")
+    private JobCounters jobCounters;
+    @Column("aa")
+    private JobConfig jobConfig;
+    @Column("ab")
+    private long allocatedMB;
+    @Column("ac")
+    private int allocatedVCores;
+    @Column("ad")
+    private int runningContainers;
+    @Column("ae")
+    private int dataLocalMaps;
+    @Column("af")
+    private double dataLocalMapsPercentage;
+    @Column("ag")
+    private int rackLocalMaps;
+    @Column("ah")
+    private double rackLocalMapsPercentage;
+    @Column("ai")
+    private int totalLaunchedMaps;
+    @Column("aj")
+    private long submissionTime;
+
+    // Accessors. The string handed to valueChanged(...) in each setter is the field's own name;
+    // keep the two in step when renaming a field.
+    public JobConfig getJobConfig() {
+        return jobConfig;
+    }
+
+    public void setJobConfig(JobConfig jobConfig) {
+        this.jobConfig = jobConfig;
+        valueChanged("jobConfig");
+    }
+
+    public JobCounters getJobCounters() {
+        return jobCounters;
+    }
+
+    public void setJobCounters(JobCounters jobCounters) {
+        this.jobCounters = jobCounters;
+        valueChanged("jobCounters");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public long getDurationTime() {
+        return durationTime;
+    }
+
+    public void setDurationTime(long durationTime) {
+        this.durationTime = durationTime;
+        valueChanged("durationTime");
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+        valueChanged("currentState");
+    }
+
+    public int getNumTotalMaps() {
+        return numTotalMaps;
+    }
+
+    public void setNumTotalMaps(int numTotalMaps) {
+        this.numTotalMaps = numTotalMaps;
+        valueChanged("numTotalMaps");
+    }
+
+    public int getMapsCompleted() {
+        return mapsCompleted;
+    }
+
+    public void setMapsCompleted(int mapsCompleted) {
+        this.mapsCompleted = mapsCompleted;
+        valueChanged("mapsCompleted");
+    }
+
+    public int getNumTotalReduces() {
+        return numTotalReduces;
+    }
+
+    public void setNumTotalReduces(int numTotalReduces) {
+        this.numTotalReduces = numTotalReduces;
+        valueChanged("numTotalReduces");
+    }
+
+    public int getReducesCompleted() {
+        return reducesCompleted;
+    }
+
+    public void setReducesCompleted(int reducesCompleted) {
+        this.reducesCompleted = reducesCompleted;
+        valueChanged("reducesCompleted");
+    }
+
+    public double getMapProgress() {
+        return mapProgress;
+    }
+
+    public void setMapProgress(double mapProgress) {
+        this.mapProgress = mapProgress;
+        valueChanged("mapProgress");
+    }
+
+    public double getReduceProgress() {
+        return reduceProgress;
+    }
+
+    public void setReduceProgress(double reduceProgress) {
+        this.reduceProgress = reduceProgress;
+        valueChanged("reduceProgress");
+    }
+
+    public int getMapsPending() {
+        return mapsPending;
+    }
+
+    public void setMapsPending(int mapsPending) {
+        this.mapsPending = mapsPending;
+        valueChanged("mapsPending");
+    }
+
+    public int getMapsRunning() {
+        return mapsRunning;
+    }
+
+    public void setMapsRunning(int mapsRunning) {
+        this.mapsRunning = mapsRunning;
+        valueChanged("mapsRunning");
+    }
+
+    public int getReducesPending() {
+        return reducesPending;
+    }
+
+    public void setReducesPending(int reducesPending) {
+        this.reducesPending = reducesPending;
+        valueChanged("reducesPending");
+    }
+
+    public int getReducesRunning() {
+        return reducesRunning;
+    }
+
+    public void setReducesRunning(int reducesRunning) {
+        this.reducesRunning = reducesRunning;
+        valueChanged("reducesRunning");
+    }
+
+    public int getNewReduceAttempts() {
+        return newReduceAttempts;
+    }
+
+    public void setNewReduceAttempts(int newReduceAttempts) {
+        this.newReduceAttempts = newReduceAttempts;
+        valueChanged("newReduceAttempts");
+    }
+
+    public int getRunningReduceAttempts() {
+        return runningReduceAttempts;
+    }
+
+    public void setRunningReduceAttempts(int runningReduceAttempts) {
+        this.runningReduceAttempts = runningReduceAttempts;
+        valueChanged("runningReduceAttempts");
+    }
+
+    public int getFailedReduceAttempts() {
+        return failedReduceAttempts;
+    }
+
+    public void setFailedReduceAttempts(int failedReduceAttempts) {
+        this.failedReduceAttempts = failedReduceAttempts;
+        valueChanged("failedReduceAttempts");
+    }
+
+    public int getKilledReduceAttempts() {
+        return killedReduceAttempts;
+    }
+
+    public void setKilledReduceAttempts(int killedReduceAttempts) {
+        this.killedReduceAttempts = killedReduceAttempts;
+        valueChanged("killedReduceAttempts");
+    }
+
+    public int getSuccessfulReduceAttempts() {
+        return successfulReduceAttempts;
+    }
+
+    public void setSuccessfulReduceAttempts(int successfulReduceAttempts) {
+        this.successfulReduceAttempts = successfulReduceAttempts;
+        valueChanged("successfulReduceAttempts");
+    }
+
+    public int getNewMapAttempts() {
+        return newMapAttempts;
+    }
+
+    public void setNewMapAttempts(int newMapAttempts) {
+        this.newMapAttempts = newMapAttempts;
+        valueChanged("newMapAttempts");
+    }
+
+    public int getRunningMapAttempts() {
+        return runningMapAttempts;
+    }
+
+    public void setRunningMapAttempts(int runningMapAttempts) {
+        this.runningMapAttempts = runningMapAttempts;
+        valueChanged("runningMapAttempts");
+    }
+
+    public int getFailedMapAttempts() {
+        return failedMapAttempts;
+    }
+
+    public void setFailedMapAttempts(int failedMapAttempts) {
+        this.failedMapAttempts = failedMapAttempts;
+        valueChanged("failedMapAttempts");
+    }
+
+    public int getKilledMapAttempts() {
+        return killedMapAttempts;
+    }
+
+    public void setKilledMapAttempts(int killedMapAttempts) {
+        this.killedMapAttempts = killedMapAttempts;
+        valueChanged("killedMapAttempts");
+    }
+
+    public int getSuccessfulMapAttempts() {
+        return successfulMapAttempts;
+    }
+
+    public void setSuccessfulMapAttempts(int successfulMapAttempts) {
+        this.successfulMapAttempts = successfulMapAttempts;
+        valueChanged("successfulMapAttempts");
+    }
+
+    public AppInfo getAppInfo() {
+        return appInfo;
+    }
+
+    public void setAppInfo(AppInfo appInfo) {
+        this.appInfo = appInfo;
+        valueChanged("appInfo");
+    }
+
+    public long getAllocatedMB() {
+        return allocatedMB;
+    }
+
+    public void setAllocatedMB(long allocatedMB) {
+        this.allocatedMB = allocatedMB;
+        valueChanged("allocatedMB");
+    }
+
+    public int getAllocatedVCores() {
+        return allocatedVCores;
+    }
+
+    public void setAllocatedVCores(int allocatedVCores) {
+        this.allocatedVCores = allocatedVCores;
+        valueChanged("allocatedVCores");
+    }
+
+    public int getRunningContainers() {
+        return runningContainers;
+    }
+
+    public void setRunningContainers(int runningContainers) {
+        this.runningContainers = runningContainers;
+        valueChanged("runningContainers");
+    }
+
+    public int getDataLocalMaps() {
+        return dataLocalMaps;
+    }
+
+    public void setDataLocalMaps(int dataLocalMaps) {
+        this.dataLocalMaps = dataLocalMaps;
+        valueChanged("dataLocalMaps");
+    }
+
+    public double getDataLocalMapsPercentage() {
+        return dataLocalMapsPercentage;
+    }
+
+    public void setDataLocalMapsPercentage(double dataLocalMapsPercentage) {
+        this.dataLocalMapsPercentage = dataLocalMapsPercentage;
+        valueChanged("dataLocalMapsPercentage");
+    }
+
+    public int getRackLocalMaps() {
+        return rackLocalMaps;
+    }
+
+    public void setRackLocalMaps(int rackLocalMaps) {
+        this.rackLocalMaps = rackLocalMaps;
+        valueChanged("rackLocalMaps");
+    }
+
+    public double getRackLocalMapsPercentage() {
+        return rackLocalMapsPercentage;
+    }
+
+    public void setRackLocalMapsPercentage(double rackLocalMapsPercentage) {
+        this.rackLocalMapsPercentage = rackLocalMapsPercentage;
+        valueChanged("rackLocalMapsPercentage");
+    }
+
+    public int getTotalLaunchedMaps() {
+        return totalLaunchedMaps;
+    }
+
+    public void setTotalLaunchedMaps(int totalLaunchedMaps) {
+        this.totalLaunchedMaps = totalLaunchedMaps;
+        valueChanged("totalLaunchedMaps");
+    }
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        valueChanged("submissionTime");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java
new file mode 100644
index 0000000..11a8b4c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskAttemptExecutionAPIEntity.java
@@ -0,0 +1,137 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity bound by its annotations to table {@code eagleMRRunningTasks}, column family {@code f},
+ * prefix {@code tasks_exec_attempt} and partition {@code site}, under the service name
+ * {@link Constants#JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME}. One non-unique index on
+ * {@code jobId} is declared.
+ *
+ * <p>{@code TaskExecutionAPIEntity} names the same table and is told apart by its prefix
+ * ({@code tasks_exec}).
+ *
+ * <p>The class is a plain property holder: every getter returns its field unchanged and every
+ * setter assigns the field and then calls {@code valueChanged(<field name>)}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagleMRRunningTasks")
+@ColumnFamily("f")
+@Prefix("tasks_exec_attempt")
+@Service(Constants.JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false)
+})
+// NOTE(review): the tag is spelled "JobName" here but "jobName" on JobExecutionAPIEntity;
+// confirm which spelling writers and queries actually use before relying on it.
+@Tags({"site", "jobId", "JobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue", "host", "rack"})
+public class TaskAttemptExecutionAPIEntity extends TaggedLogAPIEntity {
+    // Persisted columns; the @Column value is the qualifier each field is stored under.
+    @Column("a")
+    private long startTime;
+    @Column("b")
+    private long finishTime;
+    @Column("c")
+    private long elapsedTime;
+    @Column("d")
+    private double progress;
+    @Column("e")
+    private String id;
+    @Column("f")
+    private String status;
+    @Column("g")
+    private String diagnostics;
+    @Column("h")
+    private String type;
+    @Column("i")
+    private String assignedContainerId;
+
+    // Accessors. The string handed to valueChanged(...) in each setter is the field's own name.
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getFinishTime() {
+        return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+        valueChanged("finishTime");
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+        valueChanged("elapsedTime");
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+        valueChanged("progress");
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+        valueChanged("id");
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    public String getDiagnostics() {
+        return diagnostics;
+    }
+
+    public void setDiagnostics(String diagnostics) {
+        this.diagnostics = diagnostics;
+        valueChanged("diagnostics");
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+        valueChanged("type");
+    }
+
+    public String getAssignedContainerId() {
+        return assignedContainerId;
+    }
+
+    public void setAssignedContainerId(String assignedContainerId) {
+        this.assignedContainerId = assignedContainerId;
+        valueChanged("assignedContainerId");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java
new file mode 100644
index 0000000..50e042f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/TaskExecutionAPIEntity.java
@@ -0,0 +1,127 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity bound by its annotations to table {@code eagleMRRunningTasks}, column family {@code f},
+ * prefix {@code tasks_exec} and partition {@code site}, under the service name
+ * {@link Constants#JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME}. One non-unique index on
+ * {@code jobId} is declared.
+ *
+ * <p>{@code TaskAttemptExecutionAPIEntity} names the same table and is told apart by its prefix
+ * ({@code tasks_exec_attempt}).
+ *
+ * <p>The class is a plain property holder: every getter returns its field unchanged and every
+ * setter assigns the field and then calls {@code valueChanged(<field name>)}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eagleMRRunningTasks")
+@ColumnFamily("f")
+@Prefix("tasks_exec")
+@Service(Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name="Index_1_jobId", columns = { "jobId" }, unique = false)
+})
+// NOTE(review): the tag is spelled "JobName" here but "jobName" on JobExecutionAPIEntity;
+// confirm which spelling writers and queries actually use before relying on it.
+@Tags({"site", "jobId", "JobName", "jobDefId", "jobType", "taskType", "taskId", "user", "queue", "hostname"})
+public class TaskExecutionAPIEntity extends TaggedLogAPIEntity {
+    // Persisted columns; the @Column value is the qualifier each field is stored under.
+    @Column("a")
+    private long startTime;
+    @Column("b")
+    private long endTime;
+    @Column("c")
+    private long duration;
+    @Column("d")
+    private double progress;
+    @Column("e")
+    private String taskStatus;
+    @Column("f")
+    private String successfulAttempt;
+    @Column("g")
+    private String statusDesc;
+    @Column("h")
+    private JobCounters jobCounters;
+
+    // Accessors. The string handed to valueChanged(...) in each setter is the field's own name.
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public long getDuration() {
+        return duration;
+    }
+
+    public void setDuration(long duration) {
+        this.duration = duration;
+        valueChanged("duration");
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+        valueChanged("progress");
+    }
+
+    public String getTaskStatus() {
+        return taskStatus;
+    }
+
+    public void setTaskStatus(String taskStatus) {
+        this.taskStatus = taskStatus;
+        valueChanged("taskStatus");
+    }
+
+    public String getSuccessfulAttempt() {
+        return successfulAttempt;
+    }
+
+    public void setSuccessfulAttempt(String successfulAttempt) {
+        this.successfulAttempt = successfulAttempt;
+        valueChanged("successfulAttempt");
+    }
+
+    public String getStatusDesc() {
+        return statusDesc;
+    }
+
+    public void setStatusDesc(String statusDesc) {
+        this.statusDesc = statusDesc;
+        valueChanged("statusDesc");
+    }
+
+    public JobCounters getJobCounters() {
+        return jobCounters;
+    }
+
+    public void setJobCounters(JobCounters jobCounters) {
+        this.jobCounters = jobCounters;
+        valueChanged("jobCounters");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index d5fda5a..1b75e81 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -19,8 +19,8 @@ package org.apache.eagle.jpm.spark.crawl;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import jline.internal.Log;
-import org.apache.eagle.jpm.entity.*;
+import org.apache.eagle.jpm.spark.entity.JobConfig;
+import org.apache.eagle.jpm.spark.entity.*;
 import org.apache.eagle.jpm.util.JSONUtil;
 import org.apache.eagle.jpm.util.JobNameNormalization;
 import org.apache.eagle.jpm.util.SparkEntityConstant;
@@ -673,7 +673,7 @@ public class JHFSparkEventReader {
             int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
             return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB;
         }
-        Log.info("Cannot parse memory info " +  memory);
+        LOG.info("Cannot parse memory info " +  memory);
         return 0l;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JPMEntityRepository.java
new file mode 100644
index 0000000..3cb57f5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JPMEntityRepository.java
@@ -0,0 +1,32 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+/**
+ * Entity repository for the Spark entity classes of this package.
+ *
+ * <p>The constructor adds the five Spark entity classes to {@code entitySet} and maps
+ * {@link JobConfig} to a new {@link JobConfigSerDeser} in {@code serDeserMap}.
+ */
+public class JPMEntityRepository extends EntityRepository {
+    public JPMEntityRepository() {
+        // Entity types contributed by this repository.
+        this.entitySet.add(SparkApp.class);
+        this.entitySet.add(SparkJob.class);
+        this.entitySet.add(SparkStage.class);
+        this.entitySet.add(SparkTask.class);
+        this.entitySet.add(SparkExecutor.class);
+
+        // Serializer/deserializer registered for JobConfig-typed values.
+        this.serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java
new file mode 100644
index 0000000..11c4a22
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfig.java
@@ -0,0 +1,39 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Serializable holder for a configuration expressed as string key/value pairs.
+ *
+ * <p>The map starts out as an empty {@link TreeMap}. {@link #setConfig(Map)} stores the caller's
+ * map by reference (no copy is made) and accepts {@code null}.
+ */
+public class JobConfig implements Serializable {
+    private Map<String, String> config = new TreeMap<>();
+
+    /**
+     * Returns the map currently held.
+     *
+     * @return the held map; {@code null} only if {@code setConfig(null)} was called
+     */
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    /**
+     * Replaces the held map.
+     *
+     * @param config the new map, stored as-is (not copied); may be {@code null}
+     */
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+
+    /**
+     * Returns the string form of the held map.
+     *
+     * @return the map's own {@code toString()} result, or {@code "null"} when no map is set
+     */
+    @Override
+    public String toString() {
+        // String.valueOf avoids the NullPointerException that config.toString() would throw
+        // after setConfig(null).
+        return String.valueOf(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfigSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfigSerDeser.java
new file mode 100644
index 0000000..69f0f0d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/JobConfigSerDeser.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+import org.apache.eagle.log.entity.meta.MapSerDeser;
+
+import java.util.Map;
+
+/**
+ * {@link EntitySerDeser} for {@link JobConfig} that delegates the byte-level work to a shared
+ * {@link MapSerDeser}: only the wrapped map is written and read back.
+ */
+public class JobConfigSerDeser implements EntitySerDeser<JobConfig> {
+    private static final MapSerDeser INSTANCE = new MapSerDeser();
+
+    /** Decodes {@code bytes} with the map serializer and wraps the result in a new JobConfig. */
+    @Override
+    public JobConfig deserialize(byte[] bytes) {
+        // Kept as a raw Map, exactly as handed back by the delegate.
+        final Map decodedEntries = INSTANCE.deserialize(bytes);
+        final JobConfig result = new JobConfig();
+        result.setConfig(decodedEntries);
+        return result;
+    }
+
+    /** Encodes the map held by {@code jobConfig} with the map serializer. */
+    @Override
+    public byte[] serialize(JobConfig jobConfig) {
+        final Map<String, String> entries = jobConfig.getConfig();
+        return INSTANCE.serialize(entries);
+    }
+
+    /** Returns the class this serializer handles. */
+    @Override
+    public Class<JobConfig> type() {
+        return JobConfig.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java
new file mode 100644
index 0000000..528a91f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkApp.java
@@ -0,0 +1,429 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * Spark application entity.
+ *
+ * <p>Per its annotations it maps to table {@code eglesprk_apps}, column family {@code f} and
+ * prefix {@code sprkapp}; each field is stored under the short qualifier named in its
+ * {@code @Column}. Every setter assigns the field and then calls {@code valueChanged} with the
+ * property name.
+ */
+@Table("eglesprk_apps")
+@ColumnFamily("f")
+@Prefix("sprkapp")
+@Service(Constants.SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site", "sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "user", "queue"})
+@Partition({"site"})
+public class SparkApp extends TaggedLogAPIEntity {
+
+    @Column("a") private long startTime;
+    @Column("b") private long endTime;
+    @Column("c") private String yarnState;
+    @Column("d") private String yarnStatus;
+    @Column("e") private JobConfig config;
+    @Column("f") private int numJobs;
+    @Column("g") private int totalStages;
+    @Column("h") private int skippedStages;
+    @Column("i") private int failedStages;
+    @Column("j") private int totalTasks;
+    @Column("k") private int skippedTasks;
+    @Column("l") private int failedTasks;
+    @Column("m") private int executors;
+    @Column("n") private long inputBytes;
+    @Column("o") private long inputRecords;
+    @Column("p") private long outputBytes;
+    @Column("q") private long outputRecords;
+    @Column("r") private long shuffleReadBytes;
+    @Column("s") private long shuffleReadRecords;
+    @Column("t") private long shuffleWriteBytes;
+    @Column("u") private long shuffleWriteRecords;
+    @Column("v") private long executorDeserializeTime;
+    @Column("w") private long executorRunTime;
+    @Column("x") private long resultSize;
+    @Column("y") private long jvmGcTime;
+    @Column("z") private long resultSerializationTime;
+    @Column("ab") private long memoryBytesSpilled;
+    @Column("ac") private long diskBytesSpilled;
+    @Column("ad") private long execMemoryBytes;
+    @Column("ae") private long driveMemoryBytes;
+    @Column("af") private int completeTasks;
+    @Column("ag") private long totalExecutorTime;
+    @Column("ah") private long executorMemoryOverhead;
+    @Column("ai") private long driverMemoryOverhead;
+    @Column("aj") private int executorCores;
+    @Column("ak") private int driverCores;
+
+    // ---- timing and YARN state ----
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public String getYarnState() {
+        return yarnState;
+    }
+
+    public void setYarnState(String yarnState) {
+        this.yarnState = yarnState;
+        valueChanged("yarnState");
+    }
+
+    public String getYarnStatus() {
+        return yarnStatus;
+    }
+
+    public void setYarnStatus(String yarnStatus) {
+        this.yarnStatus = yarnStatus;
+        valueChanged("yarnStatus");
+    }
+
+    public JobConfig getConfig() {
+        return config;
+    }
+
+    public void setConfig(JobConfig config) {
+        this.config = config;
+        valueChanged("config");
+    }
+
+    // ---- job, stage and task counts ----
+
+    public int getNumJobs() {
+        return numJobs;
+    }
+
+    public void setNumJobs(int numJobs) {
+        this.numJobs = numJobs;
+        valueChanged("numJobs");
+    }
+
+    public int getTotalStages() {
+        return totalStages;
+    }
+
+    public void setTotalStages(int totalStages) {
+        this.totalStages = totalStages;
+        valueChanged("totalStages");
+    }
+
+    public int getSkippedStages() {
+        return skippedStages;
+    }
+
+    public void setSkippedStages(int skippedStages) {
+        this.skippedStages = skippedStages;
+        valueChanged("skippedStages");
+    }
+
+    public int getFailedStages() {
+        return failedStages;
+    }
+
+    public void setFailedStages(int failedStages) {
+        this.failedStages = failedStages;
+        valueChanged("failedStages");
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        valueChanged("totalTasks");
+    }
+
+    public int getSkippedTasks() {
+        return skippedTasks;
+    }
+
+    public void setSkippedTasks(int skippedTasks) {
+        this.skippedTasks = skippedTasks;
+        valueChanged("skippedTasks");
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
+
+    public int getExecutors() {
+        return executors;
+    }
+
+    public void setExecutors(int executors) {
+        this.executors = executors;
+        valueChanged("executors");
+    }
+
+    // ---- input / output volumes ----
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        valueChanged("inputBytes");
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        valueChanged("inputRecords");
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        valueChanged("outputBytes");
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        valueChanged("outputRecords");
+    }
+
+    // ---- shuffle volumes ----
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadBytes = shuffleReadRemoteBytes;
+        valueChanged("shuffleReadBytes");
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        valueChanged("shuffleReadRecords");
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        valueChanged("shuffleWriteBytes");
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        valueChanged("shuffleWriteRecords");
+    }
+
+    // ---- executor timings and result sizes ----
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        valueChanged("executorRunTime");
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+
+    // ---- memory figures ----
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        valueChanged("memoryBytesSpilled");
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        valueChanged("diskBytesSpilled");
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        valueChanged("execMemoryBytes");
+    }
+
+    public long getDriveMemoryBytes() {
+        return driveMemoryBytes;
+    }
+
+    public void setDriveMemoryBytes(long driveMemoryBytes) {
+        this.driveMemoryBytes = driveMemoryBytes;
+        valueChanged("driveMemoryBytes");
+    }
+
+    public int getCompleteTasks() {
+        return completeTasks;
+    }
+
+    public void setCompleteTasks(int completeTasks) {
+        this.completeTasks = completeTasks;
+        valueChanged("completeTasks");
+    }
+
+    // ---- executor/driver totals, overheads and cores ----
+
+    public long getTotalExecutorTime() {
+        return totalExecutorTime;
+    }
+
+    public void setTotalExecutorTime(long totalExecutorTime) {
+        this.totalExecutorTime = totalExecutorTime;
+        valueChanged("totalExecutorTime");
+    }
+
+    public long getExecutorMemoryOverhead() {
+        return executorMemoryOverhead;
+    }
+
+    public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+        this.executorMemoryOverhead = executorMemoryOverhead;
+        valueChanged("executorMemoryOverhead");
+    }
+
+    public long getDriverMemoryOverhead() {
+        return driverMemoryOverhead;
+    }
+
+    public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+        this.driverMemoryOverhead = driverMemoryOverhead;
+        valueChanged("driverMemoryOverhead");
+    }
+
+    public int getExecutorCores() {
+        return executorCores;
+    }
+
+    public void setExecutorCores(int executorCores) {
+        this.executorCores = executorCores;
+        valueChanged("executorCores");
+    }
+
+    public int getDriverCores() {
+        return driverCores;
+    }
+
+    public void setDriverCores(int driverCores) {
+        this.driverCores = driverCores;
+        valueChanged("driverCores");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java
new file mode 100644
index 0000000..366e4aa
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkExecutor.java
@@ -0,0 +1,234 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * Spark executor entity.
+ *
+ * <p>Per its annotations it maps to table {@code eglesprk_executors}, column family {@code f} and
+ * prefix {@code sprkexcutr}; each field is stored under the short qualifier named in its
+ * {@code @Column}. Every setter assigns the field and then calls {@code valueChanged} with the
+ * property name.
+ */
+@Table("eglesprk_executors")
+@ColumnFamily("f")
+@Prefix("sprkexcutr")
+@Service(Constants.SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site", "sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "executorId", "user", "queue"})
+@Partition({"site"})
+public class SparkExecutor extends TaggedLogAPIEntity {
+
+    @Column("a") private String hostPort;
+    @Column("b") private int rddBlocks;
+    @Column("c") private long memoryUsed;
+    @Column("d") private long diskUsed;
+    @Column("e") private int activeTasks = 0;
+    @Column("f") private int failedTasks = 0;
+    @Column("g") private int completedTasks = 0;
+    @Column("h") private int totalTasks = 0;
+    @Column("i") private long totalDuration = 0;
+    @Column("j") private long totalInputBytes = 0;
+    @Column("k") private long totalShuffleRead = 0;
+    @Column("l") private long totalShuffleWrite = 0;
+    @Column("m") private long maxMemory;
+    @Column("n") private long startTime;
+    @Column("o") private long endTime = 0;
+    @Column("p") private long execMemoryBytes;
+    @Column("q") private int cores;
+    @Column("r") private long memoryOverhead;
+
+    // ---- host and storage ----
+
+    public String getHostPort() {
+        return hostPort;
+    }
+
+    public void setHostPort(String hostPort) {
+        this.hostPort = hostPort;
+        valueChanged("hostPort");
+    }
+
+    public int getRddBlocks() {
+        return rddBlocks;
+    }
+
+    public void setRddBlocks(int rddBlocks) {
+        this.rddBlocks = rddBlocks;
+        valueChanged("rddBlocks");
+    }
+
+    public long getMemoryUsed() {
+        return memoryUsed;
+    }
+
+    public void setMemoryUsed(long memoryUsed) {
+        this.memoryUsed = memoryUsed;
+        valueChanged("memoryUsed");
+    }
+
+    public long getDiskUsed() {
+        return diskUsed;
+    }
+
+    public void setDiskUsed(long diskUsed) {
+        this.diskUsed = diskUsed;
+        valueChanged("diskUsed");
+    }
+
+    // ---- task counts ----
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        valueChanged("activeTasks");
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
+
+    public int getCompletedTasks() {
+        return completedTasks;
+    }
+
+    public void setCompletedTasks(int completedTasks) {
+        this.completedTasks = completedTasks;
+        valueChanged("completedTasks");
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        valueChanged("totalTasks");
+    }
+
+    // ---- accumulated totals ----
+
+    public long getTotalDuration() {
+        return totalDuration;
+    }
+
+    public void setTotalDuration(long totalDuration) {
+        this.totalDuration = totalDuration;
+        valueChanged("totalDuration");
+    }
+
+    public long getTotalInputBytes() {
+        return totalInputBytes;
+    }
+
+    public void setTotalInputBytes(long totalInputBytes) {
+        this.totalInputBytes = totalInputBytes;
+        valueChanged("totalInputBytes");
+    }
+
+    public long getTotalShuffleRead() {
+        return totalShuffleRead;
+    }
+
+    public void setTotalShuffleRead(long totalShuffleRead) {
+        this.totalShuffleRead = totalShuffleRead;
+        valueChanged("totalShuffleRead");
+    }
+
+    public long getTotalShuffleWrite() {
+        return totalShuffleWrite;
+    }
+
+    public void setTotalShuffleWrite(long totalShuffleWrite) {
+        this.totalShuffleWrite = totalShuffleWrite;
+        valueChanged("totalShuffleWrite");
+    }
+
+    // ---- memory, lifetime and cores ----
+
+    public long getMaxMemory() {
+        return maxMemory;
+    }
+
+    public void setMaxMemory(long maxMemory) {
+        this.maxMemory = maxMemory;
+        valueChanged("maxMemory");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        valueChanged("execMemoryBytes");
+    }
+
+    public int getCores() {
+        return cores;
+    }
+
+    public void setCores(int cores) {
+        this.cores = cores;
+        valueChanged("cores");
+    }
+
+    public long getMemoryOverhead() {
+        return memoryOverhead;
+    }
+
+    public void setMemoryOverhead(long memoryOverhead) {
+        this.memoryOverhead = memoryOverhead;
+        valueChanged("memoryOverhead");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java
new file mode 100644
index 0000000..acecb3a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkJob.java
@@ -0,0 +1,179 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * Spark job entity.
+ *
+ * <p>Per its annotations it maps to table {@code eglesprk_jobs}, column family {@code f} and
+ * prefix {@code sprkjob}; each field is stored under the short qualifier named in its
+ * {@code @Column}. Every setter assigns the field and then calls {@code valueChanged} with the
+ * property name.
+ */
+@Table("eglesprk_jobs")
+@ColumnFamily("f")
+@Prefix("sprkjob")
+@Service(Constants.SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site", "sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "user", "queue"})
+@Partition({"site"})
+public class SparkJob extends TaggedLogAPIEntity {
+
+    @Column("a") private long submissionTime;
+    @Column("b") private long completionTime;
+    @Column("c") private int numStages = 0;
+    @Column("d") private String status;
+    @Column("e") private int numTask = 0;
+    @Column("f") private int numActiveTasks = 0;
+    @Column("g") private int numCompletedTasks = 0;
+    @Column("h") private int numSkippedTasks = 0;
+    @Column("i") private int numFailedTasks = 0;
+    @Column("j") private int numActiveStages = 0;
+    @Column("k") private int numCompletedStages = 0;
+    @Column("l") private int numSkippedStages = 0;
+    @Column("m") private int numFailedStages = 0;
+
+    // ---- timing and status ----
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        valueChanged("submissionTime");
+    }
+
+    public long getCompletionTime() {
+        return completionTime;
+    }
+
+    public void setCompletionTime(long completionTime) {
+        this.completionTime = completionTime;
+        valueChanged("completionTime");
+    }
+
+    public int getNumStages() {
+        return numStages;
+    }
+
+    public void setNumStages(int numStages) {
+        this.numStages = numStages;
+        valueChanged("numStages");
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    // ---- task counts ----
+
+    public int getNumTask() {
+        return numTask;
+    }
+
+    public void setNumTask(int numTask) {
+        this.numTask = numTask;
+        valueChanged("numTask");
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        valueChanged("numActiveTasks");
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        valueChanged("numCompletedTasks");
+    }
+
+    public int getNumSkippedTasks() {
+        return numSkippedTasks;
+    }
+
+    public void setNumSkippedTasks(int numSkippedTasks) {
+        this.numSkippedTasks = numSkippedTasks;
+        valueChanged("numSkippedTasks");
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        valueChanged("numFailedTasks");
+    }
+
+    // ---- stage counts ----
+
+    public int getNumActiveStages() {
+        return numActiveStages;
+    }
+
+    public void setNumActiveStages(int numActiveStages) {
+        this.numActiveStages = numActiveStages;
+        valueChanged("numActiveStages");
+    }
+
+    public int getNumCompletedStages() {
+        return numCompletedStages;
+    }
+
+    public void setNumCompletedStages(int numCompletedStages) {
+        this.numCompletedStages = numCompletedStages;
+        valueChanged("numCompletedStages");
+    }
+
+    public int getNumSkippedStages() {
+        return numSkippedStages;
+    }
+
+    public void setNumSkippedStages(int numSkippedStages) {
+        this.numSkippedStages = numSkippedStages;
+        valueChanged("numSkippedStages");
+    }
+
+    public int getNumFailedStages() {
+        return numFailedStages;
+    }
+
+    public void setNumFailedStages(int numFailedStages) {
+        this.numFailedStages = numFailedStages;
+        valueChanged("numFailedStages");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java
new file mode 100644
index 0000000..fcca889
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkStage.java
@@ -0,0 +1,300 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * Entity holding the metrics recorded for a Spark stage.
+ *
+ * <p>Per the annotations on this class, instances are bound to table {@code eglesprk_stages},
+ * column family {@code f} and prefix {@code sprkstage}, are served under
+ * {@link Constants#SPARK_STAGE_SERVICE_ENDPOINT_NAME}, are flagged as time series and are
+ * partitioned by the {@code site} tag. The identifying attributes (application, job, stage,
+ * stage attempt, user, queue) are declared as tags rather than as fields.
+ *
+ * <p>Every field is bound to a single-letter column through {@code @Column}. Every setter assigns
+ * the field and then calls {@code valueChanged} with the field name; presumably the base class
+ * uses this to track modified fields (confirm in {@code TaggedLogAPIEntity}).
+ *
+ * <p>NOTE(review): the units of the time and size metrics are not stated in this class.
+ */
+@Table("eglesprk_stages")
+@ColumnFamily("f")
+@Prefix("sprkstage")
+@Service(Constants.SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStage extends TaggedLogAPIEntity {
+
+    // Counters and metrics that start from an explicit zero. Long literals use the upper-case
+    // "L" suffix because a lower-case "l" is easily misread as the digit one.
+    @Column("a")
+    private String status;
+    @Column("b")
+    private int numActiveTasks = 0;
+    @Column("c")
+    private int numCompletedTasks = 0;
+    @Column("d")
+    private int numFailedTasks = 0;
+    @Column("e")
+    private long executorRunTime = 0L;
+    @Column("f")
+    private long inputBytes = 0L;
+    @Column("g")
+    private long inputRecords = 0L;
+    @Column("h")
+    private long outputBytes = 0L;
+    @Column("i")
+    private long outputRecords = 0L;
+    @Column("j")
+    private long shuffleReadBytes = 0L;
+    @Column("k")
+    private long shuffleReadRecords = 0L;
+    @Column("l")
+    private long shuffleWriteBytes = 0L;
+    @Column("m")
+    private long shuffleWriteRecords = 0L;
+    @Column("n")
+    private long memoryBytesSpilled = 0L;
+    @Column("o")
+    private long diskBytesSpilled = 0L;
+
+    // Descriptive attributes and metrics left at their Java default values.
+    @Column("p")
+    private String name;
+    @Column("q")
+    private String schedulingPool;
+    @Column("r")
+    private long submitTime;
+    @Column("s")
+    private long completeTime;
+    @Column("t")
+    private int numTasks;
+    @Column("u")
+    private long executorDeserializeTime;
+    @Column("v")
+    private long resultSize;
+    @Column("w")
+    private long jvmGcTime;
+    @Column("x")
+    private long resultSerializationTime;
+
+    // ---- Getters: plain accessors, no side effects ----
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getSchedulingPool() {
+        return schedulingPool;
+    }
+
+    public long getSubmitTime() {
+        return submitTime;
+    }
+
+    public long getCompleteTime() {
+        return completeTime;
+    }
+
+    public int getNumTasks() {
+        return numTasks;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    // ---- Setters: assign the field, then call valueChanged with the field name ----
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        valueChanged("numCompletedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        valueChanged("numFailedTasks");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        valueChanged("executorRunTime");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadBytes) {
+        this.shuffleReadBytes = shuffleReadBytes;
+        valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        valueChanged("shuffleWriteRecords");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        valueChanged("diskBytesSpilled");
+    }
+
+    public void setName(String name) {
+        this.name = name;
+        valueChanged("name");
+    }
+
+    public void setSchedulingPool(String schedulingPool) {
+        this.schedulingPool = schedulingPool;
+        valueChanged("schedulingPool");
+    }
+
+    public void setSubmitTime(long submitTime) {
+        this.submitTime = submitTime;
+        valueChanged("submitTime");
+    }
+
+    public void setCompleteTime(long completeTime) {
+        this.completeTime = completeTime;
+        valueChanged("completeTime");
+    }
+
+    public void setNumTasks(int numTasks) {
+        this.numTasks = numTasks;
+        valueChanged("numTasks");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
new file mode 100644
index 0000000..6ef7c69
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
@@ -0,0 +1,291 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * Entity holding the metrics recorded for a Spark task.
+ *
+ * <p>Per the annotations on this class, instances are bound to table {@code eglesprk_tasks},
+ * column family {@code f} and prefix {@code sprktask}, are served under
+ * {@link Constants#SPARK_TASK_SERVICE_ENDPOINT_NAME}, are flagged as time series and are
+ * partitioned by the {@code site} tag. The identifying attributes (application, job, stage,
+ * task index, task attempt, user, queue) are declared as tags rather than as fields.
+ *
+ * <p>Every field is bound to a single-letter column through {@code @Column}; note that
+ * {@code shuffleReadLocalBytes} uses column {@code "x"} although it is declared between
+ * {@code "r"} and {@code "s"}. Every setter assigns the field and then calls
+ * {@code valueChanged} with the field name; presumably the base class uses this to track
+ * modified fields (confirm in {@code TaggedLogAPIEntity}).
+ *
+ * <p>NOTE(review): the units of the time and size metrics are not stated in this class.
+ */
+@Table("eglesprk_tasks")
+@ColumnFamily("f")
+@Prefix("sprktask")
+@Service(Constants.SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkTask extends TaggedLogAPIEntity {
+
+    // All fields are left at their Java default values until a setter is called.
+    @Column("a")
+    private int taskId;
+    @Column("b")
+    private long launchTime;
+    @Column("c")
+    private String executorId;
+    @Column("d")
+    private String host;
+    @Column("e")
+    private String taskLocality;
+    @Column("f")
+    private boolean speculative;
+    @Column("g")
+    private long executorDeserializeTime;
+    @Column("h")
+    private long executorRunTime;
+    @Column("i")
+    private long resultSize;
+    @Column("j")
+    private long jvmGcTime;
+    @Column("k")
+    private long resultSerializationTime;
+    @Column("l")
+    private long memoryBytesSpilled;
+    @Column("m")
+    private long diskBytesSpilled;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadRemoteBytes;
+    @Column("x")
+    private long shuffleReadLocalBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private boolean failed;
+
+    // ---- Getters: plain accessors, no side effects ----
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public long getLaunchTime() {
+        return launchTime;
+    }
+
+    public String getExecutorId() {
+        return executorId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getTaskLocality() {
+        return taskLocality;
+    }
+
+    public boolean isSpeculative() {
+        return speculative;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public boolean isFailed() {
+        return failed;
+    }
+
+    public long getShuffleReadRemoteBytes() {
+        return shuffleReadRemoteBytes;
+    }
+
+    public long getShuffleReadLocalBytes() {
+        return shuffleReadLocalBytes;
+    }
+
+    // ---- Setters: assign the field, then call valueChanged with the field name ----
+
+    public void setFailed(boolean failed) {
+        this.failed = failed;
+        valueChanged("failed");
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+        valueChanged("taskId");
+    }
+
+    public void setLaunchTime(long launchTime) {
+        this.launchTime = launchTime;
+        valueChanged("launchTime");
+    }
+
+    public void setExecutorId(String executorId) {
+        this.executorId = executorId;
+        valueChanged("executorId");
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+        valueChanged("host");
+    }
+
+    public void setTaskLocality(String taskLocality) {
+        this.taskLocality = taskLocality;
+        valueChanged("taskLocality");
+    }
+
+    public void setSpeculative(boolean speculative) {
+        this.speculative = speculative;
+        valueChanged("speculative");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        valueChanged("diskBytesSpilled");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        valueChanged("shuffleWriteRecords");
+    }
+
+    public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+        valueChanged("shuffleReadRemoteBytes");
+    }
+
+    public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+        this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+        valueChanged("shuffleReadLocalBytes");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 97be7ec..5568284 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -43,6 +43,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-entity</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-stream-process-api</artifactId>
             <version>${project.version}</version>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
index c6f1b98..9f030a7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -40,12 +40,15 @@ public class MRHistoryJobMain {
 
             //2. init JobHistoryContentFilter
             JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
-            List<String> confKeyPatterns = jhfAppConf.getStringList("MRConfigureKeys");
+            List<String> confKeyPatterns = jhfAppConf.getStringList("MRConfigureKeys.jobConfigKey");
             confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
             confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
             confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
             confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
 
+            String jobNameKey = jhfAppConf.getString("MRConfigureKeys.jobNameKey");
+            builder.setJobNameKey(jobNameKey);
+
             for (String key : confKeyPatterns) {
                 builder.includeJobKeyPatterns(Pattern.compile(key));
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
index 6fbf3d3..66dbce1 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
@@ -22,15 +22,11 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.regex.Pattern;
 
-/**
- * define what content in job history stream should be streamed
- * @author yonzhang
- *
- */
 public interface JobHistoryContentFilter extends Serializable {
     boolean acceptJobFile();
     boolean acceptJobConfFile();
     List<Pattern> getMustHaveJobConfKeyPatterns();
     List<Pattern> getJobConfKeyInclusionPatterns();
     List<Pattern> getJobConfKeyExclusionPatterns();
+    /** Returns the job-name key held by this filter; MRHistoryJobMain supplies it from the "MRConfigureKeys.jobNameKey" setting. */
+    String getJobNameKey();
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
index 43234c2..65b8dab 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
@@ -35,6 +35,8 @@ public class JobHistoryContentFilterBuilder {
     private List<Pattern> m_jobConfKeyInclusionPatterns;
     private List<Pattern> m_jobConfKeyExclusionPatterns;
 
+    private String jobNameKey;
+
     public static JobHistoryContentFilterBuilder newBuilder(){
         return new JobHistoryContentFilterBuilder();
     }
@@ -78,6 +80,11 @@ public class JobHistoryContentFilterBuilder {
         return this;
     }
 
+    /**
+     * Records the job-name key that {@code build()} hands to the filter it creates.
+     *
+     * @param jobNameKey key later passed to the filter's {@code setJobNameKey}
+     * @return this builder, so calls can be chained
+     */
+    public JobHistoryContentFilterBuilder setJobNameKey(String jobNameKey) {
+        this.jobNameKey = jobNameKey;
+        return this;
+    }
+
     public JobHistoryContentFilter build() {
         JobHistoryContentFilterImpl filter = new JobHistoryContentFilterImpl();
         filter.setAcceptJobFile(m_acceptJobFile);
@@ -85,6 +92,7 @@ public class JobHistoryContentFilterBuilder {
         filter.setMustHaveJobConfKeyPatterns(m_mustHaveJobConfKeyPatterns);
         filter.setJobConfKeyInclusionPatterns(m_jobConfKeyInclusionPatterns);
         filter.setJobConfKeyExclusionPatterns(m_jobConfKeyExclusionPatterns);
+        filter.setJobNameKey(jobNameKey);
         LOG.info("job history content filter:" + filter);
         return filter;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a20656b5/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
index d8a482b..5e7a856 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
@@ -28,6 +28,8 @@ public class JobHistoryContentFilterImpl implements JobHistoryContentFilter {
     private List<Pattern> m_jobConfKeyInclusionPatterns;
     private List<Pattern> m_jobConfKeyExclusionPatterns;
 
+    private String jobNameKey;
+
     @Override
     public boolean acceptJobFile() {
         return m_acceptJobFile;
@@ -53,6 +55,15 @@ public class JobHistoryContentFilterImpl implements JobHistoryContentFilter {
         return m_jobConfKeyExclusionPatterns;
     }
 
+    /**
+     * Returns the job-name key last assigned through {@code setJobNameKey}.
+     *
+     * @return the stored key, or {@code null} when none has been assigned
+     */
+    @Override
+    public String getJobNameKey() {
+        return jobNameKey;
+    }
+
+    /**
+     * Stores the job-name key that {@code getJobNameKey()} returns.
+     *
+     * @param jobNameKey key to store; no validation is performed here
+     */
+    public void setJobNameKey(String jobNameKey) {
+        this.jobNameKey = jobNameKey;
+    }
+
     public void setAcceptJobFile(boolean acceptJobFile) {
         this.m_acceptJobFile = acceptJobFile;
     }



Mime
View raw message