eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [6/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring
Date Tue, 05 Jul 2016 18:07:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
new file mode 100755
index 0000000..2684899
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobExecutionAPIEntity.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Execution record of a single MapReduce job (judging by the package and the fields: state,
+ * start/end time, map/reduce task counts and job counters).
+ *
+ * <p>Storage mapping, as declared by the annotations below: table {@code eaglejpa}, column family
+ * {@code f}, prefix {@code jexec}, service name {@code JPAConstants.JPA_JOB_EXECUTION_SERVICE_NAME}.
+ * The entity is declared time-series and partitioned by {@code site}. It has a unique index on
+ * {@code jobID} and a non-unique index on {@code normJobName}.
+ * NOTE(review): {@code site}, {@code jobID} and {@code normJobName} are not fields of this class.
+ * They are presumably tags supplied through {@link JobBaseAPIEntity}; confirm there.
+ *
+ * <p>Each setter assigns its field and then calls {@code _pcs.firePropertyChange(name, null, null)}.
+ * NOTE(review): {@code _pcs} is inherited. It presumably lets the base entity track which
+ * properties were explicitly set; confirm in TaggedLogAPIEntity before relying on it.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa")
+@ColumnFamily("f")
+@Prefix("jexec")
+@Service(JPAConstants.JPA_JOB_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+    @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true),
+    @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false)
+    })
+public class JobExecutionAPIEntity extends JobBaseAPIEntity {
+    // The single-letter @Column values name the stored columns.
+    // NOTE(review): renaming them would presumably make previously written rows unreadable.
+    // Keep them stable.
+    @Column("a")
+    private String currentState;
+    // NOTE(review): start/end times are presumably epoch milliseconds; confirm with the writer.
+    @Column("b")
+    private long startTime;
+    @Column("c")
+    private long endTime;
+    // Map task counts: total, failed, finished.
+    @Column("d")
+    private int numTotalMaps;
+    @Column("e")
+    private int numFailedMaps;
+    @Column("f")
+    private int numFinishedMaps;
+    // Reduce task counts: total, failed, finished.
+    @Column("g")
+    private int numTotalReduces;
+    @Column("h")
+    private int numFailedReduces;
+    @Column("i")
+    private int numFinishedReduces;
+    // Job-level counters, typed by the jobcounter package's JobCounters.
+    @Column("j")
+    private JobCounters jobCounters;
+
+    // Accessors. Every setter fires a property-change event after assigning (see class comment).
+    public String getCurrentState() {
+        return currentState;
+    }
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+        _pcs.firePropertyChange("currentState", null, null);
+    }
+    public long getStartTime() {
+        return startTime;
+    }
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        _pcs.firePropertyChange("startTime", null, null);
+    }
+    public long getEndTime() {
+        return endTime;
+    }
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        _pcs.firePropertyChange("endTime", null, null);
+    }
+    public int getNumTotalMaps() {
+        return numTotalMaps;
+    }
+    public void setNumTotalMaps(int numTotalMaps) {
+        this.numTotalMaps = numTotalMaps;
+        _pcs.firePropertyChange("numTotalMaps", null, null);
+    }
+    public int getNumFailedMaps() {
+        return numFailedMaps;
+    }
+    public void setNumFailedMaps(int numFailedMaps) {
+        this.numFailedMaps = numFailedMaps;
+        _pcs.firePropertyChange("numFailedMaps", null, null);
+    }
+    public int getNumFinishedMaps() {
+        return numFinishedMaps;
+    }
+    public void setNumFinishedMaps(int numFinishedMaps) {
+        this.numFinishedMaps = numFinishedMaps;
+        _pcs.firePropertyChange("numFinishedMaps", null, null);
+    }
+    public int getNumTotalReduces() {
+        return numTotalReduces;
+    }
+    public void setNumTotalReduces(int numTotalReduces) {
+        this.numTotalReduces = numTotalReduces;
+        _pcs.firePropertyChange("numTotalReduces", null, null);
+    }
+    public int getNumFailedReduces() {
+        return numFailedReduces;
+    }
+    public void setNumFailedReduces(int numFailedReduces) {
+        this.numFailedReduces = numFailedReduces;
+        _pcs.firePropertyChange("numFailedReduces", null, null);
+    }
+    public int getNumFinishedReduces() {
+        return numFinishedReduces;
+    }
+
+    public void setNumFinishedReduces(int numFinishedReduces) {
+        this.numFinishedReduces = numFinishedReduces;
+        _pcs.firePropertyChange("numFinishedReduces", null, null);
+    }
+
+    public JobCounters getJobCounters() {
+        return jobCounters;
+    }
+
+    public void setJobCounters(JobCounters jobCounters) {
+        this.jobCounters = jobCounters;
+        _pcs.firePropertyChange("jobCounters", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
new file mode 100644
index 0000000..2400c55
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobProcessTimeStampEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Single-value entity holding {@code currentTimeStamp}.
+ * NOTE(review): judging by the name, it presumably records how far MR history processing has
+ * progressed, so that processing can resume from there. Confirm with the code that writes and
+ * reads it.
+ *
+ * <p>Storage mapping, as declared by the annotations below: table {@code eaglejpa_process}, column
+ * family {@code f}, prefix {@code process}, service name
+ * {@code JPAConstants.JPA_JOB_PROCESS_TIME_STAMP_NAME}. It is declared time-series and partitioned
+ * by {@code site}. Unlike the job/task entities in this commit, it extends
+ * {@link TaggedLogAPIEntity} directly rather than JobBaseAPIEntity.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_process")
+@ColumnFamily("f")
+@Prefix("process")
+@Service(JPAConstants.JPA_JOB_PROCESS_TIME_STAMP_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+public class JobProcessTimeStampEntity extends TaggedLogAPIEntity {
+    // NOTE(review): presumably epoch milliseconds; confirm with the writer.
+    @Column("a")
+    private long currentTimeStamp;
+
+    public long getCurrentTimeStamp() {
+        return currentTimeStamp;
+    }
+    // Assigns the value, then fires a property-change event with null old/new values via the
+    // inherited _pcs.
+    public void setCurrentTimeStamp(long currentTimeStamp) {
+        this.currentTimeStamp = currentTimeStamp;
+        _pcs.firePropertyChange("currentTimeStamp", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
new file mode 100644
index 0000000..9769620
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptCounterAPIEntity.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Counts of task attempts: total, failed and killed.
+ * NOTE(review): what the counts are aggregated over (per job, per host, ...) is decided by the
+ * inherited tags the writer sets. That is not visible here; confirm with the writer.
+ *
+ * <p>Storage mapping, as declared by the annotations below: table {@code eaglejpa_anomaly}, column
+ * family {@code f}, prefix {@code tacount}, service name
+ * {@code JPAConstants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME}. It is declared time-series and
+ * partitioned by {@code site}. Each setter assigns its field and then calls
+ * {@code _pcs.firePropertyChange(name, null, null)} on the inherited {@code _pcs}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_anomaly")
+@ColumnFamily("f")
+@Prefix("tacount")
+@Service(JPAConstants.JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+public class TaskAttemptCounterAPIEntity extends JobBaseAPIEntity {
+    // Stored columns: a = total attempts, b = failed attempts, c = killed attempts.
+    @Column("a")
+    private int totalCount;
+    @Column("b")
+    private int failedCount;
+    @Column("c")
+    private int killedCount;
+    
+    public int getKilledCount() {
+        return killedCount;
+    }
+    public void setKilledCount(int killedCount) {
+        this.killedCount = killedCount;
+        _pcs.firePropertyChange("killedCount", null, null);
+    }
+    public int getFailedCount() {
+        return failedCount;
+    }
+    public void setFailedCount(int failedCount) {
+        this.failedCount = failedCount;
+        _pcs.firePropertyChange("failedCount", null, null);
+    }
+    public int getTotalCount() {
+        return totalCount;
+    }
+    public void setTotalCount(int totalCount) {
+        this.totalCount = totalCount;
+        _pcs.firePropertyChange("totalCount", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
new file mode 100755
index 0000000..77994a5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskAttemptExecutionAPIEntity.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Execution record of a task attempt (per the class name). It holds the status, start/end time,
+ * duration, error text, counters and the attempt ID.
+ *
+ * <p>Storage mapping, as declared by the annotations below: table {@code eaglejpa_task}, column
+ * family {@code f}, prefix {@code taexec}, service name
+ * {@code JPAConstants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME}. It is declared time-series and
+ * partitioned by {@code site}, with a non-unique index on {@code jobID}.
+ * NOTE(review): {@code jobID} is not a field here; it is presumably a tag from
+ * {@link JobBaseAPIEntity}.
+ *
+ * <p>Each setter assigns its field and then calls {@code _pcs.firePropertyChange(name, null, null)}
+ * on the inherited {@code _pcs}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_task")
+@ColumnFamily("f")
+@Prefix("taexec")
+@Service(JPAConstants.JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+    @Index(name="Index_1_jobId", columns = { "jobID" }, unique = false)
+    })
+public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
+    @Column("a")
+    private String taskStatus;
+    // NOTE(review): times are presumably epoch milliseconds; confirm with the writer.
+    @Column("b")
+    private long startTime;
+    @Column("c")
+    private long endTime;
+    // NOTE(review): presumably endTime - startTime; this class neither computes nor checks it.
+    @Column("d")
+    private long duration;
+    // Error text, if any. Null values are left out of JSON output by @JsonSerialize(NON_NULL).
+    @Column("e")
+    private String error;
+    @Column("f")
+    private JobCounters jobCounters;
+    @Column("g")
+    private String taskAttemptID;
+
+    public String getTaskStatus() {
+        return taskStatus;
+    }
+    public void setTaskStatus(String taskStatus) {
+        this.taskStatus = taskStatus;
+        _pcs.firePropertyChange("taskStatus", null, null);
+    }
+    public long getStartTime() {
+        return startTime;
+    }
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        _pcs.firePropertyChange("startTime", null, null);
+    }
+    public long getEndTime() {
+        return endTime;
+    }
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        _pcs.firePropertyChange("endTime", null, null);
+    }
+    public long getDuration() {
+        return duration;
+    }
+    public void setDuration(long duration) {
+        this.duration = duration;
+        _pcs.firePropertyChange("duration", null, null);
+    }
+    public String getError() {
+        return error;
+    }
+    public void setError(String error) {
+        this.error = error;
+        _pcs.firePropertyChange("error", null, null);
+    }
+    public JobCounters getJobCounters() {
+        return jobCounters;
+    }
+    public void setJobCounters(JobCounters jobCounters) {
+        this.jobCounters = jobCounters;
+        _pcs.firePropertyChange("jobCounters", null, null);
+    }
+    public String getTaskAttemptID() {
+        return taskAttemptID;
+    }
+    public void setTaskAttemptID(String taskAttemptID) {
+        this.taskAttemptID = taskAttemptID;
+        _pcs.firePropertyChange("taskAttemptID", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
new file mode 100644
index 0000000..f287688
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskExecutionAPIEntity.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Execution record of a task (per the class name). It holds the status, start/end time, duration,
+ * error text and counters.
+ *
+ * <p>Storage mapping, as declared by the annotations below: table {@code eaglejpa_task} (also used
+ * by TaskAttemptExecutionAPIEntity under prefix {@code taexec}), column family {@code f}, prefix
+ * {@code texec}, service name {@code JPAConstants.JPA_TASK_EXECUTION_SERVICE_NAME}. It is declared
+ * time-series and partitioned by {@code site}. No secondary index is declared.
+ *
+ * <p>Each setter assigns its field and then calls {@code _pcs.firePropertyChange(name, null, null)}
+ * on the inherited {@code _pcs}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_task")
+@ColumnFamily("f")
+@Prefix("texec")
+@Service(JPAConstants.JPA_TASK_EXECUTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+public class TaskExecutionAPIEntity extends JobBaseAPIEntity {
+    @Column("a")
+    private String taskStatus;
+    // NOTE(review): times are presumably epoch milliseconds; confirm with the writer.
+    @Column("b")
+    private long startTime;
+    @Column("c")
+    private long endTime;
+    // NOTE(review): presumably endTime - startTime; this class neither computes nor checks it.
+    @Column("d")
+    private long duration;
+    // Error text, if any. Null values are left out of JSON output by @JsonSerialize(NON_NULL).
+    @Column("e")
+    private String error;
+    @Column("f")
+    private JobCounters jobCounters;
+
+    public String getTaskStatus() {
+        return taskStatus;
+    }
+    public void setTaskStatus(String taskStatus) {
+        this.taskStatus = taskStatus;
+        _pcs.firePropertyChange("taskStatus", null, null);
+    }
+    public long getStartTime() {
+        return startTime;
+    }
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        _pcs.firePropertyChange("startTime", null, null);
+    }
+    public long getEndTime() {
+        return endTime;
+    }
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        _pcs.firePropertyChange("endTime", null, null);
+    }
+    public long getDuration() {
+        return duration;
+    }
+    public void setDuration(long duration) {
+        this.duration = duration;
+        _pcs.firePropertyChange("duration", null, null);
+    }
+    public String getError() {
+        return error;
+    }
+    public void setError(String error) {
+        this.error = error;
+        _pcs.firePropertyChange("error", null, null);
+    }
+    public JobCounters getJobCounters() {
+        return jobCounters;
+    }
+    public void setJobCounters(JobCounters jobCounters) {
+        this.jobCounters = jobCounters;
+        _pcs.firePropertyChange("jobCounters", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
new file mode 100755
index 0000000..5ae67c0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/TaskFailureCountAPIEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * A failure count paired with an error text and a task status.
+ * NOTE(review): the grouping (e.g. one row per distinct error) is decided by the writer and the
+ * inherited tags. That is not visible here; confirm with the writer.
+ *
+ * <p>Storage mapping, as declared by the annotations below: table {@code eaglejpa_anomaly} (also
+ * used by TaskAttemptCounterAPIEntity under prefix {@code tacount}), column family {@code f},
+ * prefix {@code taskfailurecount}, service name
+ * {@code JPAConstants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME}. It is declared time-series and
+ * partitioned by {@code site}.
+ *
+ * <p>Each setter assigns its field and then calls {@code _pcs.firePropertyChange(name, null, null)}
+ * on the inherited {@code _pcs}.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa_anomaly")
+@ColumnFamily("f")
+@Prefix("taskfailurecount")
+@Service(JPAConstants.JPA_TASK_FAILURE_COUNT_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+public class TaskFailureCountAPIEntity extends JobBaseAPIEntity {
+    // Stored columns: a = failure count, b = error text, c = task status.
+    @Column("a")
+    private int failureCount;
+    @Column("b")
+    private String error;
+    @Column("c")
+    private String taskStatus;
+
+
+    public String getTaskStatus() {
+        return taskStatus;
+    }
+
+    public void setTaskStatus(String taskStatus) {
+        this.taskStatus = taskStatus;
+        _pcs.firePropertyChange("taskStatus", null, null);
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    public void setError(String error) {
+        this.error = error;
+        _pcs.firePropertyChange("error", null, null);
+    }
+
+    public int getFailureCount() {
+        return failureCount;
+    }
+
+    public void setFailureCount(int failureCount) {
+        this.failureCount = failureCount;
+        _pcs.firePropertyChange("failureCount", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
new file mode 100644
index 0000000..1c1c759
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupDictionary.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.jobcounter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * MR Job counter dictionary. It's singlton class that will try to read JobCounter.conf file and configure
+ * counters.
+ *
+ */
+public final class CounterGroupDictionary {
+
+    private final List<CounterGroupKey> groupKeys = new ArrayList<>();
+
+    private static volatile CounterGroupDictionary instance = null;
+    private static final Logger LOG = LoggerFactory.getLogger(CounterGroupDictionary.class);
+
+    private CounterGroupDictionary() {}
+
+    public static CounterGroupDictionary getInstance() throws JobCounterException {
+        if (instance == null) {
+            synchronized (CounterGroupDictionary.class) {
+                if (instance == null) {
+                    CounterGroupDictionary tmp = new CounterGroupDictionary();
+                    tmp.initialize();
+                    instance = tmp;
+                }
+            }
+        }
+        return instance;
+    }
+
+    public CounterGroupKey getCounterGroupByName(String groupName) {
+        for (CounterGroupKey groupKey : groupKeys) {
+            if (groupKey.getName().equalsIgnoreCase(groupName)) {
+                return groupKey;
+            }
+        }
+        return null;
+    }
+
+    public CounterGroupKey getCounterGroupByIndex(int groupIndex) {
+        if (groupIndex < 0 || groupIndex >= groupKeys.size()) {
+            return null;
+        }
+        return groupKeys.get(groupIndex);
+    }
+
+    private void initialize() throws JobCounterException {
+        // load config.properties file from classpath
+        InputStream is = this.getClass().getClassLoader().getResourceAsStream("/JobCounter.conf");
+        try {
+            if (is == null) {
+                is = this.getClass().getClassLoader().getResourceAsStream("JobCounter.conf");
+                if (is == null) {
+                    final String errMsg = "Failed to load JobCounter.conf";
+                    LOG.error(errMsg);
+                    throw new JobCounterException(errMsg);
+                }
+            }
+            final Properties prop = new Properties();
+            try {
+                prop.load(is);
+            } catch(Exception ex) {
+                final String errMsg = "Failed to load JobCounter.conf, reason: " + ex.getMessage();
+                LOG.error(errMsg, ex);
+                throw new JobCounterException(errMsg, ex);
+            }
+            int groupIndex = 0;
+            while (parseGroup(groupIndex, prop)) {
+                ++groupIndex;
+            }
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                }
+            }
+        }
+    }
+
+    private boolean parseGroup(int groupIndex, Properties prop) {
+        final String groupKeyBase = "counter.group" + groupIndex;
+        final String groupNameKey = groupKeyBase + ".name";
+        final String groupName = prop.getProperty(groupNameKey);
+
+        if (groupName == null) {
+            return false;
+        }
+
+        final String groupDescriptionKey = groupKeyBase + ".description";
+        final String groupDescription = prop.getProperty(groupDescriptionKey);
+        final CounterGroupKeyImpl groupKey = new CounterGroupKeyImpl(groupIndex, groupName, groupDescription);
+        final ArrayList<CounterKey> counters = new ArrayList<CounterKey>();
+
+        int counterIndex = 0;
+        while (parseCounter(groupKey, counterIndex, counters, prop)) {
+            ++counterIndex;
+        }
+        groupKey.setCounterKeys(counters.toArray(new CounterKey[counters.size()]));
+        groupKeys.add(groupKey);
+        return true;
+    }
+
+    private boolean parseCounter(CounterGroupKey groupKey, int counterIndex, List<CounterKey> counters, Properties prop) {
+        final String counterKeyBase = "counter.group" + groupKey.getIndex() + ".counter" + counterIndex;
+        final String counterNameKey = counterKeyBase + ".names";
+        final String counterNamesString = prop.getProperty(counterNameKey);
+
+        if (counterNamesString == null) {
+            return false;
+        }
+        final String[] names = counterNamesString.split(",");
+        final List<String> counterNames = new ArrayList<String>();
+        for (String name : names) {
+            counterNames.add(name.trim());
+        }
+
+        final String counterDescriptionKey = counterKeyBase + ".description";
+        final String counterDescription = prop.getProperty(counterDescriptionKey);
+
+        CounterKey counter = new CounterKeyImpl(counterIndex, counterNames, counterDescription, groupKey);
+        counters.add(counter);
+        return true;
+    }
+
+    private static class CounterKeyImpl implements CounterKey {
+        private final int index;
+        private final List<String> counterNames;
+        private final String description;
+        private final CounterGroupKey groupKey;
+
+        public CounterKeyImpl(int index, List<String> counterNames, String description, CounterGroupKey groupKey) {
+            this.index = index;
+            this.counterNames = counterNames;
+            this.description = description;
+            this.groupKey = groupKey;
+        }
+        @Override
+        public int getIndex() {
+            return index;
+        }
+        @Override
+        public List<String> getNames() {
+            return counterNames;
+        }
+        @Override
+        public String getDescription() {
+            return description;
+        }
+        @Override
+        public CounterGroupKey getGroupKey() {
+            return groupKey;
+        }
+    }
+
+    private static class CounterGroupKeyImpl implements CounterGroupKey {
+        private final int index;
+        private final String name;
+        private final String description;
+        private CounterKey[] counterKeys;
+
+        public CounterGroupKeyImpl(int index, String name, String description) {
+            this.index = index;
+            this.name = name;
+            this.description = description;
+        }
+
+        public void setCounterKeys(CounterKey[] counterKeys) {
+            this.counterKeys = counterKeys;
+        }
+
+        @Override
+        public int getIndex() {
+            return index;
+        }
+        @Override
+        public String getName() {
+            return name;
+        }
+        @Override
+        public String getDescription() {
+            return description;
+        }
+        @Override
+        public int getCounterNumber() {
+            return counterKeys.length;
+        }
+        @Override
+        public List<CounterKey> listCounterKeys() {
+            return Arrays.asList(counterKeys);
+        }
+        @Override
+        public CounterKey getCounterKeyByName(String name) {
+            for (CounterKey counterKey : counterKeys) {
+                for (String n : counterKey.getNames()) {
+                    if (n.equalsIgnoreCase(name)) {
+                        return counterKey;
+                    }
+                }
+            }
+            return null;
+        }
+        @Override
+        public CounterKey getCounterKeyByID(int index) {
+            if (index < 0 || index >= counterKeys.length) {
+                return null;
+            }
+            return counterKeys[index];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
new file mode 100644
index 0000000..82606d1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterGroupKey.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.jobcounter;
+
+import java.util.List;
+
+/**
+ * A named group of job counters, giving access to the counter keys it contains.
+ */
+public interface CounterGroupKey {
+
+    /** @return the group name */
+    String getName();
+
+    /** @return a human-readable description of the group */
+    String getDescription();
+
+    /** @return the index of this group (NOTE(review): presumably its position among all groups; confirm) */
+    int getIndex();
+
+    /** @return the number of counter keys in this group */
+    int getCounterNumber();
+
+    /** @return the counter keys of this group */
+    List<CounterKey> listCounterKeys();
+
+    /**
+     * @param name the counter name to look up
+     * @return the matching key, or {@code null} if this group has none
+     */
+    CounterKey getCounterKeyByName(String name);
+
+    /**
+     * @param index position of the key within this group
+     * @return the key at that position, or {@code null} if there is none
+     */
+    CounterKey getCounterKeyByID(int index);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
new file mode 100644
index 0000000..161490f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/CounterKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.jobcounter;
+
+import java.util.List;
+
+/**
+ * Identifies a single job counter inside a {@link CounterGroupKey}.
+ */
+public interface CounterKey {
+
+    /** @return every name (alias) this counter is known by */
+    List<String> getNames();
+
+    /** @return a human-readable description of the counter */
+    String getDescription();
+
+    /** @return the index of this counter (NOTE(review): presumably its position within its group; confirm) */
+    int getIndex();
+
+    /** @return the group this counter belongs to */
+    CounterGroupKey getGroupKey();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
new file mode 100644
index 0000000..5ffaf51
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounterException.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.jobcounter;
+
+/**
+ * Checked exception type of the jobcounter package.
+ */
+public class JobCounterException extends Exception {
+
+    private static final long serialVersionUID = -4525162176188266862L;
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public JobCounterException() {
+        super();
+    }
+
+    /**
+     * Creates an exception carrying only a detail message.
+     *
+     * @param message the detail message
+     */
+    public JobCounterException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception wrapping an underlying cause; the detail message is derived from it.
+     *
+     * @param cause the underlying cause
+     */
+    public JobCounterException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with both a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause   the underlying cause
+     */
+    public JobCounterException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
new file mode 100644
index 0000000..2806cf1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/jobcounter/JobCounters.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.jobcounter;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+
+/**
+ * Holds a job's counters as a two-level map of {@code String -> (String -> Long)}
+ * (NOTE(review): outer key is presumably the counter group name, inner key the counter name).
+ * Starts out as an empty {@link TreeMap}, so keys are kept sorted by default.
+ */
+public final class JobCounters {
+
+    private Map<String, Map<String, Long>> counters = new TreeMap<>();
+
+    /** @return the live counter map (not a copy); {@code null} if it was set to {@code null} */
+    public Map<String, Map<String, Long>> getCounters() {
+        return counters;
+    }
+
+    /**
+     * Replaces the counter map. The map is stored by reference, not copied.
+     *
+     * @param counters the new counters; {@code null} is tolerated by {@link #toString()} and {@link #clear()}
+     */
+    public void setCounters(Map<String, Map<String, Long>> counters) {
+        this.counters = counters;
+    }
+
+    @Override
+    public String toString() {
+        // String.valueOf avoids an NPE after setCounters(null)
+        return String.valueOf(counters);
+    }
+
+    /**
+     * Empties every inner counter map and then the outer map. Because maps passed to
+     * {@link #setCounters(Map)} are not copied, this clears the caller's maps as well.
+     */
+    public void clear() {
+        if (counters == null) {
+            return;
+        }
+        for (Map<String, Long> group : counters.values()) {
+            if (group != null) { // a null group would otherwise abort the clear with an NPE
+                group.clear();
+            }
+        }
+        counters.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
new file mode 100644
index 0000000..0a137be
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+public enum EagleJobStatus {
+    SUBMITTED,
+    LAUNCHED,
+    PREP,
+    RUNNING,
+    SUCCESS,
+    FAILED;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
new file mode 100644
index 0000000..9d13fb4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobTagName.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+/**
+ * Tag keys attached to job and task entities. {@link #toString()} returns the tag key as stored
+ * (e.g. {@code JOB_ID -> "jobID"}), which differs from {@link #name()}.
+ */
+public enum EagleJobTagName {
+    SITE("site"),
+    RACK("rack"),
+    HOSTNAME("hostname"),
+    JOB_NAME("jobName"),
+    NORM_JOB_NAME("normJobName"),
+    JOB_ID("jobID"),
+    TASK_ID("taskID"),
+    TASK_ATTEMPT_ID("taskAttemptID"),
+    JOB_STATUS("jobStatus"),
+    USER("user"),
+    TASK_TYPE("taskType"),
+    TASK_EXEC_TYPE("taskExecType"),
+    ERROR_CATEGORY("errorCategory"),
+    JOB_QUEUE("queue"),
+    RULE_TYPE("ruleType"),
+    JOB_TYPE("jobType");
+
+    // final: enum constants are shared singletons, so their state must never change
+    private final String tagName;
+
+    EagleJobTagName(String tagName) {
+        this.tagName = tagName;
+    }
+
+    /** @return the tag key, e.g. {@code "jobID"} for {@link #JOB_ID} */
+    @Override
+    public String toString() {
+        return this.tagName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java
new file mode 100644
index 0000000..fef62a2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleTaskStatus.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+public enum EagleTaskStatus {
+    SUCCESS,
+    KILLED,
+    FAILED,
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
new file mode 100644
index 0000000..559f7a8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+
+import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
+
+/**
+ * Callback notified whenever a job-history entity has been created.
+ * Keeping this as a listener decouples entity creation from entity handling,
+ * which also makes unit testing easier.
+ *
+ * @author yonzhang
+ */
+public interface HistoryJobEntityCreationListener {
+
+    /**
+     * Called for each newly created job entity.
+     *
+     * @param entity the entity that was just created
+     * @throws Exception if the listener fails to handle the entity
+     */
+    void jobEntityCreated(JobBaseAPIEntity entity) throws Exception;
+
+    /**
+     * Lets the listener commit entities it is still holding; in streaming processing this
+     * is how the last several entities get written out.
+     *
+     * @throws Exception if flushing fails
+     */
+    void flush() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
new file mode 100644
index 0000000..fc678f8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
+
+/**
+ * Creation listener that, besides {@link JobBaseAPIEntity} creation events (the inherited
+ * {@code jobEntityCreated}) and flushes, is also told when the job has finished.
+ */
+public interface HistoryJobEntityLifecycleListener extends HistoryJobEntityCreationListener {
+
+    /**
+     * Signals that the job has finished. Called after the job's execution entity has been
+     * delivered through {@code jobEntityCreated}.
+     */
+    void jobFinish();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
new file mode 100644
index 0000000..d454c31
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+/**
+ * Unchecked exception raised while importing job history, for example when a record's job ID
+ * does not match the job currently being parsed.
+ */
+public class ImportException extends RuntimeException {
+
+    private static final long serialVersionUID = -706778307046285820L;
+
+    /**
+     * @param message the detail message
+     * @param cause   the underlying cause
+     */
+    public ImportException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public ImportException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
new file mode 100644
index 0000000..31bfdb5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.common.JobConfig;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.entities.*;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A new instance of JHFEventReaderBase will be created for each job history log file.
+ */
+public abstract class JHFEventReaderBase extends JobEntityCreationPublisher implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(JHFEventReaderBase.class);
+    // tags shared by every entity this reader creates; each entity receives its own copy
+    protected Map<String, String> m_baseTags;
+    // one entity per job lifecycle event, filled in and published by handleJob
+    protected JobEventAPIEntity m_jobSubmitEventEntity;
+    protected JobEventAPIEntity m_jobLaunchEventEntity;
+    // task totals read from the job-launched record (TOTAL_MAPS / TOTAL_REDUCES)
+    protected int m_numTotalMaps;
+    protected int m_numTotalReduces;
+    protected JobEventAPIEntity m_jobFinishEventEntity;
+    // job summary entity, populated and published when the job-finished record arrives;
+    // close() treats an end time of 0 as an incomplete history file
+    protected JobExecutionAPIEntity m_jobExecutionEntity;
+    // taskID to task startTime
+    protected Map<String, Long> m_taskStartTime;
+    // taskAttemptID to task attempt startTime
+    protected Map<String, Long> m_taskAttemptStartTime;
+
+    // taskID to host mapping, for task it's the host where the last attempt runs on
+    protected Map<String, String> m_taskRunningHosts;
+    // hostname to rack mapping
+    protected Map<String, String> m_host2RackMapping;
+
+    // set from the first job record; every later job record must carry the same ID
+    protected String m_jobID;
+    protected String m_jobName;
+    // detected from the job configuration via fetchJobType in the constructor; stays null without a configuration
+    protected String m_jobType;
+    // currently identical to m_jobName (handleJob applies no normalization)
+    protected String m_normJobName;
+    protected String m_user;
+    protected String m_queueName;
+    // job launch timestamp (name is misspelled; presumably kept because subclasses may reference it)
+    protected Long m_jobLauchTime;
+    // NOTE(review): not read in the visible part of this class; presumably consulted by subclasses — confirm
+    protected JobHistoryContentFilter m_filter;
+
+    // notified of created entities, job completion and flushes, in registration order
+    protected final List<HistoryJobEntityLifecycleListener> jobEntityLifecycleListeners = new ArrayList<>();
+
+    // Hadoop configuration of the job; may be null, in which case the job type is not detected
+    protected final Configuration configuration;
+
+    public JPAConstants.JobType fetchJobType(Configuration config) {
+        if (config.get(JPAConstants.JobConfiguration.CASCADING_JOB) != null) { return JPAConstants.JobType.CASCADING; }
+        if (config.get(JPAConstants.JobConfiguration.HIVE_JOB) != null) { return JPAConstants.JobType.HIVE; }
+        if (config.get(JPAConstants.JobConfiguration.PIG_JOB) != null) { return JPAConstants.JobType.PIG; }
+        if (config.get(JPAConstants.JobConfiguration.SCOOBI_JOB) != null) {return JPAConstants.JobType.SCOOBI; }
+        return JPAConstants.JobType.NOTAVALIABLE;
+    }
+
+    /**
+     * Creates a reader for a single job history file.
+     *
+     * @param baseTags      tag name/values shared by every job and task entity this reader creates
+     *                      (e.g. cluster and datacenter); each entity receives its own copy
+     * @param configuration the job's Hadoop configuration, used to detect the job type; may be {@code null}
+     * @param filter        content filter for the history file
+     */
+    public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+        this.m_filter = filter;
+        this.m_baseTags = baseTags;
+        this.configuration = configuration;
+
+        // every entity gets a private copy of the base tags because each later adds its own tags
+        this.m_jobSubmitEventEntity = new JobEventAPIEntity();
+        this.m_jobSubmitEventEntity.setTags(new HashMap<>(baseTags));
+        this.m_jobLaunchEventEntity = new JobEventAPIEntity();
+        this.m_jobLaunchEventEntity.setTags(new HashMap<>(baseTags));
+        this.m_jobFinishEventEntity = new JobEventAPIEntity();
+        this.m_jobFinishEventEntity.setTags(new HashMap<>(baseTags));
+        this.m_jobExecutionEntity = new JobExecutionAPIEntity();
+        this.m_jobExecutionEntity.setTags(new HashMap<>(baseTags));
+
+        this.m_taskRunningHosts = new HashMap<>();
+        this.m_host2RackMapping = new HashMap<>();
+        this.m_taskStartTime = new HashMap<>();
+        this.m_taskAttemptStartTime = new HashMap<>();
+
+        // NOTE(review): fetchJobType is public and overridable, so a subclass override runs here
+        // before the subclass's own fields are initialized
+        if (this.configuration != null && this.m_jobType == null) {
+            this.setJobType(fetchJobType(this.configuration).toString());
+        }
+    }
+
+    /**
+     * Adds a lifecycle listener; listeners are notified in registration order.
+     *
+     * @param lifecycleListener the listener to add
+     */
+    public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
+        jobEntityLifecycleListeners.add(lifecycleListener);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // check if this job history file is complete
+        if (m_jobExecutionEntity.getEndTime() == 0L) {
+            throw new IOException(new JHFWriteNotCompletedException(m_jobID));
+        }
+        try {
+            flush();
+        } catch(Exception ex) {
+            throw new IOException(ex);
+        }
+    }
+
+    /**
+     * Flushes this publisher first, then every registered lifecycle listener in registration order.
+     */
+    @Override
+    public void flush() throws Exception {
+        super.flush();
+        for (HistoryJobEntityLifecycleListener registered : jobEntityLifecycleListeners) {
+            registered.flush();
+        }
+    }
+
+    /**
+     * Records the ID of the job this reader is parsing.
+     *
+     * @param id the job ID
+     */
+    private void setJobID(String id) {
+        m_jobID = id;
+    }
+
+    /**
+     * Records the detected job type, later written as the job-type tag.
+     *
+     * @param jobType the job type name
+     */
+    private void setJobType(String jobType) {
+        m_jobType = jobType;
+    }
+
+    /**
+     * Handles one job-level history record. The lifecycle event it represents is decided by the
+     * first timestamp key present: SUBMIT_TIME, then LAUNCH_TIME, then FINISH_TIME. Records with
+     * none of these keys only take part in the job ID check.
+     *
+     * @param eventType     the history event type (not consulted here)
+     * @param values        the record's key/value pairs
+     * @param totalCounters the job's total counters in parser-specific form, handed to
+     *                      {@link #parseCounters(Object)}; may be {@code null}
+     * @throws ImportException if the record's job ID differs from the one seen earlier
+     * @throws Exception       if a listener fails while handling a created entity
+     */
+    protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
+        String id = values.get(Keys.JOBID);
+
+        if (m_jobID == null) {
+            setJobID(id);
+        } else if (!m_jobID.equals(id)) {
+            String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobID + "'";
+            LOG.error(msg);
+            throw new ImportException(msg);
+        }
+
+        if (values.get(Keys.SUBMIT_TIME) != null) {  // job submitted
+            m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME)));
+            m_user = values.get(Keys.USER);
+            m_queueName = values.get(Keys.JOB_QUEUE);
+            m_jobName = values.get(Keys.JOBNAME);
+            m_normJobName = m_jobName;
+
+            LOG.info("NormJobName of {}: {}", id, m_normJobName);
+
+            // tagged exactly like the launch/finish events, including JOB_TYPE
+            putJobEventTags(m_jobSubmitEventEntity, EagleJobStatus.SUBMITTED.name());
+            entityCreated(m_jobSubmitEventEntity);
+        } else if (values.get(Keys.LAUNCH_TIME) != null) {  // job launched
+            m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
+            m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
+            putJobEventTags(m_jobLaunchEventEntity, EagleJobStatus.LAUNCHED.name());
+            m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
+            m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
+            entityCreated(m_jobLaunchEventEntity);
+        } else if (values.get(Keys.FINISH_TIME) != null) {  // job finished
+            m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
+            putJobEventTags(m_jobFinishEventEntity, values.get(Keys.JOB_STATUS));
+            entityCreated(m_jobFinishEventEntity);
+
+            // populate the job execution summary entity
+            m_jobExecutionEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
+            m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
+            m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
+            m_jobExecutionEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
+            m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_QUEUE.toString(), m_queueName);
+            m_jobExecutionEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(), this.m_jobType);
+
+            m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
+            m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
+            m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
+            m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
+            // FAILED_MAPS / FAILED_REDUCES may be absent (original note: "for Artemis")
+            if (values.get(Keys.FAILED_MAPS) != null) {
+                m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
+            }
+            if (values.get(Keys.FAILED_REDUCES) != null) {
+                m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES)));
+            }
+            m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS)));
+            m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES)));
+            m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
+            m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
+            if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
+                m_jobExecutionEntity.setJobCounters(parseCounters(totalCounters));
+            }
+            entityCreated(m_jobExecutionEntity);
+        }
+    }
+
+    /**
+     * Puts the tags shared by the submit, launch and finish job events onto {@code event}.
+     *
+     * @param event     the job event entity to tag
+     * @param jobStatus value for the job-status tag
+     */
+    private void putJobEventTags(JobEventAPIEntity event, String jobStatus) {
+        event.getTags().put(EagleJobTagName.USER.toString(), m_user);
+        event.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
+        event.getTags().put(EagleJobTagName.JOB_STATUS.toString(), jobStatus);
+        event.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
+        event.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
+        event.getTags().put(EagleJobTagName.JOB_TYPE.toString(), this.m_jobType);
+    }
+
+    /**
+     * Delivers a newly created entity. Steps, in order:
+     * <ol>
+     *   <li>Every lifecycle listener receives the entity.</li>
+     *   <li>If the entity is this job's execution entity (the job has finished), every lifecycle
+     *       listener receives {@code jobFinish()}.</li>
+     *   <li>The entity is passed to the publisher's own listeners.</li>
+     * </ol>
+     *
+     * @param entity the created entity
+     * @throws Exception if any listener fails
+     */
+    private void entityCreated(JobBaseAPIEntity entity) throws Exception {
+        for (HistoryJobEntityLifecycleListener listener : jobEntityLifecycleListeners) {
+            listener.jobEntityCreated(entity);
+        }
+
+        // the execution entity is only created once the job-finished record has been seen
+        boolean jobFinished = (entity == m_jobExecutionEntity);
+        if (jobFinished) {
+            for (HistoryJobEntityLifecycleListener listener : jobEntityLifecycleListeners) {
+                listener.jobFinish();
+            }
+        }
+
+        super.notifiyListeners(entity);
+    }
+
+    /**
+     * Converts the parser-specific counter representation passed to handleJob into {@link JobCounters}.
+     *
+     * @param value raw counters; may be {@code null} when a record carries the COUNTERS key but no
+     *              total counters object (NOTE(review): concrete type depends on the subclass's parser)
+     * @return the parsed counters
+     * @throws IOException if the counters cannot be parsed
+     */
+    protected abstract JobCounters parseCounters(Object value) throws IOException;
+
+    /**
+     * Handles one task or task-attempt record of the job history.
+     *
+     * For one task ID there are several sequential task events, i.e.
+     * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end
+     *
+     * Start records only remember the start time (keyed by task ID or task attempt ID). Finish records
+     * build a {@link TaskExecutionAPIEntity} or {@link TaskAttemptExecutionAPIEntity} and publish it via
+     * entityCreated. A finish record whose start time was never seen is logged and skipped.
+     *
+     * @param recType   Task, MapAttempt or ReduceAttempt; any other combination is only logged
+     * @param eventType event type of the record (not used by this method)
+     * @param values    parsed fields of the record
+     * @param counters  raw counters, handed to {@link #parseCounters(Object)} when present
+     * @throws Exception if counter parsing or a lifecycle listener fails
+     */
+    protected void handleTask(RecordTypes recType, EventType eventType, final Map<Keys, String> values, Object counters) throws Exception {
+        String taskAttemptID = values.get(Keys.TASK_ATTEMPT_ID);
+        String startTime = values.get(Keys.START_TIME);
+        String finishTime = values.get(Keys.FINISH_TIME);
+        String taskType = values.get(Keys.TASK_TYPE);
+        String taskID = values.get(Keys.TASKID);
+
+        // plain map: no anonymous HashMap subclass capturing a reference to this reader
+        Map<String, String> taskBaseTags = new HashMap<>();
+        taskBaseTags.put(EagleJobTagName.TASK_TYPE.toString(), taskType);
+        taskBaseTags.put(EagleJobTagName.USER.toString(), m_user);
+        taskBaseTags.put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
+        taskBaseTags.put(EagleJobTagName.JOB_TYPE.toString(), m_jobType);
+        taskBaseTags.put(EagleJobTagName.JOB_ID.toString(), m_jobID);
+        taskBaseTags.put(EagleJobTagName.TASK_ID.toString(), taskID);
+        // base tags are added last, so they take precedence on key collisions
+        taskBaseTags.putAll(m_baseTags);
+
+        boolean isTask = recType == RecordTypes.Task;
+        boolean isAttempt = recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt;
+
+        if (isTask && startTime != null) {
+            // task start: no host is assigned yet, only remember when it started
+            m_taskStartTime.put(taskID, Long.valueOf(startTime));
+        } else if (isTask && finishTime != null) {
+            Long taskStartTime = m_taskStartTime.get(taskID);
+            if (taskStartTime == null) {
+                // without a start time the duration cannot be computed; skip it like attempts are skipped
+                LOG.warn("task has no recorded start time, skip it " + taskID);
+                return;
+            }
+            TaskExecutionAPIEntity entity = new TaskExecutionAPIEntity();
+            Map<String, String> taskExecutionTags = new HashMap<>(taskBaseTags);
+            // hostname of the last finished attempt; may be missing (TODO: e.g. when the task fails)
+            String hostname = m_taskRunningHosts.get(taskID);
+            hostname = (hostname == null) ? "" : hostname;
+            taskExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname);
+            taskExecutionTags.put(EagleJobTagName.RACK.toString(), m_host2RackMapping.get(hostname));
+            entity.setTags(taskExecutionTags);
+            entity.setStartTime(taskStartTime);
+            entity.setEndTime(Long.valueOf(finishTime));
+            entity.setDuration(entity.getEndTime() - entity.getStartTime());
+            entity.setTimestamp(m_jobLauchTime);
+            entity.setError(values.get(Keys.ERROR));
+            entity.setTaskStatus(values.get(Keys.TASK_STATUS));
+            if (values.get(Keys.COUNTERS) != null || counters != null) {
+                entity.setJobCounters(parseCounters(counters));
+            }
+            entityCreated(entity);
+        } else if (isAttempt && startTime != null) {
+            m_taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime));
+        } else if (isAttempt && finishTime != null) {
+            String hostname = values.get(Keys.HOSTNAME);
+            String rack = values.get(Keys.RACK);
+            // the last finished attempt's hostname becomes the task-level hostname
+            m_taskRunningHosts.put(taskID, hostname);
+            // an attempt ID can show up as both succeeded and failed; once the first finish has been
+            // handled its start time is removed below, so the duplicate finish is ignored here
+            Long attemptStartTime = m_taskAttemptStartTime.get(taskAttemptID);
+            if (attemptStartTime == null) {
+                LOG.warn("task attemp has consistency issue " + taskAttemptID);
+                return;
+            }
+            TaskAttemptExecutionAPIEntity entity = new TaskAttemptExecutionAPIEntity();
+            Map<String, String> taskAttemptExecutionTags = new HashMap<>(taskBaseTags);
+            taskAttemptExecutionTags.put(EagleJobTagName.HOSTNAME.toString(), hostname);
+            taskAttemptExecutionTags.put(EagleJobTagName.RACK.toString(), rack);
+            entity.setTags(taskAttemptExecutionTags);
+            entity.setStartTime(attemptStartTime);
+            entity.setEndTime(Long.valueOf(finishTime));
+            entity.setTimestamp(m_jobLauchTime);
+            entity.setDuration(entity.getEndTime() - entity.getStartTime());
+            entity.setTaskStatus(values.get(Keys.TASK_STATUS));
+            entity.setError(values.get(Keys.ERROR));
+            // when a task is killed, COUNTERS does not exist
+            if (values.get(Keys.COUNTERS) != null || counters != null) {
+                entity.setJobCounters(parseCounters(counters));
+            }
+            entity.setTaskAttemptID(taskAttemptID);
+            entityCreated(entity);
+            m_taskAttemptStartTime.remove(taskAttemptID);
+        } else {
+            // unexpected record (e.g. neither start nor finish time): logged and otherwise ignored
+            LOG.warn("It's an exceptional case ?");
+        }
+    }
+
+    public void parseConfiguration() throws Exception {
+        Map<String, String> prop = new TreeMap<>();
+
+        if (m_filter.acceptJobConfFile()) {
+            Iterator<Map.Entry<String, String> > iter = configuration.iterator();
+            while (iter.hasNext()) {
+                String key = iter.next().getKey();
+                if (included(key) && !excluded(key))
+                    prop.put(key, configuration.get(key));
+            }
+        }
+
+        // check must-have keys are within prop
+        if (matchMustHaveKeyPatterns(prop)) {
+            JobConfigurationAPIEntity jobConfigurationEntity = new JobConfigurationAPIEntity();
+            jobConfigurationEntity.setTags(new HashMap<>(m_baseTags));
+            jobConfigurationEntity.getTags().put(EagleJobTagName.USER.toString(), m_user);
+            jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_ID.toString(), m_jobID);
+            jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_NAME.toString(), m_jobName);
+            jobConfigurationEntity.getTags().put(EagleJobTagName.NORM_JOB_NAME.toString(), m_normJobName);
+            jobConfigurationEntity.getTags().put(EagleJobTagName.JOB_TYPE.toString(), m_jobType);
+            jobConfigurationEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
+
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setConfig(prop);
+            jobConfigurationEntity.setJobConfig(jobConfig);
+            jobConfigurationEntity.setConfigJobName(m_normJobName);
+            entityCreated(jobConfigurationEntity);
+        }
+    }
+
+    /**
+     * Checks that each must-have pattern of the filter matches at least one collected key.
+     *
+     * @param prop collected configuration entries
+     * @return true if no must-have patterns are configured or every pattern is matched
+     */
+    private boolean matchMustHaveKeyPatterns(Map<String, String> prop) {
+        if (m_filter.getMustHaveJobConfKeyPatterns() == null) {
+            return true;
+        }
+        for (Pattern required : m_filter.getMustHaveJobConfKeyPatterns()) {
+            if (!anyKeyMatchesPattern(required, prop.keySet())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Returns true if at least one of {@code keys} fully matches {@code pattern}. */
+    private static boolean anyKeyMatchesPattern(Pattern pattern, Iterable<String> keys) {
+        for (String candidate : keys) {
+            if (pattern.matcher(candidate).matches()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns true when {@code key} fully matches one of the configured inclusion patterns,
+     * or when no inclusion patterns are configured at all.
+     *
+     * @param key configuration key to test
+     * @return whether the key should be collected (before exclusion patterns are applied)
+     */
+    private boolean included(String key) {
+        if (m_filter.getJobConfKeyInclusionPatterns() == null) {
+            return true;
+        }
+        for (Pattern p : m_filter.getJobConfKeyInclusionPatterns()) {
+            if (p.matcher(key).matches()) {
+                // debug level: this runs for every configuration key of every parsed job;
+                // log the actual key as well as the pattern that matched it
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("include key: " + key + " (matched pattern: " + p + ")");
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns true when {@code key} fully matches one of the configured exclusion patterns.
+     * With no exclusion patterns configured, nothing is excluded.
+     *
+     * @param key configuration key to test
+     * @return whether the key must be dropped
+     */
+    private boolean excluded(String key) {
+        if (m_filter.getJobConfKeyExclusionPatterns() != null) {
+            for (Pattern exclusion : m_filter.getJobConfKeyExclusionPatterns()) {
+                if (exclusion.matcher(key).matches()) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
new file mode 100644
index 0000000..adeb41e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+/**
+ * Versions of the MapReduce job history file (JHF) format.
+ */
+public enum JHFFormat {
+    /** MapReduce version 1 format (presumably the one read by JHFMRVer1EventReader — confirm). */
+    MRVer1,
+    /** MapReduce version 2 format. */
+    MRVer2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
new file mode 100644
index 0000000..24e3563
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.util.CountersStrings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Turns the records of one MapReduce v1 job history file, delivered line by line through
+ * {@link JHFMRVer1PerLineListener}, into job, task and task-attempt entities.
+ * The reader holds all information related to one whole job history file, so it is stateful
+ * and must not be shared between threads.
+ * @author yonzhang
+ *
+ */
+public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener {
+    private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class);
+
+    /**
+     * baseTags stores the basic tag name values which might be used for persisting various entities.
+     * baseTags includes: cluster, datacenter and jobName.
+     * baseTags are used for all job/task related entities.
+     * @param baseTags tags shared by all entities emitted for this job
+     * @param configuration job configuration, passed through to {@link JHFEventReaderBase}
+     * @param filter content filter, passed through to {@link JHFEventReaderBase}
+     */
+    public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+        super(baseTags, configuration, filter);
+    }
+
+    /**
+     * Dispatches one parsed history line. Job and task records are forwarded as-is; map/reduce
+     * attempt records first get their HOSTNAME/RACK fields normalized. Other record types are skipped.
+     */
+    @Override
+    public void handle(RecordTypes recType, Map<Keys, String> values)
+          throws Exception {
+        switch (recType) {
+            case Job:
+                handleJob(null, values, values.get(Keys.COUNTERS));
+                break;
+            case Task:
+                handleTask(RecordTypes.Task, null, values, values.get(Keys.COUNTERS));
+                break;
+            case MapAttempt:
+                ensureRackHostnameAfterAttemptFinish(values);
+                handleTask(RecordTypes.MapAttempt, null, values, values.get(Keys.COUNTERS));
+                break;
+            case ReduceAttempt:
+                ensureRackHostnameAfterAttemptFinish(values);
+                handleTask(RecordTypes.ReduceAttempt, null, values, values.get(Keys.COUNTERS));
+                break;
+            default:
+                // skip other types
+                break;
+        }
+    }
+
+    /**
+     * Normalizes HOSTNAME and fills RACK for an attempt-finish record; start records (no FINISH_TIME)
+     * are left untouched.
+     *
+     * SUCCESS: the decorated hostname, e.g. /default/rack128/&lt;hostname&gt; (CDH4.1.4) or
+     * /default-rack/&lt;hostname&gt; when no rack is specified, is split into rack and hostname and the
+     * host-to-rack mapping is remembered. A hostname without a rack segment falls back to a mapping
+     * learned earlier. KILLED/FAILED: the rack is looked up from earlier mappings (best effort).
+     * Any other or missing status leaves both HOSTNAME and RACK null.
+     */
+    private void ensureRackHostnameAfterAttemptFinish(Map<Keys, String> values) {
+        if (values.get(Keys.FINISH_TIME) == null) {
+            return;
+        }
+        String status = values.get(Keys.TASK_STATUS);
+        String hostname = null;
+        String rack = null;
+        // constant-first comparisons: TASK_STATUS may be absent
+        if (EagleTaskStatus.SUCCESS.name().equals(status)) {
+            String decoratedHostname = values.get(Keys.HOSTNAME);
+            String[] parts = (decoratedHostname == null) ? new String[0] : decoratedHostname.split("/");
+            if (parts.length >= 2) {
+                hostname = parts[parts.length - 1];
+                rack = parts[parts.length - 2];
+                m_host2RackMapping.put(hostname, rack);
+            } else if (parts.length == 1) {
+                // no rack segment in the record: make every effort to get RACK information
+                hostname = parts[0];
+                rack = m_host2RackMapping.get(hostname);
+            }
+        } else if (EagleTaskStatus.KILLED.name().equals(status) || EagleTaskStatus.FAILED.name().equals(status)) {
+            hostname = values.get(Keys.HOSTNAME);
+            // make every effort to get RACK information
+            hostname = (hostname == null) ? "" : hostname;
+            rack = m_host2RackMapping.get(hostname);
+        }
+
+        values.put(Keys.HOSTNAME, hostname);
+        values.put(Keys.RACK, rack);
+    }
+
+    /**
+     * Parses an escaped compact counter string and keeps only the framework counter groups
+     * accepted by {@link #isSystemCounterGroup(String)}.
+     *
+     * @param value escaped compact counter string (cast to String)
+     * @return counters grouped by group name, then counter name
+     * @throws IOException if the counter string cannot be parsed
+     */
+    @Override
+    protected JobCounters parseCounters(Object value) throws IOException {
+        Counters counters = new Counters();
+        try {
+            CountersStrings.parseEscapedCompactString((String) value, counters);
+        } catch (Exception ex) {
+            logger.error("can not parse job history", ex);
+            throw new IOException(ex);
+        }
+
+        boolean debug = logger.isDebugEnabled();
+        Map<String, Map<String, Long>> groups = new HashMap<String, Map<String, Long>>();
+        for (CounterGroup cg : counters) {
+            // hardcoded to exclude business level counters
+            if (!isSystemCounterGroup(cg.getName())) {
+                continue;
+            }
+            Map<String, Long> counterValues = new HashMap<String, Long>();
+            groups.put(cg.getName(), counterValues);
+            if (debug) {
+                logger.debug("groupname:" + cg.getName() + "(" + cg.getDisplayName() + ")");
+            }
+            for (Counter c : cg) {
+                counterValues.put(c.getName(), c.getValue());
+                if (debug) {
+                    logger.debug(c.getName() + "=" + c.getValue() + "(" + c.getDisplayName() + ")");
+                }
+            }
+        }
+        JobCounters jc = new JobCounters();
+        jc.setCounters(groups);
+        return jc;
+    }
+
+    /**
+     * Whitelist of framework counter groups that are kept; all other (business level) groups are dropped.
+     */
+    private static boolean isSystemCounterGroup(String groupName) {
+        if (groupName == null) {
+            return false;
+        }
+        switch (groupName) {
+            case "org.apache.hadoop.mapreduce.FileSystemCounter":
+            case "org.apache.hadoop.mapreduce.TaskCounter":
+            case "org.apache.hadoop.mapreduce.JobCounter":
+            case "org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter":
+            case "org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter":
+            case "FileSystemCounters":                                               // for artemis
+            case "org.apache.hadoop.mapred.Task$Counter":                            // for artemis
+            case "org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter":    // for artemis
+            case "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat$Counter":  // real package of FileOutputFormat
+            case "org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter":   // kept for backward compatibility
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /** Returns the job execution entity built from this history file (null until the job record is handled — TODO confirm). */
+    public JobExecutionAPIEntity jobExecution() {
+        return m_jobExecutionEntity;
+    }
+}


Mime
View raw message