eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinh...@apache.org
Subject incubator-eagle git commit: [EAGLE-539] add failure category and job count by state to mr feeder
Date Mon, 19 Sep 2016 10:04:38 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 02aab65b7 -> 83bb66661


[EAGLE-539] add failure category and job count by state to mr feeder

https://issues.apache.org/jira/browse/EAGLE-539

1. We need to classify the errors generated by MR jobs so that we can support analysis requirements
such as the top-N errors or the top-N hosts that generate errors. I implemented this by using the error
messages generated by the tasks and extracting the rules from them.
2. Another requirement is to get the job count by job status. When we parse jobs in the MR history feeder,
we can save each job ID and job status in ZooKeeper, and then flush them to the Eagle server.

Author: wujinhu <wujinhu920@126.com>

Closes #434 from wujinhu/EAGLE-539.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/83bb6666
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/83bb6666
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/83bb6666

Branch: refs/heads/master
Commit: 83bb66661b46e8d69497e9459d7cca78264a1300
Parents: 02aab65
Author: wujinhu <wujinhu920@126.com>
Authored: Mon Sep 19 18:04:17 2016 +0800
Committer: wujinhu <wujinhu920@126.com>
Committed: Mon Sep 19 18:04:17 2016 +0800

----------------------------------------------------------------------
 .../mr/historyentity/JobExecutionAPIEntity.java | 13 ++++++
 .../metrics/JobCountMetricsGenerator.java       | 43 ++++++++++++++------
 .../JobExecutionMetricsCreationListener.java    | 10 +++++
 .../mr/history/parser/JHFEventReaderBase.java   | 33 +++++++--------
 .../mr/history/parser/TaskFailureListener.java  |  1 +
 .../jpm/mr/running/parser/MRJobParser.java      |  3 +-
 .../org/apache/eagle/jpm/util/Constants.java    |  3 +-
 .../java/org/apache/eagle/jpm/util/Utils.java   | 26 ++++++++++++
 8 files changed, 99 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
index f7540d5..0e40099 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
@@ -23,6 +23,8 @@ import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
+import java.util.Map;
+
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("eaglejpa")
 @ColumnFamily("f")
@@ -91,6 +93,8 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
     private int failedReduceAttempts;
     @Column("ad")
     private String trackingUrl;
+    @Column("ae")
+    private Map<String, Map<String, String>> failedTasks;
 
     public String getTrackingUrl() {
         return trackingUrl;
@@ -343,4 +347,13 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
         this.failedReduceAttempts = failedReduceAttempts;
         valueChanged("failedReduceAttempts");
     }
+
+    public Map<String, Map<String, String>> getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(Map<String, Map<String, String>> failedTasks)
{
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
index a6f9d56..ac4a33d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
@@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -43,12 +44,19 @@ public class JobCountMetricsGenerator {
     public void flush(String date, int year, int month, int day) throws Exception {
         List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(date);
         final int total = jobs.size();
-        int fail = 0;
+        int succeeded = 0;
+        int killed = 0;
+
         for (Pair<String, String> job : jobs) {
-            if (!job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())) {
-                ++fail;
+            if (job.getRight().equals(EagleJobStatus.KILLED.toString())) {
+                ++killed;
+            }
+
+            if (job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())) {
+                ++succeeded;
             }
         }
+        int failed = total - killed - succeeded;
 
         final IEagleServiceClient client = new EagleServiceClientImpl(
             MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
@@ -59,24 +67,33 @@ public class JobCountMetricsGenerator {
 
         GregorianCalendar cal = new GregorianCalendar(year, month, day);
         cal.setTimeZone(timeZone);
+
+        List<GenericMetricEntity> entities = new ArrayList<>();
+        entities.add(generateEntity(cal, EagleJobStatus.FAILED.toString(), failed));
+        entities.add(generateEntity(cal, EagleJobStatus.KILLED.toString(), killed));
+        entities.add(generateEntity(cal, EagleJobStatus.SUCCEEDED.toString(), succeeded));
+
+        LOG.info("start flushing entities of total number " + entities.size());
+        client.create(entities);
+        LOG.info("finish flushing entities of total number " + entities.size());
+        client.getJerseyClient().destroy();
+        client.close();
+    }
+
+    private GenericMetricEntity generateEntity(GregorianCalendar calendar, String state,
int count) {
         GenericMetricEntity metricEntity = new GenericMetricEntity();
-        metricEntity.setTimestamp(cal.getTimeInMillis());
-        metricEntity.setPrefix(Constants.JOB_COUNT_PER_DAY);
-        metricEntity.setValue(new double[] {total, fail});
+        metricEntity.setTimestamp(calendar.getTimeInMillis());
+        metricEntity.setPrefix(String.format(Constants.HADOOP_HISTORY_TOTAL_METRIC_FORMAT,
Constants.JOB_LEVEL, Constants.JOB_COUNT_PER_DAY));
+        metricEntity.setValue(new double[] {count});
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
                 put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site);
+                put(MRJobTagName.JOB_STATUS.toString(), state);
             }
         };
         metricEntity.setTags(baseTags);
-        List<GenericMetricEntity> entities = new ArrayList<>();
-        entities.add(metricEntity);
 
-        LOG.info("start flushing entities of total number " + entities.size());
-        client.create(entities);
-        LOG.info("finish flushing entities of total number " + entities.size());
-        client.getJerseyClient().destroy();
-        client.close();
+        return metricEntity;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
index ce788a3..3e0256f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
@@ -20,10 +20,12 @@ package org.apache.eagle.jpm.mr.history.metrics;
 
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
 import org.apache.eagle.log.entity.GenericMetricEntity;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -61,6 +63,14 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation
                     }
                 }
             }
+
+            //generate Constants.JOB_COUNT_PER_HOUR data
+            Map<String, String> baseTags = new HashMap<>(tags);
+            baseTags.put(MRJobTagName.JOB_STATUS.toString(), entity.getCurrentState());
+            metrics.add(metricWrapper(timeStamp / 3600000 * 3600000,
+                Constants.JOB_COUNT_PER_HOUR,
+                new double[]{1},
+                baseTags));
         }
         return metrics;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index d33c26b..c87f4b2 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.metrics.JobCounterMetricsGenerator;
@@ -26,6 +28,7 @@ import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.JobNameNormalization;
 import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.Utils;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
@@ -79,25 +82,9 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
 
     private JobCounterMetricsGenerator jobCounterMetricsGenerator;
 
-    public Constants.JobType fetchJobType(Configuration config) {
-        if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
-            return Constants.JobType.CASCADING;
-        }
-        if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) {
-            return Constants.JobType.HIVE;
-        }
-        if (config.get(Constants.JobConfiguration.PIG_JOB) != null) {
-            return Constants.JobType.PIG;
-        }
-        if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {
-            return Constants.JobType.SCOOBI;
-        }
-        return Constants.JobType.NOTAVALIABLE;
-    }
-
     /**
      * baseTags stores the basic tag name values which might be used for persisting various
entities.
-     * baseTags includes: cluster, datacenter and jobName
+     * baseTags includes: site and jobName
      * baseTags are used for all job/task related entities
      *
      * @param baseTags
@@ -119,6 +106,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
         jobExecutionEntity.setTags(new HashMap<>(baseTags));
         jobExecutionEntity.setNumFailedMaps(0);
         jobExecutionEntity.setNumFailedReduces(0);
+        jobExecutionEntity.setFailedTasks(new HashMap<>());
 
         taskRunningHosts = new HashMap<>();
 
@@ -130,7 +118,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
         this.configuration = configuration;
 
         if (this.configuration != null && this.jobType == null) {
-            this.setJobType(fetchJobType(this.configuration).toString());
+            this.setJobType(Utils.fetchJobType(this.configuration).toString());
         }
         this.sumMapTaskDuration = 0L;
         this.sumReduceTaskDuration = 0L;
@@ -451,6 +439,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
             }
 
             entityCreated(entity);
+            if (entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()) != null) {
+                jobExecutionEntity.getFailedTasks().put(taskID,
+                    new HashMap<String, String>() {
+                        {
+                            put(entity.getTags().get(MRJobTagName.ERROR_CATEGORY.toString()),
entity.getError());
+                        }
+                    }
+                );
+            }
             taskAttemptStartTime.remove(taskAttemptID);
         } else {
             // silently ignore

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 1a7a5fc..0c18133 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -93,6 +93,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener
{
         //TODO need optimize, match and then capture the data
         final String errCategory = classifier.classifyError(e.getError());
         tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
+        entity.getTags().put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
 
         failureTask.setError(e.getError());
         failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in
the future

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 5811f72..2e36bc4 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -452,7 +452,7 @@ public class MRJobParser implements Runnable {
             Utils.closeInputStream(is);
         }
 
-        Set<String> needFetchAttemptTasks = calcFetchCounterAndAttemptTaskId(tasks);
+        Set<String> needFetchAttemptTasks = new HashSet<>();//calcFetchCounterAndAttemptTaskId(tasks);
         for (MRTask task : tasks) {
             if (this.finishedTaskIds.contains(task.getId()) && !needFetchAttemptTasks.contains(task.getId()))
{
                 continue;
@@ -530,6 +530,7 @@ public class MRJobParser implements Runnable {
                     mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOD_DEF_ID.toString(),
value);
                 }
             }
+            mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOB_TYPE.toString(), Utils.fetchJobType(config).toString());
             mrJobEntityMap.get(jobId).setJobConfig(config);
             mrJobConfigs.put(jobId, config);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index c9bb387..8c561bd 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -178,7 +178,8 @@ public class Constants {
     public static final String JOB_LEVEL = "job";
     public static final String TASK_LEVEL = "task";
     public static final String USER_LEVEL = "user";
-    public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count";
+    public static final String JOB_COUNT_PER_DAY = "day.count";
+    public static final String JOB_COUNT_PER_HOUR = "hour.count";
 
     public static final String HADOOP_HISTORY_TOTAL_METRIC_FORMAT = "hadoop.%s.history.%s";
     public static final String HADOOP_HISTORY_MINUTE_METRIC_FORMAT = "hadoop.%s.history.minute.%s";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/83bb6666/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index d738439..91077df 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.util;
 
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +26,9 @@ import java.io.InputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 public class Utils {
     private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -86,4 +90,26 @@ public class Utils {
 
         return 0L;
     }
+    
+    public static Constants.JobType fetchJobType(Map config) {
+        if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
+            return Constants.JobType.CASCADING;
+        }
+        if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) {
+            return Constants.JobType.HIVE;
+        }
+        if (config.get(Constants.JobConfiguration.PIG_JOB) != null) {
+            return Constants.JobType.PIG;
+        }
+        if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {
+            return Constants.JobType.SCOOBI;
+        }
+        return Constants.JobType.NOTAVALIABLE;
+    }
+
+    public static Constants.JobType fetchJobType(Configuration config) {
+        Map<String, String> mapConfig = new HashMap<>();
+        config.forEach(entry -> mapConfig.put(entry.getKey(), entry.getValue()));
+        return fetchJobType(mapConfig);
+    }
 }


Mime
View raw message