eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [7/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring
Date Tue, 05 Jul 2016 18:07:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
new file mode 100644
index 0000000..e357cf6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.common;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner;
+import org.apache.eagle.jpm.mr.history.storm.JobIdPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class JHFConfigManager implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(JHFConfigManager.class);
+
+    private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf";
+
+    public String getEnv() {
+        return env;
+    }
+    private String env;
+
+    public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+    private ZKStateConfig zkStateConfig;
+
+    public JobHistoryEndpointConfig getJobHistoryEndpointConfig() { return jobHistoryEndpointConfig; }
+    private JobHistoryEndpointConfig jobHistoryEndpointConfig;
+
+    public ControlConfig getControlConfig() { return controlConfig; }
+    private ControlConfig controlConfig;
+
+    public JobExtractorConfig getJobExtractorConfig() { return jobExtractorConfig; }
+    private JobExtractorConfig jobExtractorConfig;
+
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+    private EagleServiceConfig eagleServiceConfig;
+
+    public Config getConfig() {
+        return config;
+    }
+    private Config config;
+
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+    }
+
+    public static class JobHistoryEndpointConfig implements Serializable {
+        public String nnEndpoint;
+        public String basePath;
+        public boolean pathContainsJobTrackerName;
+        public String jobTrackerName;
+        public String principal;
+        public String keyTab;
+    }
+
+    public static class ControlConfig implements Serializable {
+        public boolean dryRun;
+        public Class<? extends JobIdPartitioner> partitionerCls;
+        public boolean zeroBasedMonth;
+        public String timeZone;
+    }
+
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public String mrVersion;
+        public int readTimeoutSeconds;
+    }
+
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public String username;
+        public String password;
+    }
+
+    private static JHFConfigManager manager = new JHFConfigManager();
+
+    /**
+     * As this is singleton object and constructed while this class is being initialized,
+     * so any exception within this constructor will be wrapped with java.lang.ExceptionInInitializerError.
+     * And this is unrecoverable and hard to troubleshooting.
+     */
+    private JHFConfigManager() {
+        this.zkStateConfig = new ZKStateConfig();
+        this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
+        this.controlConfig = new ControlConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.eagleServiceConfig = new EagleServiceConfig();
+    }
+
+    public static JHFConfigManager getInstance(String []args) {
+        manager.init(args);
+        return manager;
+    }
+
+    /**
+     * read configuration file and load hbase config etc
+     */
+    private void init(String[] args) {
+        // TODO: Probably we can remove the properties file path check in future
+        try {
+            LOG.info("Loading from configuration file");
+            this.config = new ConfigOptionParser().load(args);
+        } catch (Exception e) {
+            LOG.error("failed to load config");
+        }
+
+        this.env = config.getString("envContextConfig.env");
+
+        //parse eagle job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
+        this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
+        //parse eagle zk
+        this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+
+        //parse job history endpoint
+        this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
+        this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
+        this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
+        this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
+        this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+
+        //parse control config
+        this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun");
+        try {
+            this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
+            assert this.controlConfig.partitionerCls != null;
+        } catch (Exception e) {
+            LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.mr.history.storm.DefaultJobIdPartitioner", e);
+            this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
+        } finally {
+            LOG.info("Loaded partitioner class: {}",this.controlConfig.partitionerCls);
+        }
+        this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth");
+        this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone");
+
+        // parse eagle service endpoint
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+
+        LOG.info("Successfully initialized JHFConfigManager");
+        LOG.info("env: " + this.env);
+        LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
+        LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
+        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java
new file mode 100755
index 0000000..feb5498
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JPAConstants {
+
+    private final static Logger LOG = LoggerFactory.getLogger(JPAConstants.class);
+
+    public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService";
+    public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService";
+    public static final String JPA_JOB_EXECUTION_SERVICE_NAME = "JobExecutionService";
+
+    public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService";
+    public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService";
+    public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService";
+    public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService";
+    public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
+
+    public static final String JOB_TASK_TYPE_TAG = "taskType";
+
+    public static class JobConfiguration {
+        // job type
+        public static final String SCOOBI_JOB = "scoobi.mode";
+        public static final String HIVE_JOB = "hive.query.string";
+        public static final String PIG_JOB = "pig.script";
+        public static final String CASCADING_JOB = "cascading.app.name";
+    }
+
+    /**
+     * MR task types
+     */
+    public enum TaskType {
+        SETUP, MAP, REDUCE, CLEANUP
+    }
+
+    public enum JobType {
+        CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"),
+        NOTAVALIABLE("N/A")
+        ;
+        private String value;
+        JobType(String value){
+            this.value = value;
+        }
+        @Override
+        public String toString() {
+            return this.value;
+        }
+    }
+
+    public static final String FILE_SYSTEM_COUNTER = "org.apache.hadoop.mapreduce.FileSystemCounter";
+    public static final String TASK_COUNTER = "org.apache.hadoop.mapreduce.TaskCounter";
+
+    public static final String MAP_TASK_ATTEMPT_COUNTER = "MapTaskAttemptCounter";
+    public static final String REDUCE_TASK_ATTEMPT_COUNTER = "ReduceTaskAttemptCounter";
+
+    public static final String MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "MapTaskAttemptFileSystemCounter";
+    public static final String REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "ReduceTaskAttemptFileSystemCounter";
+
+    public enum TaskAttemptCounter {
+        TASK_ATTEMPT_DURATION,
+    }
+
+
+
+    private static final String DEFAULT_JOB_CONF_NORM_JOBNAME_KEY = "eagle.job.name";
+    private static final String EAGLE_NORM_JOBNAME_CONF_KEY = "eagle.job.normalizedfieldname";
+
+    public static String JOB_CONF_NORM_JOBNAME_KEY = null;
+
+    static {
+        if (JOB_CONF_NORM_JOBNAME_KEY == null) {
+            JOB_CONF_NORM_JOBNAME_KEY = DEFAULT_JOB_CONF_NORM_JOBNAME_KEY;
+        }
+        LOG.info("Loaded " + EAGLE_NORM_JOBNAME_CONF_KEY + " : " + JOB_CONF_NORM_JOBNAME_KEY);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java
new file mode 100644
index 0000000..f85f7bc
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.common;
+
+import java.util.Map;
+import java.util.TreeMap;
+
/**
 * Thin wrapper around a job's configuration properties, kept sorted by key
 * (backed by a {@link TreeMap}).
 */
public final class JobConfig {
    // key -> value, iterated in natural key order
    private Map<String, String> config = new TreeMap<>();

    /** @return the backing configuration map (mutable, sorted by key) */
    public Map<String, String> getConfig() {
        return config;
    }

    /** Replaces the backing configuration map with the given one. */
    public void setConfig(Map<String, String> config) {
        this.config = config;
    }

    @Override
    public String toString() {
        return config.toString();
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java
new file mode 100644
index 0000000..5b330fc
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/AbstractJobHistoryDAO.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * job history is the resource
+ */
+public abstract class AbstractJobHistoryDAO implements JobHistoryLCM {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobHistoryDAO.class);
+
+    private final static String YEAR_URL_FORMAT = "/%4d";
+    private final static String MONTH_URL_FORMAT = "/%02d";
+    private final static String DAY_URL_FORMAT = "/%02d";
+    private final static String YEAR_MONTH_DAY_URL_FORMAT = YEAR_URL_FORMAT + MONTH_URL_FORMAT + DAY_URL_FORMAT;
+    protected final static String SERIAL_URL_FORMAT = "/%06d";
+    protected final static String FILE_URL_FORMAT = "/%s";
+    private static final Pattern JOBTRACKERNAME_PATTERN = Pattern.compile("^.*_(\\d+)_$");
+    protected static final Pattern JOBID_PATTERN = Pattern.compile("job_\\d+_\\d+");
+
+    protected final String m_basePath;
+    protected volatile String m_jobTrackerName;
+
+    public  static final String JOB_CONF_POSTFIX = "_conf.xml";
+
+    private final static Timer timer = new Timer(true);
+    private final static long JOB_TRACKER_SYNC_DURATION = 10 * 60 * 1000; // 10 minutes
+
+    private boolean m_pathContainsJobTrackerName;
+
+    public AbstractJobHistoryDAO(String basePath, boolean pathContainsJobTrackerName, String startingJobTrackerName) throws Exception {
+        m_basePath = basePath;
+        m_pathContainsJobTrackerName = pathContainsJobTrackerName;
+        m_jobTrackerName = startingJobTrackerName;
+        if (m_pathContainsJobTrackerName) {
+            if (startingJobTrackerName == null || startingJobTrackerName.isEmpty())
+                throw new IllegalStateException("startingJobTrackerName should not be null or empty");
+            // start background thread to check what is current job tracker
+            startThread(m_basePath);
+        }
+    }
+
+    protected String buildWholePathToYearMonthDay(int year, int month, int day) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(m_basePath);
+        if (!m_pathContainsJobTrackerName && m_jobTrackerName != null && !m_jobTrackerName.isEmpty()) {
+            sb.append("/");
+            sb.append(m_jobTrackerName);
+        }
+        sb.append(String.format(YEAR_MONTH_DAY_URL_FORMAT, year, month, day));
+        return sb.toString();
+    }
+
+    protected String buildWholePathToSerialNumber(int year, int month, int day, int serialNumber) {
+        String wholePathToYearMonthDay = buildWholePathToYearMonthDay(year, month, day);
+        StringBuilder sb = new StringBuilder();
+        sb.append(wholePathToYearMonthDay);
+        sb.append(String.format(SERIAL_URL_FORMAT, serialNumber));
+        return sb.toString();
+    }
+
+    protected String buildWholePathToJobHistoryFile(int year, int month, int day, int serialNumber, String jobHistoryFileName) {
+        String wholePathToJobHistoryFile = buildWholePathToSerialNumber(year, month, day, serialNumber);
+        StringBuilder sb = new StringBuilder();
+        sb.append(wholePathToJobHistoryFile);
+        sb.append(String.format(FILE_URL_FORMAT, jobHistoryFileName));
+        return sb.toString();
+    }
+
+
+    protected String buildWholePathToJobConfFile(int year, int month, int day, int serialNumber,String jobHistFileName) {
+        Matcher matcher = JOBID_PATTERN.matcher(jobHistFileName);
+        if (matcher.find()) {
+            String wholePathToJobConfFile = buildWholePathToSerialNumber(year, month, day, serialNumber);
+            StringBuilder sb = new StringBuilder();
+            sb.append(wholePathToJobConfFile);
+            sb.append("/");
+            sb.append(String.format(FILE_URL_FORMAT, matcher.group()));
+            sb.append(JOB_CONF_POSTFIX);
+            return sb.toString();
+        }
+        LOG.warn("Illegal job history file name: "+jobHistFileName);
+        return null;
+    }
+
+    private void startThread(final String basePath) throws Exception {
+        LOG.info("start an every-" + JOB_TRACKER_SYNC_DURATION / (60 * 1000) + "min timer task to check current jobTrackerName in background");
+        // Automatically update current job tracker name in background every 30 minutes
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("regularly checking current jobTrackerName in background");
+                    final String _jobTrackerName = calculateJobTrackerName(basePath);
+                    if (_jobTrackerName != null && !_jobTrackerName.equals(m_jobTrackerName)) {
+                        LOG.info("jobTrackerName changed from " + m_jobTrackerName +" to " + _jobTrackerName);
+                        m_jobTrackerName = _jobTrackerName;
+                    }
+                    LOG.info("Current jobTrackerName is: " + m_jobTrackerName);
+                } catch (Exception e) {
+                    LOG.error("failed to figure out current job tracker name that is not configured due to: " + e.getMessage(), e);
+                } catch (Throwable t) {
+                    LOG.error("failed to figure out current job tracker name that is not configured due to: " + t.getMessage(), t);
+                }
+            }
+        }, JOB_TRACKER_SYNC_DURATION, JOB_TRACKER_SYNC_DURATION);
+    }
+
+
+    @Override
+    public void readFileContent(int year, int month, int day, int serialNumber, String jobHistoryFileName, JHFInputStreamCallback reader) throws Exception {
+        InputStream downloadIs;
+        try {
+            downloadIs = getJHFFileContentAsStream(year, month, day, serialNumber, jobHistoryFileName);
+        } catch (FileNotFoundException ex) {
+            LOG.error("job history file not found " + jobHistoryFileName+", ignore and will NOT process any more");
+            return;
+        }
+
+        InputStream downloadJobConfIs = null;
+        try {
+            downloadJobConfIs = getJHFConfContentAsStream(year, month, day, serialNumber, jobHistoryFileName);
+        } catch (FileNotFoundException ex) {
+            LOG.warn("job configuration file of "+ jobHistoryFileName+" not found , ignore and use empty configuration");
+        }
+
+        org.apache.hadoop.conf.Configuration conf = null;
+
+        if (downloadJobConfIs != null) {
+            conf = new org.apache.hadoop.conf.Configuration();
+            conf.addResource(downloadJobConfIs);
+        }
+
+        try {
+            if (downloadIs != null) {
+                reader.onInputStream(downloadIs, conf);
+            }
+        } catch (Exception ex) {
+            LOG.error("fail reading job history file", ex);
+            throw ex;
+        } catch(Throwable t) {
+            LOG.error("fail reading job history file", t);
+            throw new Exception(t);
+        } finally {
+            try {
+                if(downloadJobConfIs != null) {
+                    downloadJobConfIs.close();
+                }
+                if (downloadIs != null) {
+                    downloadIs.close();
+                }
+            } catch (IOException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
+
+
+    protected static long parseJobTrackerNameTimestamp(String jtname) {
+        Matcher matcher = JOBTRACKERNAME_PATTERN.matcher(jtname);
+        if (matcher.find()) {
+            return Long.parseLong(matcher.group(1));
+        }
+        LOG.warn("invalid job tracker name: " + jtname);
+        return -1;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
new file mode 100644
index 0000000..ff0c8c8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.apache.eagle.dataproc.core.EagleOutputCollector;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.parser.JHFParserBase;
+import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class);
+
+
+    private JobHistoryContentFilter m_filter;
+    private EagleOutputCollector m_eagleCollector;
+    private JHFConfigManager m_configManager;
+
+    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, JHFConfigManager configManager, EagleOutputCollector eagleCollector) {
+        this.m_filter = filter;
+        this.m_configManager = configManager;
+        this.m_eagleCollector = eagleCollector;
+    }
+
+    @Override
+    public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception {
+        final JHFConfigManager.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig();
+        @SuppressWarnings("serial")
+        Map<String, String> baseTags = new HashMap<String, String>() { {
+            put("site", jobExtractorConfig.site);
+        } };
+
+        if (!m_filter.acceptJobFile()) {
+            // close immediately if we don't need job file
+            jobFileInputStream.close();
+        } else {
+            //get parser and parse, do not need to emit data now
+            JHFParserBase parser = JHFParserFactory.getParser(m_configManager,
+                    baseTags,
+                    conf, m_filter);
+            parser.parse(jobFileInputStream);
+            jobFileInputStream.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java
new file mode 100644
index 0000000..3edde5b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
/**
 * Driver abstraction for crawling job history files; one driver crawls in a
 * single thread, and parallelism is achieved by running multiple drivers.
 */
public interface JHFCrawlerDriver {
    /**
     * Crawls the next job history file.
     *
     * @return the modified time of the crawled file on success, or -1 if the
     *         crawl failed or there was no file to crawl
     * @throws Exception if an unrecoverable error occurs while crawling
     */
    long crawl() throws Exception;
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
new file mode 100644
index 0000000..8445434
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.storm.JobIdFilter;
+import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * single thread crawling per driver
+ * multiple drivers can achieve parallelism
+ *
+ */
+public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
+    private static final Logger LOG = LoggerFactory.getLogger(JHFCrawlerDriverImpl.class);
+
+    private final static int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 10;
+    private final static String FORMAT_JOB_PROCESS_DATE = "%4d%02d%02d";
+    private final static Pattern PATTERN_JOB_PROCESS_DATE = Pattern.compile("([0-9]{4})([0-9]{2})([0-9]{2})");
+
+    private static final int INITIALIZED = 0x0;
+    private static final int TODAY = 0x1;
+    private static final int BEFORETODAY = 0x10;
+    private final int PROCESSED_JOB_KEEP_DAYS = 5;
+
+    private int m_flag = INITIALIZED; // 0 not set, 1 TODAY, 2 BEFORETODAY
+    private Deque<Pair<Long, String> > m_processQueue = new LinkedList<>();
+    private Set<String> m_processedJobFileNames = new HashSet<>();
+
+    private final JobProcessDate m_proceeDate = new JobProcessDate();
+    private boolean m_dryRun;
+    private JHFInputStreamCallback m_reader;
+    protected boolean m_zeroBasedMonth = true;
+
+    private JobHistoryZKStateLCM m_zkStatelcm;
+    private JobHistoryLCM m_jhfLCM;
+    private JobIdFilter m_jobFilter;
+    private int m_partitionId;
+    private TimeZone m_timeZone;
+
+    public JHFCrawlerDriverImpl(JHFConfigManager.JobHistoryEndpointConfig jobHistoryConfig,
+                                JHFConfigManager.ControlConfig controlConfig, JHFInputStreamCallback reader,
+                                JobHistoryZKStateLCM zkStateLCM,
+                                JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
+        this.m_zeroBasedMonth = controlConfig.zeroBasedMonth;
+        this.m_dryRun = controlConfig.dryRun;
+        if (this.m_dryRun)  LOG.info("this is a dry run");
+        this.m_reader = reader;
+        m_jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
+        this.m_zkStatelcm = zkStateLCM;
+        this.m_partitionId = partitionId;
+        this.m_jobFilter = jobFilter;
+        m_timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
+    }
+
+    /**
+     * <br>
+     * 1. if queue is not empty <br>
+     * 1.1 dequeue and process one job file <br>
+     * 1.2 store processed job file and also cache it to processedJobFileNames
+     * 2. if queue is empty <br>
+     * 2.0 if flag is BEFORETODAY, then write currentProcessedDate to jobProcessedDate as this day's data are all processed <br>
+     * 2.1 crawl that day's job file list <br>
+     * 2.2 filter out those jobID which are in _processedJobIDs keyed by
+     * currentProcessedDate <br>
+     * 2.3 put available file list to processQueue and then go to step 1
+     */
+    @Override
+    public long crawl() throws Exception {
+        LOG.info("queue size is " + m_processQueue.size());
+        while (m_processQueue.isEmpty()) {
+            // read lastProcessedDate only when it's initialized
+            if (m_flag == INITIALIZED) {
+                readAndCacheLastProcessedDate();
+            }
+            if (m_flag == BEFORETODAY) {
+                updateProcessDate();
+                clearProcessedJobFileNames();
+            }
+            if (m_flag != TODAY) { // advance one day if initialized or BEFORE today
+                advanceOneDay();
+            }
+
+            if (isToday()) {
+                m_flag = TODAY;
+            } else {
+                m_flag = BEFORETODAY;
+            }
+
+            List<String> serialNumbers = m_jhfLCM.readSerialNumbers(this.m_proceeDate.year, getActualMonth(m_proceeDate.month), this.m_proceeDate.day);
+            List<Pair<Long, String> > allJobHistoryFiles = new LinkedList<>();
+            for (String serialNumber : serialNumbers) {
+                List<Pair<Long, String> > jobHistoryFiles = m_jhfLCM.readFileNames(
+                        this.m_proceeDate.year,
+                        getActualMonth(m_proceeDate.month),
+                        this.m_proceeDate.day,
+                        Integer.parseInt(serialNumber));
+                LOG.info("total number of job history files " + jobHistoryFiles.size());
+                for (Pair<Long, String> jobHistoryFile : jobHistoryFiles) {
+                    if (m_jobFilter.accept(jobHistoryFile.getRight()) && !fileProcessed(jobHistoryFile.getRight())) {
+                        allJobHistoryFiles.add(jobHistoryFile);
+                    }
+                }
+                jobHistoryFiles.clear();
+                LOG.info("after filtering, number of job history files " + m_processQueue.size());
+            }
+
+            Collections.sort(allJobHistoryFiles,
+                    new Comparator<Pair<Long, String>>() {
+                        @Override
+                        public int compare(Pair<Long, String> o1, Pair<Long, String> o2) {
+                            if (o1.getLeft() > o2.getLeft()) return 1;
+                            else if (o1.getLeft() == o2.getLeft()) return 0;
+                            else return -1;
+                        }
+                    }
+            );
+            for (Pair<Long, String> jobHistoryFile : allJobHistoryFiles) {
+                m_processQueue.add(jobHistoryFile);
+            }
+
+            allJobHistoryFiles.clear();
+
+            if (m_processQueue.isEmpty()) {
+                Thread.sleep(SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY * 1000);
+            } else {
+                LOG.info("queue size after populating is now : " + m_processQueue.size());
+            }
+        }
+        // start to process job history file
+        Pair<Long, String> item = m_processQueue.pollFirst();
+        String jobHistoryFile = item.getRight();
+        Long modifiedTime = item.getLeft();
+        if (jobHistoryFile == null) { // terminate this round of crawling when the queue is empty
+            LOG.info("process queue is empty, ignore this round");
+            return -1;
+        }
+        // get serialNumber from job history file name
+        Pattern p = Pattern.compile("^job_[0-9]+_([0-9]+)[0-9]{3}[_-]{1}");
+        Matcher m = p.matcher(jobHistoryFile);
+        String serialNumber;
+        if (m.find()) {
+            serialNumber = m.group(1);
+        } else {
+            LOG.warn("illegal job history file name : " + jobHistoryFile);
+            return -1;
+        }
+        if (!m_dryRun) {
+            m_jhfLCM.readFileContent(
+                    m_proceeDate.year,
+                    getActualMonth(m_proceeDate.month),
+                    m_proceeDate.day,
+                    Integer.valueOf(serialNumber),
+                    jobHistoryFile,
+                    m_reader);
+        }
+        m_zkStatelcm.addProcessedJob(String.format(FORMAT_JOB_PROCESS_DATE,
+                this.m_proceeDate.year,
+                this.m_proceeDate.month + 1,
+                this.m_proceeDate.day),
+                jobHistoryFile);
+        m_processedJobFileNames.add(jobHistoryFile);
+
+        return modifiedTime;
+    }
+
+    private void updateProcessDate() throws Exception {
+        String line = String.format(FORMAT_JOB_PROCESS_DATE, this.m_proceeDate.year,
+                this.m_proceeDate.month + 1, this.m_proceeDate.day);
+        m_zkStatelcm.updateProcessedDate(m_partitionId, line);
+    }
+
+    private int getActualMonth(int month){
+        return m_zeroBasedMonth ? m_proceeDate.month : m_proceeDate.month + 1;
+    }
+
+    private static class JobProcessDate {
+        public int year;
+        public int month; // 0 based month
+        public int day;
+    }
+
+    private void clearProcessedJobFileNames() {
+        m_processedJobFileNames.clear();
+    }
+
+    private void readAndCacheLastProcessedDate() throws Exception {
+        String lastProcessedDate = m_zkStatelcm.readProcessedDate(m_partitionId);
+        Matcher m = PATTERN_JOB_PROCESS_DATE.matcher(lastProcessedDate);
+        if (m.find() && m.groupCount() == 3) {
+            this.m_proceeDate.year = Integer.parseInt(m.group(1));
+            this.m_proceeDate.month = Integer.parseInt(m.group(2)) - 1; // zero based month
+            this.m_proceeDate.day = Integer.parseInt(m.group(3));
+        } else {
+            throw new IllegalStateException("job lastProcessedDate must have format YYYYMMDD " + lastProcessedDate);
+        }
+
+        GregorianCalendar cal = new GregorianCalendar(m_timeZone);
+        cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0);
+        cal.add(Calendar.DATE, 1);
+        List<String> list = m_zkStatelcm.readProcessedJobs(String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
+                cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH)));
+        if (list != null) {
+            this.m_processedJobFileNames = new HashSet<>(list);
+        }
+    }
+
+    private void advanceOneDay() throws Exception {
+        GregorianCalendar cal = new GregorianCalendar(m_timeZone);
+        cal.set(this.m_proceeDate.year, this.m_proceeDate.month, this.m_proceeDate.day, 0, 0, 0);
+        cal.add(Calendar.DATE, 1);
+        this.m_proceeDate.year = cal.get(Calendar.YEAR);
+        this.m_proceeDate.month = cal.get(Calendar.MONTH);
+        this.m_proceeDate.day = cal.get(Calendar.DAY_OF_MONTH);
+
+        try {
+            clearProcessedJob(cal);
+        } catch (Exception e) {
+            LOG.error("failed to clear processed job ", e);
+        }
+
+    }
+
+    private void clearProcessedJob(Calendar cal) {
+        // clear all already processed jobs some days before current processing date (PROCESSED_JOB_KEEP_DAYS)
+        cal.add(Calendar.DATE, -1 - PROCESSED_JOB_KEEP_DAYS);
+        String line = String.format(FORMAT_JOB_PROCESS_DATE, cal.get(Calendar.YEAR),
+                cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH));
+        m_zkStatelcm.truncateProcessedJob(line);
+    }
+
+    private boolean isToday() {
+        GregorianCalendar today = new GregorianCalendar(m_timeZone);
+
+        if (today.get(Calendar.YEAR) == this.m_proceeDate.year
+                && today.get(Calendar.MONTH) == this.m_proceeDate.month
+                && today.get(Calendar.DAY_OF_MONTH) == this.m_proceeDate.day)
+            return true;
+
+        return false;
+    }
+
+    /**
+     * check if this file was already processed
+     *
+     * @param fileName
+     * @return
+     */
+    private boolean fileProcessed(String fileName) {
+        if (m_processedJobFileNames.contains(fileName))
+            return true;
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java
new file mode 100644
index 0000000..52a62a4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFInputStreamCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * callback when job history file input stream is ready
+ */
/**
 * Callback invoked when a job history file input stream is ready to be consumed.
 * Serializable so it can be shipped with a Storm topology.
 */
public interface JHFInputStreamCallback extends Serializable {
    /**
     * Called when the job history file stream and the job configuration are ready.
     *
     * @param is input stream over the job history file content
     * @param configuration job configuration associated with this job history file
     * @throws Exception if consuming the stream fails
     */
    void onInputStream(InputStream is, Configuration configuration) throws Exception;
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
new file mode 100644
index 0000000..6fbf3d3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * define what content in job history stream should be streamed
+ * @author yonzhang
+ *
+ */
/**
 * Defines what content in a job history stream should be processed:
 * whether job files and/or job configuration files are accepted, and
 * which job configuration keys are of interest.
 * @author yonzhang
 *
 */
public interface JobHistoryContentFilter extends Serializable {
    /** @return true if job history files should be processed */
    boolean acceptJobFile();
    /** @return true if job configuration files should be processed */
    boolean acceptJobConfFile();
    /** @return patterns for configuration keys that a job conf must contain */
    List<Pattern> getMustHaveJobConfKeyPatterns();
    /** @return patterns for configuration keys to include */
    List<Pattern> getJobConfKeyInclusionPatterns();
    /** @return patterns for configuration keys to exclude */
    List<Pattern> getJobConfKeyExclusionPatterns();
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
new file mode 100644
index 0000000..43234c2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class JobHistoryContentFilterBuilder {
+    private final static Logger LOG = LoggerFactory.getLogger(JobHistoryContentFilterBuilder.class);
+
+    private boolean m_acceptJobFile;
+    private boolean m_acceptJobConfFile;
+    private List<Pattern> m_mustHaveJobConfKeyPatterns;
+    private List<Pattern> m_jobConfKeyInclusionPatterns;
+    private List<Pattern> m_jobConfKeyExclusionPatterns;
+
+    public static JobHistoryContentFilterBuilder newBuilder(){
+        return new JobHistoryContentFilterBuilder();
+    }
+
+    public JobHistoryContentFilterBuilder acceptJobFile() {
+        this.m_acceptJobFile = true;
+        return this;
+    }
+
+    public JobHistoryContentFilterBuilder acceptJobConfFile() {
+        this.m_acceptJobConfFile = true;
+        return this;
+    }
+
+    public JobHistoryContentFilterBuilder mustHaveJobConfKeyPatterns(Pattern ...patterns) {
+        m_mustHaveJobConfKeyPatterns = Arrays.asList(patterns);
+        if (m_jobConfKeyInclusionPatterns != null) {
+            List<Pattern> list = new ArrayList<Pattern>();
+            list.addAll(m_jobConfKeyInclusionPatterns);
+            list.addAll(Arrays.asList(patterns));
+            m_jobConfKeyInclusionPatterns = list;
+        }
+        else
+            m_jobConfKeyInclusionPatterns = Arrays.asList(patterns);
+        return this;
+    }
+
+    public JobHistoryContentFilterBuilder includeJobKeyPatterns(Pattern ... patterns) {
+        if (m_jobConfKeyInclusionPatterns != null) {
+            List<Pattern> list = new ArrayList<Pattern>();
+            list.addAll(m_jobConfKeyInclusionPatterns);
+            list.addAll(Arrays.asList(patterns));
+            m_jobConfKeyInclusionPatterns = list;
+        } else
+            m_jobConfKeyInclusionPatterns = Arrays.asList(patterns);
+        return this;
+    }
+
+    public JobHistoryContentFilterBuilder excludeJobKeyPatterns(Pattern ...patterns) {
+        m_jobConfKeyExclusionPatterns = Arrays.asList(patterns);
+        return this;
+    }
+
+    public JobHistoryContentFilter build() {
+        JobHistoryContentFilterImpl filter = new JobHistoryContentFilterImpl();
+        filter.setAcceptJobFile(m_acceptJobFile);
+        filter.setAcceptJobConfFile(m_acceptJobConfFile);
+        filter.setMustHaveJobConfKeyPatterns(m_mustHaveJobConfKeyPatterns);
+        filter.setJobConfKeyInclusionPatterns(m_jobConfKeyInclusionPatterns);
+        filter.setJobConfKeyExclusionPatterns(m_jobConfKeyExclusionPatterns);
+        LOG.info("job history content filter:" + filter);
+        return filter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
new file mode 100644
index 0000000..d8a482b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryContentFilterImpl.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class JobHistoryContentFilterImpl implements JobHistoryContentFilter {
+    private boolean m_acceptJobFile;
+    private boolean m_acceptJobConfFile;
+    private List<Pattern> m_mustHaveJobConfKeyPatterns;
+    private List<Pattern> m_jobConfKeyInclusionPatterns;
+    private List<Pattern> m_jobConfKeyExclusionPatterns;
+
+    @Override
+    public boolean acceptJobFile() {
+        return m_acceptJobFile;
+    }
+
+    @Override
+    public boolean acceptJobConfFile() {
+        return m_acceptJobConfFile;
+    }
+
+    @Override
+    public List<Pattern> getMustHaveJobConfKeyPatterns() {
+        return m_mustHaveJobConfKeyPatterns;
+    }
+
+    @Override
+    public List<Pattern> getJobConfKeyInclusionPatterns() {
+        return m_jobConfKeyInclusionPatterns;
+    }
+
+    @Override
+    public List<Pattern> getJobConfKeyExclusionPatterns() {
+        return m_jobConfKeyExclusionPatterns;
+    }
+
+    public void setAcceptJobFile(boolean acceptJobFile) {
+        this.m_acceptJobFile = acceptJobFile;
+    }
+
+    public void setAcceptJobConfFile(boolean acceptJobConfFile) {
+        this.m_acceptJobConfFile = acceptJobConfFile;
+    }
+
+    public void setJobConfKeyInclusionPatterns(
+            List<Pattern> jobConfKeyInclusionPatterns) {
+        this.m_jobConfKeyInclusionPatterns = jobConfKeyInclusionPatterns;
+    }
+
+    public void setJobConfKeyExclusionPatterns(
+            List<Pattern> jobConfKeyExclusionPatterns) {
+        this.m_jobConfKeyExclusionPatterns = jobConfKeyExclusionPatterns;
+    }
+
+    public void setMustHaveJobConfKeyPatterns(List<Pattern> mustHaveJobConfKeyPatterns) {
+        this.m_mustHaveJobConfKeyPatterns = mustHaveJobConfKeyPatterns;
+    }
+
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("job history file:");
+        sb.append(m_acceptJobFile);
+        sb.append(", job config file:");
+        sb.append(m_acceptJobConfFile);
+        if(m_acceptJobConfFile){
+            sb.append(", must contain keys:");
+            sb.append(m_mustHaveJobConfKeyPatterns);
+            sb.append(", include keys:");
+            sb.append(m_jobConfKeyInclusionPatterns);
+            sb.append(", exclude keys:");
+            sb.append(m_jobConfKeyExclusionPatterns);
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
new file mode 100644
index 0000000..3b303fd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.JobHistoryEndpointConfig;
+
+public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
+    private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDAOImpl.class);
+
+    private Configuration m_conf = new Configuration();
+
+    private FileSystem m_hdfs;
+
+    public JobHistoryDAOImpl(JobHistoryEndpointConfig endpointConfig) throws Exception {
+        super(endpointConfig.basePath, endpointConfig.pathContainsJobTrackerName, endpointConfig.jobTrackerName);
+        this.m_conf.set("fs.defaultFS", endpointConfig.nnEndpoint);
+        this.m_conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        if (!endpointConfig.principal.equals("")) {
+            this.m_conf.set("hdfs.kerberos.principal", endpointConfig.principal);
+            this.m_conf.set("hdfs.keytab.file", endpointConfig.keyTab);
+        }
+        LOG.info("file system:" + endpointConfig.nnEndpoint);
+        m_hdfs = HDFSUtil.getFileSystem(m_conf);
+    }
+
+    @Override
+    public void freshFileSystem() throws Exception {
+        try {
+            m_hdfs.close();
+        } catch (Exception e) {
+
+        } finally {
+            m_hdfs = HDFSUtil.getFileSystem(m_conf);
+        }
+    }
+
+    @Override
+    public String calculateJobTrackerName(String basePath) throws Exception {
+        String latestJobTrackerName = null;
+        try {
+            Path hdfsFile = new Path(basePath);
+            FileStatus[] files = m_hdfs.listStatus(hdfsFile);
+
+            // Sort by modification time as order of desc
+            Arrays.sort(files, new Comparator<FileStatus>() {
+                @Override
+                public int compare(FileStatus o1, FileStatus o2) {
+                    long comp = parseJobTrackerNameTimestamp(o1.getPath().toString()) - parseJobTrackerNameTimestamp(o2.getPath().toString());
+                    if (comp > 0l) {
+                        return -1;
+                    } else if (comp < 0l) {
+                        return 1;
+                    }
+                    return 0;
+                }
+            });
+
+            for (FileStatus fs : files) {
+                // back-compatible with hadoop 0.20
+                // pick the first directory file which should be the latest modified.
+                if (fs.isDir()) {
+                    latestJobTrackerName = fs.getPath().getName();
+                    break;
+                }
+            }
+        } catch(Exception ex) {
+            LOG.error("fail read job tracker name " + basePath, ex);
+            throw ex;
+        }
+        return latestJobTrackerName == null ? "" : latestJobTrackerName;
+    }
+
+    @Override
+    public List<String> readSerialNumbers(int year, int month, int day) throws Exception {
+        List<String> serialNumbers = new ArrayList<>();
+        String dailyPath = buildWholePathToYearMonthDay(year, month, day);
+        LOG.info("crawl serial numbers under one day : " + dailyPath);
+        try {
+            Path hdfsFile = new Path(dailyPath);
+            FileStatus[] files = m_hdfs.listStatus(hdfsFile);
+            for (FileStatus fs : files) {
+                if (fs.isDir()) {
+                    serialNumbers.add(fs.getPath().getName());
+                }
+            }
+        } catch (java.io.FileNotFoundException ex) {
+            LOG.warn("continue to crawl with failure to find file " + dailyPath);
+            LOG.debug("continue to crawl with failure to find file " + dailyPath, ex);
+            // continue to execute
+            return serialNumbers;
+        } catch (Exception ex) {
+            LOG.error("fail reading serial numbers under one day " + dailyPath, ex);
+            throw ex;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (String sn : serialNumbers) {
+            sb.append(sn);sb.append(",");
+        }
+        LOG.info("crawled serialNumbers: " + sb);
+        return serialNumbers;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception {
+        LOG.info("crawl file names under one serial number : " + year + "/" + month + "/" + day + ":" + serialNumber);
+        List<Pair<Long, String> > jobFileNames = new ArrayList<>();
+        String serialPath = buildWholePathToSerialNumber(year, month, day, serialNumber);
+        try {
+            Path hdfsFile = new Path(serialPath);
+            // filter those files which is job configuration file in xml format
+            FileStatus[] files = m_hdfs.listStatus(hdfsFile, new PathFilter(){
+                @Override
+                public boolean accept(Path path){
+                    if (path.getName().endsWith(".xml"))
+                        return false;
+                    return true;
+                }
+            });
+            for (FileStatus fs : files) {
+                if (!fs.isDir()) {
+                    jobFileNames.add(Pair.of(fs.getModificationTime(), fs.getPath().getName()));
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                for (Pair<Long, String> sn : jobFileNames) {
+                    sb.append(sn.getRight());sb.append(",");
+                }
+                LOG.debug("crawled: " + sb);
+            }
+        } catch (Exception ex) {
+            LOG.error("fail reading job history file names under serial number " + serialPath, ex);
+            throw ex;
+        }
+        return jobFileNames;
+    }
+
+    /**
+     * it's the responsibility of caller to close input stream
+     */
+    @Override
+    public InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception {
+        String path = buildWholePathToJobHistoryFile(year, month, day, serialNumber, jobHistoryFileName);
+        LOG.info("Read job history file: " + path);
+        try {
+            Path hdfsFile = new Path(path);
+            return m_hdfs.open(hdfsFile);
+        } catch(Exception ex) {
+            LOG.error("fail getting hdfs file inputstream " + path, ex);
+            throw ex;
+        }
+    }
+
+    /**
+     * it's the responsibility of caller to close input stream
+     */
+    @Override
+    public InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception {
+        String path = buildWholePathToJobConfFile(year, month, day, serialNumber,jobHistoryFileName);
+        if (path  == null) return null;
+
+        LOG.info("Read job conf file: " + path);
+        try {
+            Path hdfsFile = new Path(path);
+            return m_hdfs.open(hdfsFile);
+        } catch(Exception ex) {
+            LOG.error("fail getting job configuration input stream from " + path, ex);
+            throw ex;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java
new file mode 100644
index 0000000..de8d3f7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryLCM.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.io.InputStream;
+import java.util.List;
+
/**
 * Defines the operations on job history file resources for lifecycle management.
 *
 * The job history file directory structure supported is as follows:
 * {@code <basePath>/<jobTrackerName>/<year>/<month>/<day>/<serialNumber>/<jobHistoryFileName>}
 *
 * In some hadoop versions, {@code <jobTrackerName>} is not included.
 *
 * The operations involved in resource read:
 * - list job tracker names under basePath (mostly basePath is configured in entry mapreduce.jobhistory.done-dir of mapred-site.xml)
 * - list serial numbers under one day
 * - list job history files under one serial number
 * - read one job history file
 *
 */
public interface JobHistoryLCM {
    /**
     * Resolves the job-tracker directory name component under the given base path.
     *
     * @param basePath job history done-dir root
     * @return the job tracker directory name (implementation-defined when the
     *         cluster layout has no tracker component — confirm per implementation)
     * @throws Exception on file system access failure
     */
    String calculateJobTrackerName(String basePath) throws Exception;
    /**
     * Lists the serial-number sub-directories for one day.
     *
     * @param year 4-digit year
     * @param month 0-based or 1-based month depending on hadoop cluster setting
     * @param day day of month
     * @return names of the serial number directories under the given day
     * @throws Exception on file system access failure
     */
    List<String> readSerialNumbers(int year, int month, int day) throws Exception;
    /**
     * Lists the job history files under one serial number.
     *
     * @param year 4-digit year
     * @param month 0-based or 1-based month depending on hadoop cluster setting
     * @param day day of month
     * @param serialNumber serial number directory to scan
     * @return pairs of (Long, file name) per history file; presumably the Long is a
     *         modification timestamp — TODO confirm against the implementation
     * @throws Exception on file system access failure
     */
    List<Pair<Long, String> > readFileNames(int year, int month, int day, int serialNumber) throws Exception;
    /**
     * Reads one job history file, handing the opened stream to the given callback.
     *
     * @param year 4-digit year
     * @param month 0-based or 1-based month depending on hadoop cluster setting
     * @param day day of month
     * @param serialNumber serial number directory containing the file
     * @param jobHistoryFileName name of the file to read
     * @param reader callback invoked with the opened input stream
     * @throws Exception on file system access failure or callback failure
     */
    void readFileContent(int year, int month, int day, int serialNumber, String jobHistoryFileName, JHFInputStreamCallback reader) throws Exception;
    /**
     * Opens one job history file as a stream; the caller is responsible for closing it.
     *
     * @param year 4-digit year
     * @param month 0-based or 1-based month depending on hadoop cluster setting
     * @param day day of month
     * @param serialNumber serial number directory containing the file
     * @param jobHistoryFileName name of the file to open
     * @return an open input stream for the file
     * @throws Exception on file system access failure
     */
    InputStream getJHFFileContentAsStream(int year, int month, int day, int serialNumber, String jobHistoryFileName) throws Exception;
    /** Opens one job configuration file as a stream; the caller is responsible for closing it. */
    InputStream getJHFConfContentAsStream(int year, int month, int day, int serialNumber, String jobConfFileName) throws Exception;

    /**
     * Refreshes the underlying file system handle — presumably re-initializes the
     * connection (e.g. after failure); confirm against the implementation.
     */
    void freshFileSystem() throws Exception;
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
new file mode 100644
index 0000000..e055957
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistorySpoutCollectorInterceptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.crawler;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.eagle.dataproc.core.EagleOutputCollector;
+import org.apache.eagle.dataproc.core.ValuesArray;
+
+public class JobHistorySpoutCollectorInterceptor implements EagleOutputCollector {
+    private SpoutOutputCollector m_collector;
+
+    public void setSpoutOutputCollector(SpoutOutputCollector collector) {
+        this.m_collector = collector;
+    }
+
+    @Override
+    public void collect(ValuesArray t) {
+        m_collector.emit(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java
new file mode 100755
index 0000000..d49cdef
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JobConfig;
+import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
/**
 * Registers the MR job-history entity classes and the custom column serializers
 * (for {@code JobCounters} and {@code JobConfig}) with the eagle entity framework.
 */
public class JPAEntityRepository extends EntityRepository {

    public JPAEntityRepository() {
        // Custom serde implementations for complex-typed entity fields.
        serDeserMap.put(JobCounters.class, new JobCountersSerDeser());
        serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
        // Entity classes exposed through this repository.
        entitySet.add(JobConfigurationAPIEntity.class);
        entitySet.add(JobEventAPIEntity.class);
        entitySet.add(JobExecutionAPIEntity.class);

        entitySet.add(TaskAttemptExecutionAPIEntity.class);
        entitySet.add(TaskExecutionAPIEntity.class);
        entitySet.add(TaskFailureCountAPIEntity.class);
        entitySet.add(TaskAttemptCounterAPIEntity.class);
        entitySet.add(JobProcessTimeStampEntity.class);
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java
new file mode 100644
index 0000000..32c6f7c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobBaseAPIEntity.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
/**
 * Common marker base class for all MR job-history entities; presumably
 * TaggedLogAPIEntity supplies the {@code _pcs} property-change support used by
 * subclasses — confirm against TaggedLogAPIEntity.
 */
public class JobBaseAPIEntity extends TaggedLogAPIEntity {
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java
new file mode 100755
index 0000000..65f535f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
package org.apache.eagle.jpm.mr.history.entities;

import org.apache.eagle.jpm.mr.history.common.JobConfig;
import org.apache.eagle.log.entity.meta.EntitySerDeser;
import org.apache.hadoop.hbase.util.Bytes;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+
+public class JobConfigSerDeser implements EntitySerDeser<JobConfig> {
+
+    @Override
+    public JobConfig deserialize(byte[] bytes) {
+        JobConfig jc = new JobConfig();
+        Map<String, String> map = new TreeMap<String, String>();
+        jc.setConfig(map);
+        String sb = Bytes.toString(bytes);
+        String[] keyValue = sb.split(",");
+        for (String pair : keyValue) {
+            String str[] = pair.split(":");
+            if (pair.equals("") || str[0].equals("")) continue;            
+            String key = str[0];
+            String value = "";
+            if (str.length == 2) value = str[1];
+            map.put(key, value);
+        }
+        return jc;
+    }
+    
+    @Override
+    public byte[] serialize(JobConfig conf) {
+        Map<String, String> map = conf.getConfig();
+        StringBuilder sb = new StringBuilder();
+        for (Entry<String, String> entry : map.entrySet())
+            sb.append(entry.getKey() + ":" + entry.getValue() + ",");
+        sb.deleteCharAt(sb.length() - 1);
+        return sb.toString().getBytes();
+    }
+
+    @Override
+    public Class<JobConfig> type(){
+        return JobConfig.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java
new file mode 100755
index 0000000..44fa98c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.common.JobConfig;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa")
+@ColumnFamily("f")
+@Prefix("jconf")
+@Service(JPAConstants.JPA_JOB_CONFIG_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true),
+        @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique = false)
+})
+public class JobConfigurationAPIEntity extends JobBaseAPIEntity {
+    
+    @Column("a")
+    private String configJobName;
+    @Column("b")
+    private JobConfig jobConfig;
+    @Column("c")
+    private String alertEmailList;
+    
+    public JobConfig getJobConfig() {
+        return jobConfig;
+    }
+    public void setJobConfig(JobConfig jobConfig) {
+        this.jobConfig = jobConfig;
+        _pcs.firePropertyChange("jobConfig", null, null);
+    }
+    public String getConfigJobName() {
+        return configJobName;
+    }
+    public void setConfigJobName(String configJobName) {
+        this.configJobName = configJobName;
+        _pcs.firePropertyChange("configJobName", null, null);
+    }
+    public String getAlertEmailList() {
+        return alertEmailList;
+    }
+    public void setAlertEmailList(String alertEmailList) {
+        this.alertEmailList = alertEmailList;
+        _pcs.firePropertyChange("alertEmailList", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java
new file mode 100755
index 0000000..01044bb
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.jobcounter.*;
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
/**
 * Binary serde for {@code JobCounters} column values.
 *
 * Wire format (big-endian ints/longs via HBase {@code Bytes} helpers):
 *   [int totalGroups]
 *   repeated per group:   [int groupIndex][int totalCounters]
 *   repeated per counter: [int counterIndex][long value]   (12 bytes each)
 * Group and counter names are mapped to integer indexes through
 * {@code CounterGroupDictionary}, so writer and reader must share the same
 * dictionary definition.
 */
public class JobCountersSerDeser implements EntitySerDeser<JobCounters> {

    // Lazily initialized name<->index dictionary; see getCounterGroup overloads.
    private CounterGroupDictionary dictionary = null;

    /**
     * Decodes a payload written by {@link #serialize(JobCounters)}.
     * A payload shorter than one int yields an empty JobCounters. Groups whose
     * index the dictionary does not know are skipped wholesale; unknown
     * counters inside a known group are skipped individually.
     */
    @Override
    public JobCounters deserialize(byte[] bytes) {
        JobCounters counters = new JobCounters();
        final int length = bytes.length;
        if (length < 4) {
            return counters;
        }

        final Map<String, Map<String, Long> > groupMap = counters.getCounters();
        int pos = 0;
        final int totalGroups = Bytes.toInt(bytes, pos);
        pos += 4;
        
        for (int i = 0; i < totalGroups; ++i) {
            final int groupIndex = Bytes.toInt(bytes, pos);
            pos += 4;
            final int totalCounters = Bytes.toInt(bytes, pos);
            pos += 4;
            // Pre-compute where the next group starts (12 bytes per counter)
            // so we can resynchronize if this group turns out to be unknown.
            final int nextGroupPos = pos + (totalCounters * 12);
            try {
                final CounterGroupKey groupKey = getCounterGroup(groupIndex);
                if (groupKey == null) {
                    throw new JobCounterException("Group index " + groupIndex + " is not defined");
                }
                final Map<String, Long> counterMap = new TreeMap<String, Long>();
                groupMap.put(groupKey.getName(), counterMap);
                for (int j = 0; j < totalCounters; ++j) {
                    final int counterIndex = Bytes.toInt(bytes, pos);
                    pos += 4;
                    final long value = Bytes.toLong(bytes, pos);
                    pos += 8;
                    final CounterKey counterKey = groupKey.getCounterKeyByID(counterIndex);
                    if (counterKey == null) {
                        // unknown counter: value already consumed, just drop it
                        continue;
                    }
                    // The first name of the counter key is used as the canonical name.
                    counterMap.put(counterKey.getNames().get(0), value);
                }
            } catch (JobCounterException ex) {
                // skip the whole group and resume at the next group boundary
                pos = nextGroupPos;
            }
        }
        return counters;
    }

    /**
     * Encodes the counters. Groups or counters missing from the dictionary are
     * silently dropped; the buffer is sized for the worst case and trimmed at
     * the end when anything was skipped.
     */
    @Override
    public byte[] serialize(JobCounters counters) {
        
        final Map<String, Map<String, Long>> groupMap = counters.getCounters();
        // Worst-case size: 4 bytes for the group count, plus per group an
        // 8-byte header and 12 bytes per counter.
        int totalSize = 4;
        for (Map<String, Long> counterMap : groupMap.values()) {
            final int counterCount = counterMap.size();
            totalSize += counterCount * 12 + 8;
        }
        byte[] buffer = new byte[totalSize];

        int totalGroups = 0;
        int pos = 0;
        // Reserve the leading int; the real group count is patched in below.
        int totalGroupNumberPos = pos;
        pos += 4;
        int nextGroupPos = pos;
        
        for (Map.Entry<String, Map<String, Long>> entry : groupMap.entrySet()) {
            final String groupName = entry.getKey();
            final Map<String, Long> counterMap = entry.getValue();
            try {
                nextGroupPos = pos = serializeGroup(buffer, pos, groupName, counterMap);
                ++totalGroups;
            } catch (JobCounterException ex) {
                // unknown group: rewind to the last committed write position
                pos = nextGroupPos;
            }
        }
        
        Bytes.putInt(buffer, totalGroupNumberPos, totalGroups);
        // Trim the buffer when groups/counters were skipped.
        if (pos < totalSize) {
            buffer = Arrays.copyOf(buffer, pos);
        }
        return buffer;
    }

    /** @return the entity field type handled by this serde */
    @Override
    public Class<JobCounters> type() {
        return JobCounters.class;
    }

    /**
     * Writes one counter group starting at {@code currentPos} and returns the
     * position just past it. Counters unknown to the dictionary are skipped;
     * the actual counter count is patched into the group header afterwards.
     *
     * @throws JobCounterException if the group name is not in the dictionary
     */
    private int serializeGroup(byte[] buffer, int currentPos, String groupName, Map<String, Long> counterMap) throws JobCounterException {
        int pos = currentPos;
        final CounterGroupKey groupKey = getCounterGroup(groupName);
        if (groupKey == null) {
            throw new JobCounterException("Group name " + groupName + " is not defined");
        }
        Bytes.putInt(buffer, pos, groupKey.getIndex());
        pos += 4;
        // Reserve the counter-count slot; patched after the loop below.
        int totalCounterNumberPos = pos;
        pos += 4;
        int totalCounters = 0;
        
        for (Map.Entry<String, Long> entry : counterMap.entrySet()) {
            final String counterName = entry.getKey();
            final CounterKey counterKey = groupKey.getCounterKeyByName(counterName);
            if (counterKey == null) {
                continue;
            }
            final Long counterValue = entry.getValue();
            Bytes.putInt(buffer, pos, counterKey.getIndex());
            pos += 4;
            Bytes.putLong(buffer, pos, counterValue);
            pos += 8;
            ++totalCounters;
        }
        Bytes.putInt(buffer, totalCounterNumberPos, totalCounters);
        return pos;
    }

    /** Resolves a group by name via the shared dictionary (lazy init); never returns null. */
    private CounterGroupKey getCounterGroup(String groupName) throws JobCounterException {
        if (dictionary == null) {
            dictionary = CounterGroupDictionary.getInstance();
        }
        final CounterGroupKey groupKey = dictionary.getCounterGroupByName(groupName);
        if (groupKey == null) {
            throw new JobCounterException("Invalid counter group name: " + groupName);
        }
        return groupKey;
    }

    /** Resolves a group by index via the shared dictionary (lazy init); may return null. */
    private CounterGroupKey getCounterGroup(int groupIndex) throws JobCounterException {
        if (dictionary == null) {
            dictionary = CounterGroupDictionary.getInstance();
        }
        return dictionary.getCounterGroupByIndex(groupIndex);
    }


}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java
new file mode 100644
index 0000000..3639ad0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@Table("eaglejpa")
@ColumnFamily("f")
@Prefix("jevent")
@Service(JPAConstants.JPA_JOB_EVENT_SERVICE_NAME)
@TimeSeries(true)
@Partition({"site"})
/**
 * Entity recording one MR job lifecycle event; persisted under the "jevent"
 * column prefix of the "eaglejpa" table and partitioned by site.
 */
public class JobEventAPIEntity extends JobBaseAPIEntity {

    // Event type string — presumably a job lifecycle state; confirm the value
    // set against the history-file parser that populates it.
    @Column("a")
    private String eventType;

    public String getEventType() {
        return eventType;
    }
    public void setEventType(String eventType) {
        this.eventType = eventType;
        _pcs.firePropertyChange("eventType", null, null);
    }
}



Mime
View raw message