eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [02/52] [abbrv] incubator-eagle git commit: [EAGLE-461] Convert MR history app with new app framework
Date Wed, 07 Sep 2016 17:41:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 2d960b0..4ca0449 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -19,8 +19,8 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +41,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
 
     /**
      * Create a new Event Reader
+     *
      * @throws IOException
      */
     public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
@@ -50,86 +51,115 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     @SuppressWarnings("deprecation")
     public void handleEvent(Event wrapper) throws Exception {
         switch (wrapper.type) {
-        case JOB_SUBMITTED:
-            handleJobSubmitted(wrapper); break;
-        case JOB_INITED:
-            handleJobInited(wrapper); break;
-        case JOB_FINISHED:
-            handleJobFinished(wrapper); break;
-        case JOB_PRIORITY_CHANGED:
-            handleJobPriorityChanged(); break;
-        case JOB_STATUS_CHANGED:
-            handleJobStatusChanged(); break;
-        case JOB_FAILED:
-        case JOB_KILLED:
-        case JOB_ERROR:
-            handleJobUnsuccessfulCompletion(wrapper); break;
-        case JOB_INFO_CHANGED:
-            handleJobInfoChanged(); break;
-        case JOB_QUEUE_CHANGED:
-            handleJobQueueChanged();break;
-        case TASK_STARTED:
-            handleTaskStarted(wrapper); break;
-        case TASK_FINISHED:
-            handleTaskFinished(wrapper); break;
-        case TASK_FAILED:
-            handleTaskFailed(wrapper); break;
-        case TASK_UPDATED:
-            handleTaskUpdated(); break;
-
-        // map task
-        case MAP_ATTEMPT_STARTED:
-            handleMapAttemptStarted(wrapper); break;
-        case MAP_ATTEMPT_FINISHED:
-            handleMapAttemptFinished(wrapper); break;
-        case MAP_ATTEMPT_FAILED:
-            handleMapAttemptFailed(wrapper); break;
-        case MAP_ATTEMPT_KILLED:
-            handleMapAttemptKilled(wrapper); break;
-
-        // reduce task
-        case REDUCE_ATTEMPT_STARTED:
-            handleReduceAttemptStarted(wrapper); break;
-        case REDUCE_ATTEMPT_FINISHED:
-            handleReduceAttemptFinished(wrapper); break;
-        case REDUCE_ATTEMPT_FAILED:
-            handleReduceAttemptFailed(wrapper); break;
-        case REDUCE_ATTEMPT_KILLED:
-            handleReduceAttemptKilled(wrapper); break;
-
-        // set up task
-        case SETUP_ATTEMPT_STARTED:
-            break;
-        case SETUP_ATTEMPT_FINISHED:
-            handleSetupAttemptFinished(); break;
-        case SETUP_ATTEMPT_FAILED:
-            handleSetupAttemptFailed(); break;
-        case SETUP_ATTEMPT_KILLED:
-            handleSetupAttemptKilled(); break;
-
-        // clean up task
-        case CLEANUP_ATTEMPT_STARTED:
-            break;
-        case CLEANUP_ATTEMPT_FINISHED:
-            handleCleanupAttemptFinished(); break;
-        case CLEANUP_ATTEMPT_FAILED:
-            handleCleanupAttemptFailed(); break;
-        case CLEANUP_ATTEMPT_KILLED:
-            handleCleanupAttemptKilled(); break;
-
-        case AM_STARTED:
-            handleAMStarted(); break;
-        default:
-            logger.warn("unexpected event type: " + wrapper.type);
+            case JOB_SUBMITTED:
+                handleJobSubmitted(wrapper);
+                break;
+            case JOB_INITED:
+                handleJobInited(wrapper);
+                break;
+            case JOB_FINISHED:
+                handleJobFinished(wrapper);
+                break;
+            case JOB_PRIORITY_CHANGED:
+                handleJobPriorityChanged();
+                break;
+            case JOB_STATUS_CHANGED:
+                handleJobStatusChanged();
+                break;
+            case JOB_FAILED:
+            case JOB_KILLED:
+            case JOB_ERROR:
+                handleJobUnsuccessfulCompletion(wrapper);
+                break;
+            case JOB_INFO_CHANGED:
+                handleJobInfoChanged();
+                break;
+            case JOB_QUEUE_CHANGED:
+                handleJobQueueChanged();
+                break;
+            case TASK_STARTED:
+                handleTaskStarted(wrapper);
+                break;
+            case TASK_FINISHED:
+                handleTaskFinished(wrapper);
+                break;
+            case TASK_FAILED:
+                handleTaskFailed(wrapper);
+                break;
+            case TASK_UPDATED:
+                handleTaskUpdated();
+                break;
+
+            // map task
+            case MAP_ATTEMPT_STARTED:
+                handleMapAttemptStarted(wrapper);
+                break;
+            case MAP_ATTEMPT_FINISHED:
+                handleMapAttemptFinished(wrapper);
+                break;
+            case MAP_ATTEMPT_FAILED:
+                handleMapAttemptFailed(wrapper);
+                break;
+            case MAP_ATTEMPT_KILLED:
+                handleMapAttemptKilled(wrapper);
+                break;
+
+            // reduce task
+            case REDUCE_ATTEMPT_STARTED:
+                handleReduceAttemptStarted(wrapper);
+                break;
+            case REDUCE_ATTEMPT_FINISHED:
+                handleReduceAttemptFinished(wrapper);
+                break;
+            case REDUCE_ATTEMPT_FAILED:
+                handleReduceAttemptFailed(wrapper);
+                break;
+            case REDUCE_ATTEMPT_KILLED:
+                handleReduceAttemptKilled(wrapper);
+                break;
+
+            // set up task
+            case SETUP_ATTEMPT_STARTED:
+                break;
+            case SETUP_ATTEMPT_FINISHED:
+                handleSetupAttemptFinished();
+                break;
+            case SETUP_ATTEMPT_FAILED:
+                handleSetupAttemptFailed();
+                break;
+            case SETUP_ATTEMPT_KILLED:
+                handleSetupAttemptKilled();
+                break;
+
+            // clean up task
+            case CLEANUP_ATTEMPT_STARTED:
+                break;
+            case CLEANUP_ATTEMPT_FINISHED:
+                handleCleanupAttemptFinished();
+                break;
+            case CLEANUP_ATTEMPT_FAILED:
+                handleCleanupAttemptFailed();
+                break;
+            case CLEANUP_ATTEMPT_KILLED:
+                handleCleanupAttemptKilled();
+                break;
+
+            case AM_STARTED:
+                handleAMStarted();
+                break;
+            default:
+                logger.warn("unexpected event type: " + wrapper.type);
         }
     }
 
     private void handleJobPriorityChanged() throws Exception {
         return;
     }
+
     private void handleJobStatusChanged() throws Exception {
         return;
     }
+
     private void handleJobInfoChanged() throws Exception {
         return;
     }
@@ -139,150 +169,285 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     }
 
     private void handleJobSubmitted(Event wrapper) throws Exception {
-        JobSubmitted js = ((JobSubmitted)wrapper.getEvent());
+        JobSubmitted js = ((JobSubmitted) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getJobName() != null) values.put(Keys.JOBNAME, js.getJobName().toString());
-        if (js.getUserName() != null) values.put(Keys.USER, js.getUserName().toString());
-        if (js.getSubmitTime() != null) values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString());
-        if (js.getJobQueueName() != null) values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getJobName() != null) {
+            values.put(Keys.JOBNAME, js.getJobName().toString());
+        }
+        if (js.getUserName() != null) {
+            values.put(Keys.USER, js.getUserName().toString());
+        }
+        if (js.getSubmitTime() != null) {
+            values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString());
+        }
+        if (js.getJobQueueName() != null) {
+            values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
+        }
         handleJob(wrapper.getType(), values, null);
     }
 
     private void handleJobInited(Event wrapper) throws Exception {
-        JobInited js = ((JobInited)wrapper.getEvent());
+        JobInited js = ((JobInited) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getLaunchTime() != null) values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString());
-        if (js.getTotalMaps() != null) values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString());
-        if (js.getTotalReduces() != null) values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString());
-        if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
-        if (js.getUberized() != null) values.put(Keys.UBERISED, js.getUberized().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getLaunchTime() != null) {
+            values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString());
+        }
+        if (js.getTotalMaps() != null) {
+            values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString());
+        }
+        if (js.getTotalReduces() != null) {
+            values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString());
+        }
+        if (js.getJobStatus() != null) {
+            values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        }
+        if (js.getUberized() != null) {
+            values.put(Keys.UBERISED, js.getUberized().toString());
+        }
         handleJob(wrapper.getType(), values, null);
     }
 
     private void handleJobFinished(Event wrapper) throws Exception {
-        JobFinished js = ((JobFinished)wrapper.getEvent());
+        JobFinished js = ((JobFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
-        if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
-        if (js.getFailedMaps() != null) values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString());
-        if (js.getFailedReduces() != null) values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getFinishedMaps() != null) {
+            values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+        }
+        if (js.getFinishedReduces() != null) {
+            values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+        }
+        if (js.getFailedMaps() != null) {
+            values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString());
+        }
+        if (js.getFailedReduces() != null) {
+            values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
+        }
         values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name());
         handleJob(wrapper.getType(), values, js.getTotalCounters());
     }
 
     private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception {
-        JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion)wrapper.getEvent());
+        JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
-        if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
-        if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getFinishedMaps() != null) {
+            values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+        }
+        if (js.getFinishedReduces() != null) {
+            values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+        }
+        if (js.getJobStatus() != null) {
+            values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        }
         handleJob(wrapper.getType(), values, null);
     }
 
     private void handleTaskStarted(Event wrapper) throws Exception {
-        TaskStarted js = ((TaskStarted)wrapper.getEvent());
+        TaskStarted js = ((TaskStarted) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
-        if (js.getSplitLocations() != null) values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getStartTime() != null) {
+            values.put(Keys.START_TIME, js.getStartTime().toString());
+        }
+        if (js.getSplitLocations() != null) {
+            values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString());
+        }
         handleTask(RecordTypes.Task, wrapper.getType(), values, null);
     }
 
     private void handleTaskFinished(Event wrapper) throws Exception {
-        TaskFinished js = ((TaskFinished)wrapper.getEvent());
+        TaskFinished js = ((TaskFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        }
         handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
     }
 
     private void handleTaskFailed(Event wrapper) throws Exception {
-        TaskFailed js = ((TaskFailed)wrapper.getEvent());
+        TaskFailed js = ((TaskFailed) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
 
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
-        if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
-        if (js.getFailedDueToAttempt() != null) values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        }
+        if (js.getError() != null) {
+            values.put(Keys.ERROR, js.getError().toString());
+        }
+        if (js.getFailedDueToAttempt() != null) {
+            values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString());
+        }
         handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
     }
 
     private String normalizeTaskStatus(String taskStatus) {
-        if (taskStatus.equals("SUCCEEDED"))
+        if (taskStatus.equals("SUCCEEDED")) {
             return EagleTaskStatus.SUCCESS.name();
+        }
         return taskStatus;
     }
 
-    private void handleTaskUpdated(){
+    private void handleTaskUpdated() {
         return;
     }
 
     private void handleMapAttemptStarted(Event wrapper) throws Exception {
         handleTaskAttemptStarted(wrapper, RecordTypes.MapAttempt);
     }
+
     private void handleReduceAttemptStarted(Event wrapper) throws Exception {
         handleTaskAttemptStarted(wrapper, RecordTypes.ReduceAttempt);
     }
 
     private void handleTaskAttemptStarted(Event wrapper, RecordTypes recordType) throws Exception {
-        TaskAttemptStarted js = ((TaskAttemptStarted)wrapper.getEvent());
+        TaskAttemptStarted js = ((TaskAttemptStarted) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
-        if (js.getTrackerName() != null) values.put(Keys.TRACKER_NAME, js.getTrackerName().toString());
-        if (js.getHttpPort() != null) values.put(Keys.HTTP_PORT, js.getHttpPort().toString());
-        if (js.getShufflePort() != null) values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString());
-        if (js.getLocality() != null) values.put(Keys.LOCALITY, js.getLocality().toString());
-        if (js.getAvataar() != null) values.put(Keys.AVATAAR, js.getAvataar().toString());
-        handleTask(recordType,wrapper.getType(), values, null);
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getStartTime() != null) {
+            values.put(Keys.START_TIME, js.getStartTime().toString());
+        }
+        if (js.getTrackerName() != null) {
+            values.put(Keys.TRACKER_NAME, js.getTrackerName().toString());
+        }
+        if (js.getHttpPort() != null) {
+            values.put(Keys.HTTP_PORT, js.getHttpPort().toString());
+        }
+        if (js.getShufflePort() != null) {
+            values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString());
+        }
+        if (js.getLocality() != null) {
+            values.put(Keys.LOCALITY, js.getLocality().toString());
+        }
+        if (js.getAvataar() != null) {
+            values.put(Keys.AVATAAR, js.getAvataar().toString());
+        }
+        handleTask(recordType, wrapper.getType(), values, null);
     }
 
     private void handleMapAttemptFinished(Event wrapper) throws Exception {
-        MapAttemptFinished js = ((MapAttemptFinished)wrapper.getEvent());
+        MapAttemptFinished js = ((MapAttemptFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getMapFinishTime() != null) values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString());
-        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
-        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
-        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
-        if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getTaskStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getMapFinishTime() != null) {
+            values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString());
+        }
+        if (js.getHostname() != null) {
+            values.put(Keys.HOSTNAME, js.getHostname().toString());
+        }
+        if (js.getPort() != null) {
+            values.put(Keys.PORT, js.getPort().toString());
+        }
+        if (js.getRackname() != null) {
+            values.put(Keys.RACK_NAME, js.getRackname().toString());
+        }
+        if (js.getState() != null) {
+            values.put(Keys.STATE_STRING, js.getState().toString());
+        }
 
         ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
-        handleTask(RecordTypes.MapAttempt,wrapper.getType(), values, js.getCounters());
+        handleTask(RecordTypes.MapAttempt, wrapper.getType(), values, js.getCounters());
     }
+
     private void handleReduceAttemptFinished(Event wrapper) throws Exception {
-        ReduceAttemptFinished js = ((ReduceAttemptFinished)wrapper.getEvent());
+        ReduceAttemptFinished js = ((ReduceAttemptFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getShuffleFinishTime() != null) values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString());
-        if (js.getSortFinishTime() != null) values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString());
-        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
-        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
-        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
-        if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getTaskStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getShuffleFinishTime() != null) {
+            values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString());
+        }
+        if (js.getSortFinishTime() != null) {
+            values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString());
+        }
+        if (js.getHostname() != null) {
+            values.put(Keys.HOSTNAME, js.getHostname().toString());
+        }
+        if (js.getPort() != null) {
+            values.put(Keys.PORT, js.getPort().toString());
+        }
+        if (js.getRackname() != null) {
+            values.put(Keys.RACK_NAME, js.getRackname().toString());
+        }
+        if (js.getState() != null) {
+            values.put(Keys.STATE_STRING, js.getState().toString());
+        }
         ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
-        handleTask(RecordTypes.ReduceAttempt,wrapper.getType(), values, js.getCounters());
+        handleTask(RecordTypes.ReduceAttempt, wrapper.getType(), values, js.getCounters());
     }
 
     private void ensureRackAfterAttemptFinish(String rackname, Map<Keys, String> values) {
@@ -296,61 +461,89 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     private void handleMapAttemptFailed(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
     }
+
     private void handleReduceAttemptFailed(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
     }
-    private void handleMapAttemptKilled(Event wrapper)throws Exception {
+
+    private void handleMapAttemptKilled(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
     }
-    private void handleReduceAttemptKilled(Event wrapper)throws Exception {
+
+    private void handleReduceAttemptKilled(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
     }
-    
+
     private void handleTaskAttemptFailed(Event wrapper, RecordTypes recordType) throws Exception {
-        TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion)wrapper.getEvent());
+        TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
-        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
-        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
-        if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
-        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, js.getStatus().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getHostname() != null) {
+            values.put(Keys.HOSTNAME, js.getHostname().toString());
+        }
+        if (js.getPort() != null) {
+            values.put(Keys.PORT, js.getPort().toString());
+        }
+        if (js.getRackname() != null) {
+            values.put(Keys.RACK_NAME, js.getRackname().toString());
+        }
+        if (js.getError() != null) {
+            values.put(Keys.ERROR, js.getError().toString());
+        }
+        if (js.getStatus() != null) {
+            values.put(Keys.TASK_STATUS, js.getStatus().toString());
+        }
         ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
-        handleTask(recordType,wrapper.getType(), values, js.getCounters());
-    }    
-    private void handleSetupAttemptFinished(){
+        handleTask(recordType, wrapper.getType(), values, js.getCounters());
+    }
+
+    private void handleSetupAttemptFinished() {
         return;
     }
-    private void handleSetupAttemptFailed(){
+
+    private void handleSetupAttemptFailed() {
         return;
     }
-    private void handleSetupAttemptKilled(){
+
+    private void handleSetupAttemptKilled() {
         return;
     }
-    private void handleCleanupAttemptFinished(){
+
+    private void handleCleanupAttemptFinished() {
         return;
     }
-    private void handleCleanupAttemptFailed(){
+
+    private void handleCleanupAttemptFailed() {
         return;
     }
-    private void handleCleanupAttemptKilled(){
+
+    private void handleCleanupAttemptKilled() {
         return;
     }
-    private void handleAMStarted(){
+
+    private void handleAMStarted() {
         return;
     }
 
-    protected JobCounters parseCounters(Object value) throws IOException{
+    protected JobCounters parseCounters(Object value) throws IOException {
         JobCounters jc = new JobCounters();
         Map<String, Map<String, Long>> groups = new HashMap<>();
-        JhCounters counters = (JhCounters)value;
+        JhCounters counters = (JhCounters) value;
         List<JhCounterGroup> list = counters.getGroups();
         for (JhCounterGroup cg : list) {
             String cgName = cg.getName().toString();
-            if(!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter")
+            if (!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter")
                 && !cgName.equals("org.apache.hadoop.mapreduce.TaskCounter")
                 && !cgName.equals("org.apache.hadoop.mapreduce.JobCounter")
                 && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter")
@@ -359,7 +552,9 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
                 && !cgName.equals("org.apache.hadoop.mapred.Task$Counter")                        // for artemis
                 && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis
                 && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter") // for artemis
-            ) continue;
+                ) {
+                continue;
+            }
 
             groups.put(cgName, new HashMap<String, Long>());
             Map<String, Long> counterValues = groups.get(cgName);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
index c289e4d..2ccbf8d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
@@ -19,8 +19,9 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.avro.Schema;
-import org.apache.avro.io.*;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.hadoop.mapreduce.jobhistory.Event;
 import org.slf4j.Logger;
@@ -33,35 +34,35 @@ import java.io.InputStream;
 public class JHFMRVer2Parser implements JHFParserBase {
     private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2Parser.class);
     private JHFMRVer2EventReader _reader;
-    
-    public JHFMRVer2Parser(JHFMRVer2EventReader reader){
+
+    public JHFMRVer2Parser(JHFMRVer2EventReader reader) {
         this._reader = reader;
     }
 
-    @SuppressWarnings({ "rawtypes", "deprecation" })
+    @SuppressWarnings( {"rawtypes", "deprecation"})
     @Override
     public void parse(InputStream is) throws Exception {
         int eventCtr = 0;
         try {
-             long start = System.currentTimeMillis();
-             DataInputStream in = new DataInputStream(is);
-             String version = in.readLine();
-             if (!"Avro-Json".equals(version)) {
-                 throw new IOException("Incompatible event log version: " + version);
-             }
-                
-             Schema schema = Schema.parse(in.readLine());
-             SpecificDatumReader datumReader = new SpecificDatumReader(schema);
-             JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
+            long start = System.currentTimeMillis();
+            DataInputStream in = new DataInputStream(is);
+            String version = in.readLine();
+            if (!"Avro-Json".equals(version)) {
+                throw new IOException("Incompatible event log version: " + version);
+            }
+
+            Schema schema = Schema.parse(in.readLine());
+            SpecificDatumReader datumReader = new SpecificDatumReader(schema);
+            JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
 
-             Event wrapper;
-             while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
-                 ++eventCtr;
-                 _reader.handleEvent(wrapper);
-             }
-             _reader.parseConfiguration();
-             // don't need put to finally as it's a kind of flushing data
-             _reader.close();
+            Event wrapper;
+            while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
+                ++eventCtr;
+                _reader.handleEvent(wrapper);
+            }
+            _reader.parseConfiguration();
+            // don't need put to finally as it's a kind of flushing data
+            _reader.close();
             logger.info("reader used " + (System.currentTimeMillis() - start) + "ms");
         } catch (Exception ioe) {
             logger.error("Caught exception parsing history file after " + eventCtr + " events", ioe);
@@ -71,13 +72,13 @@ public class JHFMRVer2Parser implements JHFParserBase {
                 is.close();
             }
         }
-      }
-    
-    @SuppressWarnings({ "rawtypes", "unchecked" })
+    }
+
+    @SuppressWarnings( {"rawtypes", "unchecked"})
     public Event getNextEvent(DatumReader datumReader, JsonDecoder decoder) throws Exception {
         Event wrapper;
         try {
-            wrapper = (Event)datumReader.read(null, decoder);
+            wrapper = (Event) datumReader.read(null, decoder);
         } catch (java.io.EOFException e) {            // at EOF
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
index e4f437f..a2479ab 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
@@ -21,13 +21,12 @@ package org.apache.eagle.jpm.mr.history.parser;
 import java.io.InputStream;
 
 /**
- *
  * @author yonzhang
- *
  */
 public interface JHFParserBase {
     /**
      * this method will ensure to close the inputstream
+     *
      * @param is
      * @throws Exception
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index dd640c2..718612d 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
@@ -30,7 +30,7 @@ public class JHFParserFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
 
-    public static JHFParserBase getParser(JHFConfigManager configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+    public static JHFParserBase getParser(MRHistoryJobConfig configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
         String format = configManager.getJobExtractorConfig().mrVersion;
         JHFParserBase parser;
         JHFFormat f;
@@ -40,31 +40,31 @@ public class JHFParserFactory {
             } else {
                 f = JHFFormat.valueOf(format);
             }
-        } catch(IllegalArgumentException ex) {
+        } catch (IllegalArgumentException ex) {
             f = JHFFormat.MRVer1; // fall back to version 1 unless it's specified as version 2
         }
-        
+
         switch (f) {
-        case MRVer2:
-            JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
-            reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
-            reader2.addListener(new TaskFailureListener(configManager));
-            reader2.addListener(new TaskAttemptCounterListener(configManager));
-            reader2.addListener(new JobConfigurationCreationServiceListener(configManager));
+            case MRVer2:
+                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+                reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
+                reader2.addListener(new TaskFailureListener(configManager));
+                reader2.addListener(new TaskAttemptCounterListener(configManager));
+                reader2.addListener(new JobConfigurationCreationServiceListener(configManager));
 
-            reader2.register(new JobEntityLifecycleAggregator());
-            parser = new JHFMRVer2Parser(reader2);
-            break;
-        case MRVer1:
-        default:
-            JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
-            reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
-            reader1.addListener(new TaskFailureListener(configManager));
-            reader1.addListener(new TaskAttemptCounterListener(configManager));
+                reader2.register(new JobEntityLifecycleAggregator());
+                parser = new JHFMRVer2Parser(reader2);
+                break;
+            case MRVer1:
+            default:
+                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
+                reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
+                reader1.addListener(new TaskFailureListener(configManager));
+                reader1.addListener(new TaskAttemptCounterListener(configManager));
 
-            reader1.register(new JobEntityLifecycleAggregator());
-            parser = new JHFMRVer1Parser(reader1);
-            break;
+                reader1.register(new JobEntityLifecycleAggregator());
+                parser = new JHFMRVer1Parser(reader1);
+                break;
         }
         return parser;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
index 4529f8b..d5ed97a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
@@ -21,16 +21,16 @@ package org.apache.eagle.jpm.mr.history.parser;
 /**
  * used to warn that one job history file has not yet completed writing to hdfs
  * This happens when feeder catches up and the history file has not been written into hdfs completely
- * @author yonzhang
  *
+ * @author yonzhang
  */
 public class JHFWriteNotCompletedException extends Exception {
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = -3060175780718218490L;
 
-    public JHFWriteNotCompletedException(String msg){
+    public JHFWriteNotCompletedException(String msg) {
         super(msg);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 0334c9b..4163f7b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.JobConfigurationAPIEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
@@ -32,10 +32,10 @@ import java.util.List;
 public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener {
     private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
     private static final int MAX_RETRY_TIMES = 3;
-    private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
     private JobConfigurationAPIEntity m_jobConfigurationEntity;
 
-    public JobConfigurationCreationServiceListener(JHFConfigManager configManager) {
+    public JobConfigurationCreationServiceListener(MRHistoryJobConfig configManager) {
         this.configManager = configManager;
     }
 
@@ -43,7 +43,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
         if (entity != null) {
             if (entity instanceof JobConfigurationAPIEntity) {
-                this.m_jobConfigurationEntity = (JobConfigurationAPIEntity)entity;
+                this.m_jobConfigurationEntity = (JobConfigurationAPIEntity) entity;
             }
         }
     }
@@ -55,13 +55,13 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
 
     @Override
     public void flush() throws Exception {
-        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         List<JobConfigurationAPIEntity> list = new ArrayList<>();
@@ -76,7 +76,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
                 break;
             } catch (Exception ex) {
                 if (tried < MAX_RETRY_TIMES) {
-                    logger.error("Got exception to flush, retry as " + (tried + 1) +" times",ex);
+                    logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
                 } else {
                     logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
                 }
@@ -86,7 +86,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
                 client.getJerseyClient().destroy();
                 client.close();
             }
-            tried ++;
+            tried++;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index e1a3c69..abddf3b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
@@ -34,23 +34,24 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     private static final int BATCH_SIZE = 1000;
     private int batchSize;
     private List<JobBaseAPIEntity> list = new ArrayList<>();
-    private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
     List<JobExecutionAPIEntity> jobs = new ArrayList<>();
     List<JobEventAPIEntity> jobEvents = new ArrayList<>();
     List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
     List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
-    
-    public JobEntityCreationEagleServiceListener(JHFConfigManager configManager){
+
+    public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
         this(configManager, BATCH_SIZE);
     }
-    
-    public JobEntityCreationEagleServiceListener(JHFConfigManager configManager, int batchSize) {
+
+    public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager, int batchSize) {
         this.configManager = configManager;
-        if (batchSize <= 0)
+        if (batchSize <= 0) {
             throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
+        }
         this.batchSize = batchSize;
     }
-    
+
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
         list.add(entity);
@@ -65,26 +66,26 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
      */
     @Override
     public void flush() throws Exception {
-        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         logger.info("start flushing entities of total number " + list.size());
         for (int i = 0; i < list.size(); i++) {
             JobBaseAPIEntity entity = list.get(i);
             if (entity instanceof JobExecutionAPIEntity) {
-                jobs.add((JobExecutionAPIEntity)entity);
-            } else if(entity instanceof JobEventAPIEntity) {
-                jobEvents.add((JobEventAPIEntity)entity);
-            } else if(entity instanceof TaskExecutionAPIEntity) {
-                taskExecs.add((TaskExecutionAPIEntity)entity);
-            } else if(entity instanceof TaskAttemptExecutionAPIEntity) {
-                taskAttemptExecs.add((TaskAttemptExecutionAPIEntity)entity);
+                jobs.add((JobExecutionAPIEntity) entity);
+            } else if (entity instanceof JobEventAPIEntity) {
+                jobEvents.add((JobEventAPIEntity) entity);
+            } else if (entity instanceof TaskExecutionAPIEntity) {
+                taskExecs.add((TaskExecutionAPIEntity) entity);
+            } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
+                taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
             }
         }
         GenericServiceAPIResponseEntity result;
@@ -117,7 +118,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
         client.getJerseyClient().destroy();
         client.close();
     }
-    
+
     private void checkResult(GenericServiceAPIResponseEntity result) throws Exception {
         if (!result.isSuccess()) {
             logger.error(result.getException());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index 6e0371c..8b85b26 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -24,21 +24,22 @@ import java.util.Vector;
 
 /**
  * not thread safe
- * @author yonzhang
  *
+ * @author yonzhang
  */
 public class JobEntityCreationPublisher {
     private Vector<HistoryJobEntityCreationListener> listeners = new Vector<>(2);
-    public void addListener(HistoryJobEntityCreationListener l){
+
+    public void addListener(HistoryJobEntityCreationListener l) {
         listeners.add(l);
     }
-    
+
     public void notifiyListeners(JobBaseAPIEntity entity) throws Exception {
         for (HistoryJobEntityCreationListener l : listeners) {
             l.jobEntityCreated(entity);
         }
     }
-    
+
     public void flush() throws Exception {
         for (HistoryJobEntityCreationListener l : listeners) {
             l.flush();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index d97e2a3..594d8e2 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -53,10 +53,10 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
-        if (entity != null ) {
+        if (entity != null) {
             if (entity instanceof TaskAttemptExecutionAPIEntity) {
                 taskAttemptEntityCreated((TaskAttemptExecutionAPIEntity) entity);
-            } else if(entity instanceof JobExecutionAPIEntity) {
+            } else if (entity instanceof JobExecutionAPIEntity) {
                 this.m_jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
             }
         }
@@ -74,24 +74,28 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
             JobCounters jobCounters = m_jobExecutionAPIEntity.getJobCounters();
 
             if (jobCounters == null) {
-                LOG.warn("no job counter found for "+this.m_jobExecutionAPIEntity);
+                LOG.warn("no job counter found for " + this.m_jobExecutionAPIEntity);
                 jobCounters = new JobCounters();
             }
 
-            Map<String,Map<String,Long>> counters = jobCounters.getCounters();
+            Map<String, Map<String, Long>> counters = jobCounters.getCounters();
 
-            Map<String,Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
-            if (mapTaskAttemptCounter == null) mapTaskAttemptCounter = new HashMap<>();
+            Map<String, Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
+            if (mapTaskAttemptCounter == null) {
+                mapTaskAttemptCounter = new HashMap<>();
+            }
             mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
-            counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
+            counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER, mapTaskAttemptCounter);
 
-            Map<String,Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
-            if (reduceTaskAttemptCounter == null) reduceTaskAttemptCounter = new HashMap<>();
+            Map<String, Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
+            if (reduceTaskAttemptCounter == null) {
+                reduceTaskAttemptCounter = new HashMap<>();
+            }
             reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
-            counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
+            counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER, reduceTaskAttemptCounter);
 
             counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
-            counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
+            counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_reduceFileSystemTaskCounterAgg.result());
 
             jobCounters.setCounters(counters);
 
@@ -122,9 +126,9 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
         ObjectMapper objectMapper = new ObjectMapper();
         try {
-            LOG.warn("Unknown task type of task attempt execution entity: "+objectMapper.writeValueAsString(entity));
+            LOG.warn("Unknown task type of task attempt execution entity: " + objectMapper.writeValueAsString(entity));
         } catch (Exception e) {
-            LOG.error(e.getMessage(),e);
+            LOG.error(e.getMessage(), e);
         }
     }
 
@@ -137,13 +141,14 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
     interface JobCounterAggregateFunction {
         void accumulate(Map<String, Long> mapCounters);
-        Map<String,Long> result();
+
+        Map<String, Long> result();
     }
 
     static class JobCounterSumFunction implements JobCounterAggregateFunction {
         final Map<String, Long> result;
 
-        public JobCounterSumFunction(){
+        public JobCounterSumFunction() {
             result = new HashMap<>();
         }
 
@@ -153,7 +158,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
         @Override
         public void accumulate(Map<String, Long> counters) {
             if (counters != null) {
-                for (Map.Entry<String,Long> taskEntry: counters.entrySet()) {
+                for (Map.Entry<String, Long> taskEntry : counters.entrySet()) {
                     String counterName = taskEntry.getKey();
                     long counterValue = taskEntry.getValue();
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
index a2f4c3a..e65ec17 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
@@ -28,13 +28,13 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public final class MRErrorClassifier {
-    
+
     private final List<ErrorCategory> categories = new ArrayList<ErrorCategory>();
 
     public MRErrorClassifier(InputStream configStream) throws IOException {
         final BufferedReader reader = new BufferedReader(new InputStreamReader(configStream));
         String line;
-        while((line = reader.readLine()) != null) {
+        while ((line = reader.readLine()) != null) {
             line = line.trim();
             if (line.isEmpty() || line.startsWith("#")) {
                 continue;
@@ -58,7 +58,7 @@ public final class MRErrorClassifier {
             categories.add(category);
         }
     }
-    
+
     public List<ErrorCategory> getErrorCategories() {
         return categories;
     }
@@ -72,31 +72,36 @@ public final class MRErrorClassifier {
         }
         return "UNKNOWN";
     }
-    
+
     public static class ErrorCategory {
         private String name;
         private Pattern pattern;
         private boolean needTransform;
-        
+
         public String getName() {
             return name;
         }
+
         public void setName(String name) {
             this.name = name;
         }
+
         public Pattern getPattern() {
             return pattern;
         }
+
         public void setPattern(Pattern pattern) {
             this.pattern = pattern;
         }
+
         public boolean isNeedTransform() {
             return needTransform;
         }
+
         public void setNeedTransform(boolean needTransform) {
             this.needTransform = needTransform;
         }
-        
+
         public String classify(String error) {
             Matcher matcher = pattern.matcher(error);
             if (matcher.find()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
index d2ac944..be23051 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
@@ -17,9 +17,10 @@
 */
 
 package org.apache.eagle.jpm.mr.history.parser;
+
 /**
- * Record types are identifiers for each line of log in history files. 
- * A record type appears as the first token in a single line of log. 
+ * Record types are identifiers for each line of log in history files.
+ * A record type appears as the first token in a single line of log.
  */
 public enum RecordTypes {
     Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index d819baf..efc43c5 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptCounterAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
@@ -34,46 +34,48 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
     private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class);
     private static final int BATCH_SIZE = 1000;
     private Map<CounterKey, CounterValue> counters = new HashMap<>();
-    private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
 
-    public TaskAttemptCounterListener(JHFConfigManager configManager) {
+    public TaskAttemptCounterListener(MRHistoryJobConfig configManager) {
         this.configManager = configManager;
     }
 
     private static class CounterKey {
         Map<String, String> tags = new HashMap<>();
         long timestamp;
-        
+
         @Override
         public boolean equals(Object thatKey) {
-            if (!(thatKey instanceof CounterKey))
+            if (!(thatKey instanceof CounterKey)) {
                 return false;
-            CounterKey that = (CounterKey)thatKey;
-            if (that.tags.equals(this.tags) && that.timestamp == this.timestamp)
+            }
+            CounterKey that = (CounterKey) thatKey;
+            if (that.tags.equals(this.tags) && that.timestamp == this.timestamp) {
                 return true;
+            }
             return false;
         }
-        
+
         @Override
-        public int hashCode(){
+        public int hashCode() {
             return tags.hashCode() ^ Long.valueOf(timestamp).hashCode();
         }
     }
-    
+
     private static class CounterValue {
         int totalCount;
         int failedCount;
         int killedCount;
     }
-    
+
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
         if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
             return;
         }
-        
-        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
-        
+
+        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+
         Map<String, String> tags = new HashMap<>();
         tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
         tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
@@ -85,21 +87,21 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
         CounterKey key = new CounterKey();
         key.tags = tags;
         key.timestamp = roundToMinute(e.getEndTime());
-        
+
         CounterValue value = counters.get(key);
         if (value == null) {
             value = new CounterValue();
             counters.put(key, value);
         }
-        
+
         if (e.getTaskStatus().equals(EagleTaskStatus.FAILED.name())) {
             value.failedCount++;
-        } else if(e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+        } else if (e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
             value.killedCount++;
         }
         value.totalCount++;
     }
-    
+
     private long roundToMinute(long timestamp) {
         GregorianCalendar cal = new GregorianCalendar();
         cal.setTimeInMillis(timestamp);
@@ -107,16 +109,16 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
         cal.set(Calendar.MILLISECOND, 0);
         return cal.getTimeInMillis();
     }
-    
+
     @Override
     public void flush() throws Exception {
-        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
@@ -132,7 +134,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
             entity.setFailedCount(value.failedCount);
             entity.setKilledCount(value.killedCount);
             list.add(entity);
-            
+
             if (list.size() >= BATCH_SIZE) {
                 logger.info("start flushing TaskAttemptCounter " + list.size());
                 client.create(list);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 6e42fe2..e0c3c6b 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskFailureCountAPIEntity;
@@ -44,94 +44,98 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
 
     private final List<TaskFailureCountAPIEntity> failureTasks = new ArrayList<TaskFailureCountAPIEntity>();
     private final MRErrorClassifier classifier;
-	private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
 
-    public TaskFailureListener(JHFConfigManager configManager) {
+    public TaskFailureListener(MRHistoryJobConfig configManager) {
         this.configManager = configManager;
-    	InputStream is = null;
-    	try {
-    		is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
+        InputStream is = null;
+        try {
+            is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
             URL url = TaskFailureListener.class.getClassLoader().getResource(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
             if (url != null) {
                 logger.info("Feeder is going to load configuration file: " + url.toString());
             }
-    		classifier = new MRErrorClassifier(is);
-    	} catch (IOException ex) {
-    		throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory");
-    	} finally {
-    		if (is != null) {
-    			try {
-    				is.close();
-    			} catch (IOException e) {
-    			}
-    		}
-    	}
+            classifier = new MRErrorClassifier(is);
+        } catch (IOException ex) {
+            throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory");
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                }
+            }
+        }
     }
 
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
-    	if (!(entity instanceof TaskAttemptExecutionAPIEntity))
-    		return;
+        if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
+            return;
+        }
 
-    	TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
-    	// only store those killed or failed tasks
-    	if (!e.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) && !e.getTaskStatus().equals(EagleTaskStatus.KILLED.name()))
-    		return;
+        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+        // only store those killed or failed tasks
+        if (!e.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) && !e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+            return;
+        }
 
-    	TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
-    	Map<String, String> tags = new HashMap<>();
-    	failureTask.setTags(tags);
-    	tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
-    	tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
-    	tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
-    	tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
-    	tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
-    	tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
-    	tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
+        TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
+        Map<String, String> tags = new HashMap<>();
+        failureTask.setTags(tags);
+        tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
+        tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+        tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
+        tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
+        tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
+        tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+        tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
 
-    	//TODO need optimize, match and then capture the data
-    	final String errCategory = classifier.classifyError(e.getError());
-    	tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
+        //TODO need optimize, match and then capture the data
+        final String errCategory = classifier.classifyError(e.getError());
+        tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
 
-    	failureTask.setError(e.getError());
-    	failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
-    	failureTask.setTimestamp(e.getTimestamp());
-    	failureTask.setTaskStatus(e.getTaskStatus());
-    	failureTasks.add(failureTask);
+        failureTask.setError(e.getError());
+        failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
+        failureTask.setTimestamp(e.getTimestamp());
+        failureTask.setTaskStatus(e.getTaskStatus());
+        failureTasks.add(failureTask);
 
-    	if (failureTasks.size() >= BATCH_SIZE) flush();
+        if (failureTasks.size() >= BATCH_SIZE) {
+            flush();
+        }
     }
-    
+
     @Override
     public void flush() throws Exception {
-		JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-		JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
-		IEagleServiceClient client = new EagleServiceClientImpl(
-				eagleServiceConfig.eagleServiceHost,
-				eagleServiceConfig.eagleServicePort,
-				eagleServiceConfig.username,
-				eagleServiceConfig.password);
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        IEagleServiceClient client = new EagleServiceClientImpl(
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
-		client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
 
-    	int tried = 0;
-    	while (tried <= MAX_RETRY_TIMES) {
-    		try {
-    			logger.info("start flushing entities of total number " + failureTasks.size());
-    			client.create(failureTasks);
-    			logger.info("finish flushing entities of total number " + failureTasks.size());
-    			failureTasks.clear();
-				break;
-    		} catch (Exception ex) {
-    			if (tried < MAX_RETRY_TIMES) {
-    				logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
-    			} else {
-    				logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
-    				throw ex;
-    			}
-    		}
-			tried ++;
-    	}
+        int tried = 0;
+        while (tried <= MAX_RETRY_TIMES) {
+            try {
+                logger.info("start flushing entities of total number " + failureTasks.size());
+                client.create(failureTasks);
+                logger.info("finish flushing entities of total number " + failureTasks.size());
+                failureTasks.clear();
+                break;
+            } catch (Exception ex) {
+                if (tried < MAX_RETRY_TIMES) {
+                    logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+                } else {
+                    logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+                    throw ex;
+                }
+            }
+            tried++;
+        }
         client.getJerseyClient().destroy();
         client.close();
     }


Mime
View raw message