eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [5/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring
Date Tue, 05 Jul 2016 18:07:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
new file mode 100644
index 0000000..bb08ef0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+public class JHFMRVer1Parser implements JHFParserBase {
+    private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1Parser.class);
+    static final char LINE_DELIMITER_CHAR = '.';
+    static final char[] charsToEscape = new char[] {'"', '=', LINE_DELIMITER_CHAR};
+    static final String KEY = "(\\w+)";
+    // value is any character other than quote, but escaped quotes can be there
+    static final String VALUE = "[^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+";
+    static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+    static final String MAX_COUNTER_COUNT = "10000";
+
+    private JHFMRVer1EventReader m_reader;
+    public JHFMRVer1Parser(JHFMRVer1EventReader reader){
+        this.m_reader = reader;
+    }
+
+    /**
+      * Parses history file and invokes Listener.handle() for
+      * each line of history. It can be used for looking through history
+      * files for specific items without having to keep whole history in memory.
+      * @throws Exception if the stream cannot be read or a history record cannot be parsed or handled
+      */
+    @Override
+    public void parse(InputStream in) throws Exception, ParseException {
+        // set enough counter number as user may build more counters
+        System.setProperty("mapreduce.job.counters.max", MAX_COUNTER_COUNT);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+        try {
+            String line = null;
+            StringBuffer buf = new StringBuffer();
+
+            // Read the meta-info line. Note that this might be a jobinfo line for files written with older format
+            line = reader.readLine();
+
+            // Check if the file is empty
+            if (line == null) {
+              return;
+            }
+
+            // Get the information required for further processing
+            MetaInfoManager mgr = new MetaInfoManager(line);
+            boolean isEscaped = mgr.isValueEscaped();
+            String lineDelim = String.valueOf(mgr.getLineDelim());
+            String escapedLineDelim = StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR, mgr.getLineDelim());
+
+            do {
+                buf.append(line);
+                if (!line.trim().endsWith(lineDelim) || line.trim().endsWith(escapedLineDelim)) {
+                  buf.append("\n");
+                  continue;
+                }
+                parseLine(buf.toString(), m_reader, isEscaped);
+                buf = new StringBuffer();
+            } while ((line = reader.readLine())!= null);
+
+            // flush to tell listener that we have finished parsing
+            logger.info("finish parsing job history file and close");
+            m_reader.close();
+        } catch(Exception ex) {
+            logger.error("can not parse correctly ", ex);
+            throw ex;
+        } finally {
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+
+    private static void parseLine(String line, JHFMRVer1PerLineListener l, boolean isEscaped) throws Exception, ParseException {
+        // Split one record: the text before the first space is the record type, the remainder holds key="value" pairs.
+        int idx = line.indexOf(' '); // NOTE(review): -1 when the line contains no space; substring(0, idx) below would then throw - confirm callers never pass such a line
+        String recType = line.substring(0, idx);
+        String data = line.substring(idx+1, line.length());
+        // Collect every key="value" match found in the remainder into an enum-keyed map.
+        Matcher matcher = pattern.matcher(data);
+        Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
+
+        while(matcher.find()) {
+            String tuple = matcher.group(0);
+            String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
+            String value = parts[1].substring(1, parts[1].length() -1); // drop the first and last character (the surrounding double quotes)
+            if (isEscaped) {
+              value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape);
+            }
+            parseBuffer.put(Keys.valueOf(parts[0]), value); // Keys.valueOf throws IllegalArgumentException for a name not in the enum; that is not caught here
+        }
+
+//        if(conf!=null){
+//            parseBuffer.put(Keys.NORM_JOBNAME, conf.get(JPAConstants.JOB_CONF_NORM_JOBNAME_KEY));
+//        }
+
+        try {
+            l.handle(RecordTypes.valueOf(recType), parseBuffer);
+        } catch (IllegalArgumentException ex) {
+            // unknown record type (or an IllegalArgumentException raised by the listener itself): log it and continue to run
+            logger.warn("record type does not exist " + recType, ex);
+        }
+        // The same map instance was handed to the listener; it is emptied here, so a listener must copy anything it wants to keep.
+        parseBuffer.clear();
+      }
+
+      /**
+       * Manages job-history's meta information such as version etc.
+       * Helps in logging version information to the job-history and recover
+       * version information from the history.
+       */
+      static class MetaInfoManager implements JHFMRVer1PerLineListener {
+          private long version = 0L;
+          private KeyValuePair pairs = new KeyValuePair();
+
+          public void close() {
+          }
+          // Extract the version of the history that was used to write the history
+          public MetaInfoManager(String line) throws Exception, ParseException {
+              if (null != line) {
+                  // Parse the line
+                  parseLine(line, this, false);
+              }
+          }
+
+          // Get the line delimiter
+          char getLineDelim() {
+              if (version == 0) {
+                  return '"';
+              } else {
+                  return LINE_DELIMITER_CHAR;
+              }
+          }
+
+          // Checks if the values are escaped or not
+          boolean isValueEscaped() {
+              // Note that the values are not escaped in version 0
+              return version != 0;
+          }
+
+          public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException {
+            // Check if the record is of type META
+              if (RecordTypes.Meta == recType) {
+                  pairs.handle(values);
+                  version = pairs.getLong(Keys.VERSION); // defaults to 0
+              }
+          }
+      }
+      
+      /**
+       * Base class contains utility stuff to manage typed key-value pairs with enums.
+       */
+      static class KeyValuePair {
+          private Map<Keys, String> values = new HashMap<Keys, String>(); 
+
+          /**
+           * Get 'String' value for given key. Most of the places use Strings as
+           * values so the default 'get' method returns 'String'.  This method never returns
+           * null to ease on GUIs. If no value is found it returns empty string "".
+           * @param k 
+           * @return if null it returns empty string - "" 
+           */
+          public String get(Keys k) {
+              String s = values.get(k);
+              return s == null ? "" : s;
+          }
+          /**
+           * Convert value from history to int and return. 
+           * if no value is found it returns 0.
+           * @param k key 
+           */
+          public int getInt(Keys k) {
+              String s = values.get(k);
+              if (null != s){
+                  return Integer.parseInt(s);
+              }
+              return 0;
+          }
+          /**
+           * Convert value from history to long and return.
+           * if no value is found it returns 0.
+           * @param k
+           */
+          public long getLong(Keys k) {
+              String s = values.get(k);
+              if (null != s){
+                  return Long.parseLong(s);
+              }
+              return 0;
+          }
+          /**
+           * Set value for the key. 
+           * @param k
+           * @param s
+           */
+          public void set(Keys k, String s) {
+              values.put(k, s);
+          }
+          /**
+           * Adds all values in the Map argument to its own values. 
+           * @param m
+           */
+          public void set(Map<Keys, String> m) {
+              values.putAll(m);
+          }
+          /**
+           * Reads values back from the history, input is same Map as passed to Listener by parseHistory().  
+           * @param values
+           */
+          public synchronized void handle(Map<Keys, String> values) {
+              set(values);
+          }
+          /**
+           * Returns Map containing all key-values. 
+           */
+          public Map<Keys, String> getValues() {
+              return values;
+          }
+      }
+      
+      /**
+       * Job history files contain key="value" pairs, where keys belong to this enum. 
+       * It acts as a global namespace for all keys. 
+       */
+      public static enum Keys { 
+          JOBTRACKERID,
+          START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, 
+          LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
+          FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
+          ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
+          SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
+          TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
+          VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
+
+          UBERISED,SPLIT_LOCATIONS,FAILED_DUE_TO_ATTEMPT,MAP_FINISH_TIME,PORT,RACK_NAME,
+
+          //For Artemis
+          WORKFLOW_ID,WORKFLOW_NAME,WORKFLOW_NODE_NAME,WORKFLOW_ADJACENCIES,WORKFLOW_TAGS,
+          SHUFFLE_PORT,LOCALITY,AVATAAR,FAIL_REASON
+      }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
new file mode 100644
index 0000000..1c096fc
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Callback interface for reading back log events from JobHistory. This interface
+ * should be implemented and passed to JobHistory.parseHistory().
+ * JHFMRVer1Parser.parseLine invokes handle() once per parsed record.
+ */
+public interface JHFMRVer1PerLineListener{
+    /**
+     * Callback method for history parser.
+     * @param recType type of record, which is the first entry in the line.
+     * @param values a map of key-value pairs as they appear in history.
+     * @throws Exception if the implementation fails to process the record
+     */
+    void handle(RecordTypes recType, Map<Keys, String> values) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
new file mode 100644
index 0000000..6fd2c18
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.jobhistory.*;
+import org.apache.hadoop.util.StringInterner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JHFMRVer2EventReader extends JHFEventReaderBase {
+    private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2EventReader.class);
+
+    /**
+     * Create a new Event Reader
+     * @throws IOException
+     */
+    public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+        super(baseTags, configuration, filter);
+    }
+
+    @SuppressWarnings("deprecation")
+    public void handleEvent(Event wrapper) throws Exception {
+        switch (wrapper.type) {
+        case JOB_SUBMITTED:
+            handleJobSubmitted(wrapper); break;
+        case JOB_INITED:
+            handleJobInited(wrapper); break;
+        case JOB_FINISHED:
+            handleJobFinished(wrapper); break;
+        case JOB_PRIORITY_CHANGED:
+            handleJobPriorityChanged(); break;
+        case JOB_STATUS_CHANGED:
+            handleJobStatusChanged(); break;
+        case JOB_FAILED:
+        case JOB_KILLED:
+        case JOB_ERROR:
+            handleJobUnsuccessfulCompletion(wrapper); break;
+        case JOB_INFO_CHANGED:
+            handleJobInfoChanged(); break;
+        case JOB_QUEUE_CHANGED:
+            handleJobQueueChanged();break;
+        case TASK_STARTED:
+            handleTaskStarted(wrapper); break;
+        case TASK_FINISHED:
+            handleTaskFinished(wrapper); break;
+        case TASK_FAILED:
+            handleTaskFailed(wrapper); break;
+        case TASK_UPDATED:
+            handleTaskUpdated(); break;
+
+        // map task
+        case MAP_ATTEMPT_STARTED:
+            handleMapAttemptStarted(wrapper); break;
+        case MAP_ATTEMPT_FINISHED:
+            handleMapAttemptFinished(wrapper); break;
+        case MAP_ATTEMPT_FAILED:
+            handleMapAttemptFailed(wrapper); break;
+        case MAP_ATTEMPT_KILLED:
+            handleMapAttemptKilled(wrapper); break;
+
+        // reduce task
+        case REDUCE_ATTEMPT_STARTED:
+            handleReduceAttemptStarted(wrapper); break;
+        case REDUCE_ATTEMPT_FINISHED:
+            handleReduceAttemptFinished(wrapper); break;
+        case REDUCE_ATTEMPT_FAILED:
+            handleReduceAttemptFailed(wrapper); break;
+        case REDUCE_ATTEMPT_KILLED:
+            handleReduceAttemptKilled(wrapper); break;
+
+        // set up task
+        case SETUP_ATTEMPT_STARTED:
+            break;
+        case SETUP_ATTEMPT_FINISHED:
+            handleSetupAttemptFinished(); break;
+        case SETUP_ATTEMPT_FAILED:
+            handleSetupAttemptFailed(); break;
+        case SETUP_ATTEMPT_KILLED:
+            handleSetupAttemptKilled(); break;
+
+        // clean up task
+        case CLEANUP_ATTEMPT_STARTED:
+            break;
+        case CLEANUP_ATTEMPT_FINISHED:
+            handleCleanupAttemptFinished(); break;
+        case CLEANUP_ATTEMPT_FAILED:
+            handleCleanupAttemptFailed(); break;
+        case CLEANUP_ATTEMPT_KILLED:
+            handleCleanupAttemptKilled(); break;
+
+        case AM_STARTED:
+            handleAMStarted(); break;
+        default:
+            logger.warn("unexpected event type: " + wrapper.type);
+        }
+    }
+
+    private void handleJobPriorityChanged() throws Exception {
+        return;
+    }
+    private void handleJobStatusChanged() throws Exception {
+        return;
+    }
+    private void handleJobInfoChanged() throws Exception {
+        return;
+    }
+
+    private void handleJobQueueChanged() throws Exception {
+        return;
+    }
+
+    private void handleJobSubmitted(Event wrapper) throws Exception {
+        JobSubmitted js = ((JobSubmitted)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
+        if (js.getJobName() != null) values.put(Keys.JOBNAME, js.getJobName().toString());
+        if (js.getUserName() != null) values.put(Keys.USER, js.getUserName().toString());
+        if (js.getSubmitTime() != null) values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString());
+        if (js.getJobQueueName() != null) values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
+        handleJob(wrapper.getType(), values, null);
+    }
+
+    private void handleJobInited(Event wrapper) throws Exception {
+        JobInited js = ((JobInited)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
+        if (js.getLaunchTime() != null) values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString());
+        if (js.getTotalMaps() != null) values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString());
+        if (js.getTotalReduces() != null) values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString());
+        if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        if (js.getUberized() != null) values.put(Keys.UBERISED, js.getUberized().toString());
+        handleJob(wrapper.getType(), values, null);
+    }
+
+    private void handleJobFinished(Event wrapper) throws Exception {
+        JobFinished js = ((JobFinished)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
+        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+        if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+        if (js.getFailedMaps() != null) values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString());
+        if (js.getFailedReduces() != null) values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
+        values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name());
+        handleJob(wrapper.getType(), values, js.getTotalCounters());
+    }
+
+    private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception {
+        JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
+        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+        if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+        if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        handleJob(wrapper.getType(), values, null);
+    }
+
+    private void handleTaskStarted(Event wrapper) throws Exception {
+        TaskStarted js = ((TaskStarted)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
+        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
+        if (js.getSplitLocations() != null) values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString());
+        handleTask(RecordTypes.Task, wrapper.getType(), values, null);
+    }
+
+    private void handleTaskFinished(Event wrapper) throws Exception {
+        TaskFinished js = ((TaskFinished)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
+        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
+    }
+
+    private void handleTaskFailed(Event wrapper) throws Exception {
+        TaskFailed js = ((TaskFailed)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+
+        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
+        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
+        if (js.getFailedDueToAttempt() != null) values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString());
+        handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
+    }
+
+    private String normalizeTaskStatus(String taskStatus) {
+        if (taskStatus.equals("SUCCEEDED"))
+            return EagleTaskStatus.SUCCESS.name();
+        return taskStatus;
+    }
+
+    private void handleTaskUpdated(){
+        return;
+    }
+
+    private void handleMapAttemptStarted(Event wrapper) throws Exception {
+        handleTaskAttemptStarted(wrapper, RecordTypes.MapAttempt);
+    }
+    private void handleReduceAttemptStarted(Event wrapper) throws Exception {
+        handleTaskAttemptStarted(wrapper, RecordTypes.ReduceAttempt);
+    }
+
+    private void handleTaskAttemptStarted(Event wrapper, RecordTypes recordType) throws Exception {
+        TaskAttemptStarted js = ((TaskAttemptStarted)wrapper.getEvent());
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
+        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
+        if (js.getTrackerName() != null) values.put(Keys.TRACKER_NAME, js.getTrackerName().toString());
+        if (js.getHttpPort() != null) values.put(Keys.HTTP_PORT, js.getHttpPort().toString());
+        if (js.getShufflePort() != null) values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString());
+        if (js.getLocality() != null) values.put(Keys.LOCALITY, js.getLocality().toString());
+        if (js.getAvataar() != null) values.put(Keys.AVATAAR, js.getAvataar().toString());
+        handleTask(recordType,wrapper.getType(), values, null);
+    }
+
+    private void handleMapAttemptFinished(Event wrapper) throws Exception {
+        MapAttemptFinished js = ((MapAttemptFinished)wrapper.getEvent()); // copy each non-null field of the event into values, then forward to handleTask
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
+        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        if (js.getMapFinishTime() != null) values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString());
+        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
+        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
+        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
+        if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+        // The rack name is treated as optional by the null check above, so only derive RACK when it is present (avoids an NPE).
+        if (js.getRackname() != null) ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
+        handleTask(RecordTypes.MapAttempt,wrapper.getType(), values, js.getCounters());
+    }
+    private void handleReduceAttemptFinished(Event wrapper) throws Exception {
+        ReduceAttemptFinished js = ((ReduceAttemptFinished)wrapper.getEvent()); // copy each non-null field of the event into values, then forward to handleTask
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
+        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        if (js.getShuffleFinishTime() != null) values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString());
+        if (js.getSortFinishTime() != null) values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString());
+        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
+        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
+        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
+        if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+        if (js.getRackname() != null) ensureRackAfterAttemptFinish(js.getRackname().toString(), values); // rack name is optional (see null check above); guarding avoids an NPE
+        handleTask(RecordTypes.ReduceAttempt,wrapper.getType(), values, js.getCounters());
+    }
+
+    private void ensureRackAfterAttemptFinish(String rackname, Map<Keys, String> values) {
+        // rack name has the format like /default/rack13; only the last path segment is stored as RACK
+        String[] tmp = rackname.split("/"); // split drops trailing empty strings, so a name made only of '/' yields an empty array
+        String rack = tmp.length > 0 ? tmp[tmp.length - 1] : rackname; // fall back to the raw name instead of indexing -1
+        values.put(Keys.RACK, rack);
+        m_host2RackMapping.put(values.get(Keys.HOSTNAME), rack); // record the rack under the attempt's host name (key is null if HOSTNAME was not set)
+    }
+
+    private void handleMapAttemptFailed(Event wrapper) throws Exception {
+        handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
+    }
+    private void handleReduceAttemptFailed(Event wrapper) throws Exception {
+        handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
+    }
+    private void handleMapAttemptKilled(Event wrapper)throws Exception {
+        handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
+    }
+    private void handleReduceAttemptKilled(Event wrapper)throws Exception {
+        handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
+    }
+    
+    private void handleTaskAttemptFailed(Event wrapper, RecordTypes recordType) throws Exception {
+        TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion)wrapper.getEvent()); // shared by the failed and killed map/reduce attempt handlers
+        Map<Keys, String> values = new HashMap<>();
+        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
+        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
+        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
+        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
+        if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
+        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, js.getStatus().toString());
+        if (js.getRackname() != null) ensureRackAfterAttemptFinish(js.getRackname().toString(), values); // rack name is optional (see null check above); guarding avoids an NPE
+        handleTask(recordType,wrapper.getType(), values, js.getCounters());
+    }    
+    private void handleSetupAttemptFinished(){
+        return;
+    }
+    private void handleSetupAttemptFailed(){
+        return;
+    }
+    private void handleSetupAttemptKilled(){
+        return;
+    }
+    private void handleCleanupAttemptFinished(){
+        return;
+    }
+    private void handleCleanupAttemptFailed(){
+        return;
+    }
+    private void handleCleanupAttemptKilled(){
+        return;
+    }
+    private void handleAMStarted(){
+        return;
+    }
+
+    protected JobCounters parseCounters(Object value) throws IOException{
+        JobCounters jc = new JobCounters();
+        Map<String, Map<String, Long>> groups = new HashMap<>();
+        JhCounters counters = (JhCounters)value; // value must be a JhCounters; any other type fails this cast
+        List<JhCounterGroup> list = counters.getGroups();
+        for (JhCounterGroup cg : list) {
+            String cgName = cg.getName().toString();
+            if(!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter") // whitelist: a group whose name matches none of these is skipped
+                && !cgName.equals("org.apache.hadoop.mapreduce.TaskCounter")
+                && !cgName.equals("org.apache.hadoop.mapreduce.JobCounter")
+                && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter")
+                && !cgName.equals("org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter")
+                && !cgName.equals("FileSystemCounters")                                           // for artemis
+                && !cgName.equals("org.apache.hadoop.mapred.Task$Counter")                        // for artemis
+                && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis
+                && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter") // for artemis - NOTE(review): uses lib.input while the FileOutputFormatCounter entry above uses lib.output; confirm this is not a typo
+            ) continue;
+            // Each accepted group gets its own name -> value map, stored under the group name.
+            groups.put(cgName, new HashMap<String, Long>());
+            Map<String, Long> counterValues = groups.get(cgName);
+            logger.debug("groupname:" + cg.getName() + "(" + cg.getDisplayName() + ")");
+
+            for (JhCounter c : cg.getCounts()) {
+                counterValues.put(c.getName().toString(), c.getValue());
+                logger.debug(c.getName() + "=" + c.getValue() + "(" + c.getDisplayName() + ")");
+            }
+        }
+        jc.setCounters(groups); // the collected group map is handed to the JobCounters that is returned
+        return jc;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
new file mode 100644
index 0000000..c289e4d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.mapreduce.jobhistory.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class JHFMRVer2Parser implements JHFParserBase {
+    private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2Parser.class);
+    private JHFMRVer2EventReader _reader;
+    
+    public JHFMRVer2Parser(JHFMRVer2EventReader reader){
+        this._reader = reader;
+    }
+
+    @SuppressWarnings({ "rawtypes", "deprecation" })
+    @Override
+    public void parse(InputStream is) throws Exception {
+        int eventCtr = 0;
+        try {
+             long start = System.currentTimeMillis();
+             DataInputStream in = new DataInputStream(is);
+             String version = in.readLine();
+             if (!"Avro-Json".equals(version)) {
+                 throw new IOException("Incompatible event log version: " + version);
+             }
+                
+             Schema schema = Schema.parse(in.readLine());
+             SpecificDatumReader datumReader = new SpecificDatumReader(schema);
+             JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
+
+             Event wrapper;
+             while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
+                 ++eventCtr;
+                 _reader.handleEvent(wrapper);
+             }
+             _reader.parseConfiguration();
+             // don't need put to finally as it's a kind of flushing data
+             _reader.close();
+            logger.info("reader used " + (System.currentTimeMillis() - start) + "ms");
+        } catch (Exception ioe) {
+            logger.error("Caught exception parsing history file after " + eventCtr + " events", ioe);
+            throw ioe;
+        } finally {
+            if (is != null) {
+                is.close();
+            }
+        }
+      }
+    
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public Event getNextEvent(DatumReader datumReader, JsonDecoder decoder) throws Exception {
+        Event wrapper;
+        try {
+            wrapper = (Event)datumReader.read(null, decoder);
+        } catch (java.io.EOFException e) {            // at EOF
+            return null;
+        }
+        return wrapper;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
new file mode 100644
index 0000000..e4f437f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import java.io.InputStream;
+
+/**
+ * Common contract for job history file parsers; the single operation consumes
+ * an input stream holding job history content.
+ *
+ * @author yonzhang
+ *
+ */
+public interface JHFParserBase {
+    /**
+     * Parses the job history content provided by the given stream.
+     * Implementations are responsible for closing the input stream, so the
+     * caller does not need to close it afterwards.
+     *
+     * @param is stream to read the job history content from
+     * @throws Exception if the content cannot be read or parsed
+     */
+    void parse(InputStream is) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
new file mode 100755
index 0000000..dd640c2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Builds the job history parser matching the configured MR version and wires
+ * the entity listeners into the underlying event reader.
+ */
+public class JHFParserFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
+
+    /**
+     * Creates a parser for the MR version named in the job extractor config.
+     * A missing or unrecognized version name selects the version 1 parser.
+     *
+     * @param configManager source of the job extractor and service configuration
+     * @param baseTags      tags passed through to the event reader
+     * @param configuration Hadoop configuration passed through to the event reader
+     * @param filter        content filter passed through to the event reader
+     * @return a fully wired parser instance
+     */
+    public static JHFParserBase getParser(JHFConfigManager configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+        final JHFFormat resolved = resolveFormat(configManager.getJobExtractorConfig().mrVersion);
+
+        if (resolved == JHFFormat.MRVer2) {
+            final JHFMRVer2EventReader v2Reader = new JHFMRVer2EventReader(baseTags, configuration, filter);
+            v2Reader.addListener(new JobEntityCreationEagleServiceListener(configManager));
+            v2Reader.addListener(new TaskFailureListener(configManager));
+            v2Reader.addListener(new TaskAttemptCounterListener(configManager));
+            v2Reader.addListener(new JobConfigurationCreationServiceListener(configManager));
+
+            v2Reader.register(new JobEntityLifecycleAggregator());
+            return new JHFMRVer2Parser(v2Reader);
+        }
+
+        // version 1 and anything else
+        final JHFMRVer1EventReader v1Reader = new JHFMRVer1EventReader(baseTags, configuration, filter);
+        v1Reader.addListener(new JobEntityCreationEagleServiceListener(configManager));
+        v1Reader.addListener(new TaskFailureListener(configManager));
+        v1Reader.addListener(new TaskAttemptCounterListener(configManager));
+
+        v1Reader.register(new JobEntityLifecycleAggregator());
+        return new JHFMRVer1Parser(v1Reader);
+    }
+
+    /**
+     * Maps the configured version name to a format, falling back to version 1
+     * unless the name is a valid format constant.
+     */
+    private static JHFFormat resolveFormat(String format) {
+        if (format == null) {
+            return JHFFormat.MRVer1;
+        }
+        try {
+            return JHFFormat.valueOf(format);
+        } catch (IllegalArgumentException unknownFormat) {
+            return JHFFormat.MRVer1; // fall back to version 1 unless it's specified as version 2
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
new file mode 100644
index 0000000..4529f8b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+/**
+ * Used to warn that a job history file has not yet been completely written to HDFS.
+ * This happens when the feeder catches up with the writer and reads a history file
+ * before it has been fully written.
+ * @author yonzhang
+ *
+ */
+public class JHFWriteNotCompletedException extends Exception {
+    /**
+     * Serialization version identifier for this exception type.
+     */
+    private static final long serialVersionUID = -3060175780718218490L;
+
+    /**
+     * @param msg description of the incomplete history file condition
+     */
+    public JHFWriteNotCompletedException(String msg){
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
new file mode 100644
index 0000000..a748565
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.history.entities.JobConfigurationAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener {
+    private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
+    private static final int MAX_RETRY_TIMES = 3;
+    private JHFConfigManager configManager;
+    private JobConfigurationAPIEntity m_jobConfigurationEntity;
+
+    public JobConfigurationCreationServiceListener(JHFConfigManager configManager) {
+        this.configManager = configManager;
+    }
+
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        if (entity != null) {
+            if (entity instanceof JobConfigurationAPIEntity) {
+                this.m_jobConfigurationEntity = (JobConfigurationAPIEntity)entity;
+            }
+        }
+    }
+
+    @Override
+    public void jobFinish() {
+
+    }
+
+    @Override
+    public void flush() throws Exception {
+        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+
+        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        List<JobConfigurationAPIEntity> list = new ArrayList<>();
+        list.add(m_jobConfigurationEntity);
+
+        int tried = 0;
+        while (tried <= MAX_RETRY_TIMES) {
+            try {
+                logger.info("start flushing JobConfigurationAPIEntity entities of total number " + list.size());
+                client.create(list);
+                logger.info("finish flushing entities of total number " + list.size());
+                break;
+            } catch (Exception ex) {
+                if (tried < MAX_RETRY_TIMES) {
+                    logger.error("Got exception to flush, retry as " + (tried + 1) +" times",ex);
+                } else {
+                    logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+                }
+            } finally {
+                list.clear();
+                m_jobConfigurationEntity = null;
+                client.getJerseyClient().destroy();
+                client.close();
+            }
+            tried ++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
new file mode 100644
index 0000000..15c932d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.entities.*;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Buffers created job entities and writes them to the Eagle service in
+ * batches, grouped by entity type.
+ */
+public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCreationListener {
+    private static final Logger logger = LoggerFactory.getLogger(JobEntityCreationEagleServiceListener.class);
+    private static final int BATCH_SIZE = 1000;
+    private int batchSize;
+    private List<JobBaseAPIEntity> list = new ArrayList<>();
+    private JHFConfigManager configManager;
+    List<JobExecutionAPIEntity> jobs = new ArrayList<>();
+    List<JobEventAPIEntity> jobEvents = new ArrayList<>();
+    List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
+    List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
+
+    public JobEntityCreationEagleServiceListener(JHFConfigManager configManager){
+        this(configManager, BATCH_SIZE);
+    }
+
+    /**
+     * @param configManager source of the service and extractor configuration
+     * @param batchSize     number of buffered entities that triggers a flush; must be positive
+     * @throws IllegalArgumentException if {@code batchSize} is not greater than 0
+     */
+    public JobEntityCreationEagleServiceListener(JHFConfigManager configManager, int batchSize) {
+        this.configManager = configManager;
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
+        }
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Buffers the entity and flushes once the buffer has reached the batch size.
+     */
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        list.add(entity);
+        // ">=" rather than "% == 0": if an earlier flush failed and left the buffer
+        // populated, the buffer may already be past the batch size
+        if (list.size() >= batchSize) {
+            flush();
+        }
+    }
+
+    /**
+     * Sends all buffered entities to the Eagle service, one request per entity
+     * type, to save network bandwidth. The buffer is emptied only when every
+     * request succeeded; the service client is released in all cases.
+     *
+     * @throws Exception if a request fails or the service reports an unsuccessful result
+     */
+    @Override
+    public void flush() throws Exception {
+        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+
+        try {
+            client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+            logger.info("start flushing entities of total number " + list.size());
+            for (JobBaseAPIEntity entity : list) {
+                if (entity instanceof JobExecutionAPIEntity) {
+                    jobs.add((JobExecutionAPIEntity) entity);
+                } else if (entity instanceof JobEventAPIEntity) {
+                    jobEvents.add((JobEventAPIEntity) entity);
+                } else if (entity instanceof TaskExecutionAPIEntity) {
+                    taskExecs.add((TaskExecutionAPIEntity) entity);
+                } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
+                    taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
+                }
+            }
+
+            if (!jobs.isEmpty()) {
+                logger.info("flush JobExecutionAPIEntity of number " + jobs.size());
+                checkResult(client.create(jobs));
+            }
+            if (!jobEvents.isEmpty()) {
+                logger.info("flush JobEventAPIEntity of number " + jobEvents.size());
+                checkResult(client.create(jobEvents));
+            }
+            if (!taskExecs.isEmpty()) {
+                logger.info("flush TaskExecutionAPIEntity of number " + taskExecs.size());
+                checkResult(client.create(taskExecs));
+            }
+            if (!taskAttemptExecs.isEmpty()) {
+                logger.info("flush TaskAttemptExecutionAPIEntity of number " + taskAttemptExecs.size());
+                checkResult(client.create(taskAttemptExecs));
+            }
+            logger.info("finish flushing entities of total number " + list.size());
+            list.clear();
+        } finally {
+            // The per-type lists are only scratch space for one flush; always reset
+            // them so a failed flush cannot leave entries behind that would be
+            // duplicated when the buffer is partitioned again.
+            jobs.clear();
+            jobEvents.clear();
+            taskExecs.clear();
+            taskAttemptExecs.clear();
+            // release the client even when a request failed
+            client.getJerseyClient().destroy();
+            client.close();
+        }
+    }
+
+    /**
+     * Logs the service-side error and fails when the response is not successful.
+     */
+    private void checkResult(GenericServiceAPIResponseEntity result) throws Exception {
+        if (!result.isSuccess()) {
+            logger.error(result.getException());
+            throw new Exception("Entity creation fails going to EagleService");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
new file mode 100644
index 0000000..b9dc13b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
+
+import java.util.Vector;
+
+/**
+ * not thread safe
+ * @author yonzhang
+ *
+ */
+public class JobEntityCreationPublisher {
+    private Vector<HistoryJobEntityCreationListener> listeners = new Vector<>(2);
+
+    /**
+     * Registers a listener; listeners are invoked in registration order.
+     */
+    public void addListener(HistoryJobEntityCreationListener l) {
+        listeners.add(l);
+    }
+
+    /**
+     * Hands the newly created entity to every registered listener. An
+     * exception thrown by a listener stops the notification and propagates.
+     */
+    public void notifiyListeners(JobBaseAPIEntity entity) throws Exception {
+        for (HistoryJobEntityCreationListener listener : listeners) {
+            listener.jobEntityCreated(entity);
+        }
+    }
+
+    /**
+     * Asks every registered listener to flush. An exception thrown by a
+     * listener stops the flushing and propagates.
+     */
+    public void flush() throws Exception {
+        for (HistoryJobEntityCreationListener listener : listeners) {
+            listener.flush();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
new file mode 100644
index 0000000..e485cc8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Lifecycle listener that sums the counters and durations of map and reduce
+ * task attempts while a job is being processed and, when the job finishes,
+ * writes the aggregated values into the counters of the job execution entity.
+ */
+public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleListener {
+    private final static Logger LOG = LoggerFactory.getLogger(JobEntityLifecycleAggregator.class);
+    // shared instance, used only to render entities for diagnostics
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private JobExecutionAPIEntity m_jobExecutionAPIEntity;
+    private final JobCounterAggregateFunction m_mapTaskAttemptCounterAgg;
+    private final JobCounterAggregateFunction m_reduceTaskAttemptCounterAgg;
+
+    private final JobCounterAggregateFunction m_mapFileSystemCounterAgg;
+    private final JobCounterAggregateFunction m_reduceFileSystemTaskCounterAgg;
+
+    private long m_mapAttemptDuration = 0;
+    private long m_reduceAttemptDuration = 0;
+    private boolean jobFinished = false;
+
+    public JobEntityLifecycleAggregator() {
+        this.m_mapTaskAttemptCounterAgg = new JobCounterSumFunction();
+        this.m_reduceTaskAttemptCounterAgg = new JobCounterSumFunction();
+        this.m_mapFileSystemCounterAgg = new JobCounterSumFunction();
+        this.m_reduceFileSystemTaskCounterAgg = new JobCounterSumFunction();
+    }
+
+    /**
+     * Accumulates task attempt entities and remembers the job execution
+     * entity; every other entity type (and {@code null}) is ignored.
+     */
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        if (entity instanceof TaskAttemptExecutionAPIEntity) {
+            taskAttemptEntityCreated((TaskAttemptExecutionAPIEntity) entity);
+        } else if (entity instanceof JobExecutionAPIEntity) {
+            this.m_jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
+        }
+    }
+
+    /**
+     * Writes the aggregated task attempt counters into the remembered job
+     * execution entity. Failures, including a missing job execution entity,
+     * are logged and not propagated.
+     */
+    @Override
+    public void jobFinish() {
+        try {
+            if (m_jobExecutionAPIEntity == null) {
+                throw new IOException("No JobExecutionAPIEntity found before flushing");
+            }
+
+            LOG.debug("Updating aggregated task attempts to job level counters");
+
+            JobCounters jobCounters = m_jobExecutionAPIEntity.getJobCounters();
+
+            if (jobCounters == null) {
+                LOG.warn("no job counter found for "+this.m_jobExecutionAPIEntity);
+                jobCounters = new JobCounters();
+            }
+
+            Map<String,Map<String,Long>> counters = jobCounters.getCounters();
+            if (counters == null) {
+                // NOTE(review): a JobCounters without a counter map (e.g. the one just
+                // created above) would otherwise fail on the first put - confirm
+                counters = new HashMap<>();
+            }
+
+            Map<String,Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
+            if (mapTaskAttemptCounter == null) {
+                mapTaskAttemptCounter = new HashMap<>();
+            }
+            mapTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
+            counters.put(JPAConstants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
+
+            Map<String,Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
+            if (reduceTaskAttemptCounter == null) {
+                reduceTaskAttemptCounter = new HashMap<>();
+            }
+            reduceTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
+            counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
+
+            counters.put(JPAConstants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
+            counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
+
+            jobCounters.setCounters(counters);
+
+            m_jobExecutionAPIEntity.setJobCounters(jobCounters);
+            jobFinished = true;
+        } catch (Exception e) {
+            // The entity is concatenated instead of calling toString() on it: it is
+            // null on the "no entity found" path, and dereferencing it here would
+            // throw a NullPointerException out of this handler.
+            LOG.error("Failed to update job execution entity: " + this.m_jobExecutionAPIEntity + ", due to " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Adds the duration and the task / file system counters of a map or reduce
+     * task attempt to the running totals. Attempts that carry no task type, no
+     * counters, or a task type other than map/reduce are only logged.
+     */
+    private void taskAttemptEntityCreated(TaskAttemptExecutionAPIEntity entity) {
+        JobCounters jobCounters = entity.getJobCounters();
+        String taskType = entity.getTags().get(JPAConstants.JOB_TASK_TYPE_TAG);
+
+        if (taskType != null && jobCounters != null && jobCounters.getCounters() != null) {
+            if (JPAConstants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {
+                m_mapAttemptDuration += entity.getDuration();
+                this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER));
+                this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER));
+                return;
+            } else if (JPAConstants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) {
+                m_reduceAttemptDuration += entity.getDuration();
+                this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER));
+                this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER));
+                return;
+            }
+        }
+
+        try {
+            LOG.warn("Unknown task type of task attempt execution entity: "+OBJECT_MAPPER.writeValueAsString(entity));
+        } catch (JsonProcessingException e) {
+            LOG.error(e.getMessage(),e);
+        }
+    }
+
+    /**
+     * Runs the job-finish aggregation if it has not completed successfully yet.
+     */
+    @Override
+    public void flush() throws Exception {
+        if (!this.jobFinished) {
+            this.jobFinish();
+        }
+    }
+
+    /** Accumulates counter maps into a single aggregated counter map. */
+    interface JobCounterAggregateFunction {
+        void accumulate(Map<String, Long> mapCounters);
+        Map<String,Long> result();
+    }
+
+    /** Aggregate function that sums the values of equally named counters. */
+    static class JobCounterSumFunction implements JobCounterAggregateFunction {
+        final Map<String, Long> result;
+
+        public JobCounterSumFunction(){
+            result = new HashMap<>();
+        }
+
+        /**
+         * Adds every counter of the given map to the running sums.
+         *
+         * @param counters counter name to value; {@code null} is ignored
+         */
+        @Override
+        public void accumulate(Map<String, Long> counters) {
+            if (counters != null) {
+                for (Map.Entry<String,Long> taskEntry: counters.entrySet()) {
+                    String counterName = taskEntry.getKey();
+                    long counterValue = taskEntry.getValue();
+
+                    if (!result.containsKey(counterName)) {
+                        result.put(counterName, counterValue);
+                    } else {
+                        long preValue = result.get(counterName);
+                        result.put(counterName, preValue + counterValue);
+                    }
+                }
+            }
+        }
+
+        /**
+         * @return the live map of summed counters (not a copy)
+         */
+        @Override
+        public Map<String, Long> result() {
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
new file mode 100644
index 0000000..a2f4c3a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public final class MRErrorClassifier {
+    
+    private final List<ErrorCategory> categories = new ArrayList<ErrorCategory>();
+
+    public MRErrorClassifier(InputStream configStream) throws IOException {
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(configStream));
+        String line;
+        while((line = reader.readLine()) != null) {
+            line = line.trim();
+            if (line.isEmpty() || line.startsWith("#")) {
+                continue;
+            }
+            int index = line.indexOf("=");
+            if (index == -1) {
+                throw new IllegalArgumentException("Invalid config, line: " + line);
+            }
+            final String key = line.substring(0, index).trim();
+            final String value = line.substring(index + 1).trim();
+            index = value.indexOf("|");
+            if (index == -1) {
+                throw new IllegalArgumentException("Invalid config, line: " + line);
+            }
+            final boolean needTransform = Boolean.valueOf(value.substring(0, index));
+            final Pattern pattern = Pattern.compile(value.substring(index + 1));
+            final ErrorCategory category = new ErrorCategory();
+            category.setName(key);
+            category.setNeedTransform(needTransform);
+            category.setPattern(pattern);
+            categories.add(category);
+        }
+    }
+    
+    public List<ErrorCategory> getErrorCategories() {
+        return categories;
+    }
+
+    public String classifyError(String error) throws IOException {
+        for (ErrorCategory category : categories) {
+            final String result = category.classify(error);
+            if (result != null) {
+                return result;
+            }
+        }
+        return "UNKNOWN";
+    }
+    
+    public static class ErrorCategory {
+        private String name;
+        private Pattern pattern;
+        private boolean needTransform;
+        
+        public String getName() {
+            return name;
+        }
+        public void setName(String name) {
+            this.name = name;
+        }
+        public Pattern getPattern() {
+            return pattern;
+        }
+        public void setPattern(Pattern pattern) {
+            this.pattern = pattern;
+        }
+        public boolean isNeedTransform() {
+            return needTransform;
+        }
+        public void setNeedTransform(boolean needTransform) {
+            this.needTransform = needTransform;
+        }
+        
+        public String classify(String error) {
+            Matcher matcher = pattern.matcher(error);
+            if (matcher.find()) {
+                if (!needTransform) {
+                    return name;
+                } else {
+                    return matcher.group(1);
+                }
+            }
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
new file mode 100644
index 0000000..d2ac944
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+/**
+ * Identifies the kind of each log line in a history file.
+ * The record type is the first token of a log line.
+ */
+public enum RecordTypes {
+    Jobtracker,
+    Job,
+    Task,
+    MapAttempt,
+    ReduceAttempt,
+    Meta
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
new file mode 100755
index 0000000..23e7072
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.history.entities.TaskAttemptCounterAPIEntity;
+import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Counts task attempts per (tag set, end-time minute) bucket as attempt entities are created
+ * and, on {@link #flush()}, writes one {@link TaskAttemptCounterAPIEntity} per bucket to the
+ * Eagle service.
+ */
+public class TaskAttemptCounterListener implements HistoryJobEntityCreationListener {
+    private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class);
+    private static final int BATCH_SIZE = 1000;
+    private final Map<CounterKey, CounterValue> counters = new HashMap<>();
+    private final JHFConfigManager configManager;
+
+    public TaskAttemptCounterListener(JHFConfigManager configManager) {
+        this.configManager = configManager;
+    }
+
+    /** Bucket identity: the copied tag values plus the minute-rounded end time. */
+    private static class CounterKey {
+        Map<String, String> tags = new HashMap<>();
+        long timestamp;
+
+        @Override
+        public boolean equals(Object thatKey) {
+            if (!(thatKey instanceof CounterKey)) {
+                return false;
+            }
+            CounterKey that = (CounterKey) thatKey;
+            return that.tags.equals(this.tags) && that.timestamp == this.timestamp;
+        }
+
+        @Override
+        public int hashCode() {
+            return tags.hashCode() ^ Long.valueOf(timestamp).hashCode();
+        }
+    }
+
+    /** Running totals for one bucket. */
+    private static class CounterValue {
+        int totalCount;
+        int failedCount;
+        int killedCount;
+    }
+
+    /**
+     * Adds one task attempt to the bucket for its tags and end-time minute. Entities that are
+     * not {@link TaskAttemptExecutionAPIEntity} instances are ignored.
+     *
+     * @param entity the entity that was just created
+     */
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
+            return;
+        }
+
+        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+
+        Map<String, String> source = e.getTags();
+        Map<String, String> tags = new HashMap<>();
+        copyTag(source, tags, EagleJobTagName.SITE.toString());
+        copyTag(source, tags, EagleJobTagName.NORM_JOB_NAME.toString());
+        copyTag(source, tags, EagleJobTagName.RACK.toString());
+        copyTag(source, tags, EagleJobTagName.HOSTNAME.toString());
+        copyTag(source, tags, EagleJobTagName.JOB_ID.toString());
+        copyTag(source, tags, EagleJobTagName.TASK_TYPE.toString());
+
+        CounterKey key = new CounterKey();
+        key.tags = tags;
+        key.timestamp = roundToMinute(e.getEndTime());
+
+        CounterValue value = counters.get(key);
+        if (value == null) {
+            value = new CounterValue();
+            counters.put(key, value);
+        }
+
+        // Compare from the constant side so a null status counts as "neither" instead of
+        // throwing a NullPointerException.
+        String status = e.getTaskStatus();
+        if (EagleTaskStatus.FAILED.name().equals(status)) {
+            value.failedCount++;
+        } else if (EagleTaskStatus.KILLED.name().equals(status)) {
+            value.killedCount++;
+        }
+        value.totalCount++;
+    }
+
+    /** Copies the value stored under {@code tagName} (possibly {@code null}) into {@code to}. */
+    private static void copyTag(Map<String, String> from, Map<String, String> to, String tagName) {
+        to.put(tagName, from.get(tagName));
+    }
+
+    /** Clears the seconds and milliseconds fields of {@code timestamp} (epoch milliseconds). */
+    private long roundToMinute(long timestamp) {
+        GregorianCalendar cal = new GregorianCalendar();
+        cal.setTimeInMillis(timestamp);
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+        return cal.getTimeInMillis();
+    }
+
+    /**
+     * Sends every bucket to the Eagle service in batches of at most {@code BATCH_SIZE} and
+     * clears the buckets once all batches were accepted. The service client is released even
+     * when a batch fails; in that case the buckets are kept.
+     *
+     * @throws Exception if creating entities or releasing the client fails
+     */
+    @Override
+    public void flush() throws Exception {
+        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+
+        try {
+            client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+            List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
+            logger.info("start flushing TaskAttemptCounter entities of total number {}", counters.size());
+            // create entity
+            for (Map.Entry<CounterKey, CounterValue> entry : counters.entrySet()) {
+                CounterKey key = entry.getKey();
+                CounterValue value = entry.getValue();
+                TaskAttemptCounterAPIEntity entity = new TaskAttemptCounterAPIEntity();
+                entity.setTags(key.tags);
+                entity.setTimestamp(key.timestamp);
+                entity.setTotalCount(value.totalCount);
+                entity.setFailedCount(value.failedCount);
+                entity.setKilledCount(value.killedCount);
+                list.add(entity);
+
+                if (list.size() >= BATCH_SIZE) {
+                    logger.info("start flushing TaskAttemptCounter {}", list.size());
+                    client.create(list);
+                    logger.info("end  flushing TaskAttemptCounter {}", list.size());
+                    list.clear();
+                }
+            }
+
+            logger.info("start flushing rest of TaskAttemptCounter {}", list.size());
+            client.create(list);
+            logger.info("end  flushing rest of TaskAttemptCounter {}", list.size());
+            logger.info("end  flushing TaskAttemptCounter entities of total number {}", counters.size());
+            counters.clear();
+            list.clear();
+        } finally {
+            // Release the HTTP client on every path, including a failed create().
+            try {
+                client.getJerseyClient().destroy();
+            } finally {
+                client.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
new file mode 100755
index 0000000..14cc882
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.history.entities.TaskFailureCountAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Buffers one {@link TaskFailureCountAPIEntity} per failed or killed task attempt, tagged with
+ * an error category from {@link MRErrorClassifier}, and writes the buffer to the Eagle service
+ * on {@link #flush()} or as soon as {@code BATCH_SIZE} entries have accumulated.
+ */
+public class TaskFailureListener implements HistoryJobEntityCreationListener {
+    private static final Logger logger = LoggerFactory.getLogger(TaskFailureListener.class);
+    private static final String MR_ERROR_CATEGORY_CONFIG_FILE_NAME = "MRErrorCategory.config";
+    private static final int BATCH_SIZE = 1000;
+    private static final int MAX_RETRY_TIMES = 3;
+
+    private final List<TaskFailureCountAPIEntity> failureTasks = new ArrayList<TaskFailureCountAPIEntity>();
+    private final MRErrorClassifier classifier;
+    private JHFConfigManager configManager;
+
+    /**
+     * Loads the error-category configuration from the classpath resource
+     * {@code MRErrorCategory.config}.
+     *
+     * @param configManager source of the Eagle service and job extractor settings used by flush
+     * @throws RuntimeException if the resource is missing or cannot be read
+     */
+    public TaskFailureListener(JHFConfigManager configManager) {
+        this.configManager = configManager;
+        final ClassLoader loader = TaskFailureListener.class.getClassLoader();
+        final URL url = loader.getResource(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
+        if (url != null) {
+            logger.info("Feeder is going to load configuration file: " + url.toString());
+        }
+        InputStream is = null;
+        try {
+            is = loader.getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
+            if (is == null) {
+                // getResourceAsStream signals a missing resource with null, not an exception.
+                throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory");
+            }
+            classifier = new MRErrorClassifier(is);
+        } catch (IOException ex) {
+            // Keep the cause so the real read failure shows up in the stack trace.
+            throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory", ex);
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException ignored) {
+                    // Best effort: a failed close must not hide the outcome of the load.
+                }
+            }
+        }
+    }
+
+    /**
+     * Buffers a failure record when {@code entity} is a task attempt whose status is FAILED or
+     * KILLED; everything else, including an attempt without a status, is ignored. Flushes once
+     * the buffer holds {@code BATCH_SIZE} records.
+     *
+     * @param entity the entity that was just created
+     */
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
+            return;
+        }
+
+        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+        // only store those killed or failed tasks; constant-first equals tolerates a null status
+        final String status = e.getTaskStatus();
+        final boolean failed = EagleTaskStatus.FAILED.name().equals(status);
+        final boolean killed = EagleTaskStatus.KILLED.name().equals(status);
+        if (!failed && !killed) {
+            return;
+        }
+
+        TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
+        Map<String, String> source = e.getTags();
+        Map<String, String> tags = new HashMap<>();
+        failureTask.setTags(tags);
+        copyTag(source, tags, EagleJobTagName.SITE.toString());
+        copyTag(source, tags, EagleJobTagName.NORM_JOB_NAME.toString());
+        copyTag(source, tags, EagleJobTagName.RACK.toString());
+        copyTag(source, tags, EagleJobTagName.HOSTNAME.toString());
+        copyTag(source, tags, EagleJobTagName.JOB_ID.toString());
+        tags.put(EagleJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+        copyTag(source, tags, EagleJobTagName.TASK_TYPE.toString());
+
+        //TODO need optimize, match and then capture the data
+        // An attempt may carry no error text; classify it as empty text rather than pass null.
+        final String error = e.getError();
+        final String errCategory = classifier.classifyError(error == null ? "" : error);
+        tags.put(EagleJobTagName.ERROR_CATEGORY.toString(), errCategory);
+
+        failureTask.setError(error);
+        failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
+        failureTask.setTimestamp(e.getTimestamp());
+        failureTask.setTaskStatus(status);
+        failureTasks.add(failureTask);
+
+        if (failureTasks.size() >= BATCH_SIZE) {
+            flush();
+        }
+    }
+
+    /** Copies the value stored under {@code tagName} (possibly {@code null}) into {@code to}. */
+    private static void copyTag(Map<String, String> from, Map<String, String> to, String tagName) {
+        to.put(tagName, from.get(tagName));
+    }
+
+    /**
+     * Writes the buffered records to the Eagle service, making up to
+     * {@code MAX_RETRY_TIMES + 1} attempts in total. The buffer is cleared only after a
+     * successful write; the service client is released on every path.
+     *
+     * @throws Exception the last failure once all attempts are used up, or a failure while
+     *         releasing the client
+     */
+    @Override
+    public void flush() throws Exception {
+        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password);
+
+        try {
+            client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+
+            int tried = 0;
+            while (tried <= MAX_RETRY_TIMES) {
+                try {
+                    logger.info("start flushing entities of total number " + failureTasks.size());
+                    client.create(failureTasks);
+                    logger.info("finish flushing entities of total number " + failureTasks.size());
+                    failureTasks.clear();
+                    break;
+                } catch (Exception ex) {
+                    if (tried < MAX_RETRY_TIMES) {
+                        logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+                    } else {
+                        logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+                        throw ex;
+                    }
+                }
+                tried++;
+            }
+        } finally {
+            // Release the HTTP client even when the last retry rethrows.
+            try {
+                client.getJerseyClient().destroy();
+            } finally {
+                client.close();
+            }
+        }
+    }
+}



Mime
View raw message