Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A046C200B35 for ; Tue, 5 Jul 2016 20:07:58 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9EC3D160A6F; Tue, 5 Jul 2016 18:07:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 50E87160A2C for ; Tue, 5 Jul 2016 20:07:56 +0200 (CEST) Received: (qmail 22483 invoked by uid 500); 5 Jul 2016 18:07:55 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 22474 invoked by uid 99); 5 Jul 2016 18:07:55 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jul 2016 18:07:55 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 73F73182915 for ; Tue, 5 Jul 2016 18:07:54 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id rFIuDJ4HhEzr for ; Tue, 5 Jul 2016 18:07:42 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id BC9F45FCD8 for ; Tue, 5 Jul 2016 18:07:41 +0000 (UTC) Received: (qmail 22013 invoked by uid 99); 5 Jul 2016 18:07:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jul 2016 18:07:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 05E73E95B7; Tue, 5 Jul 2016 18:07:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yonzhang2012@apache.org To: commits@eagle.incubator.apache.org Date: Tue, 05 Jul 2016 18:07:44 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring archived-at: Tue, 05 Jul 2016 18:07:58 -0000 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java new file mode 100644 index 0000000..bb08ef0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java @@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parser for line-oriented (version 1) job history files, whose records look like
+ * {@code RecordType KEY="value" KEY="value" ...} and end with the line delimiter
+ * advertised by the file's leading Meta record. Every parsed record is forwarded
+ * to the {@link JHFMRVer1EventReader} given at construction time.
+ */
+public class JHFMRVer1Parser implements JHFParserBase {
+    private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1Parser.class);
+    static final char LINE_DELIMITER_CHAR = '.';
+    static final char[] charsToEscape = new char[] {'"', '=', LINE_DELIMITER_CHAR};
+    static final String KEY = "(\\w+)";
+    // value is any character other than quote, but escaped quotes can be there
+    static final String VALUE = "[^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+";
+    static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+    static final String MAX_COUNTER_COUNT = "10000";
+
+    private final JHFMRVer1EventReader m_reader;
+
+    public JHFMRVer1Parser(JHFMRVer1EventReader reader) {
+        this.m_reader = reader;
+    }
+
+    /**
+     * Parses a history file and invokes the reader's handle() callback for each
+     * record. Physical lines are joined until a record ends with the unescaped
+     * line delimiter, so the whole history never has to be kept in memory.
+     * The reader is closed after the last record has been handed over.
+     *
+     * @param in history file content; it is closed when this method returns
+     * @throws Exception if the stream cannot be read or a record cannot be handled
+     */
+    @Override
+    public void parse(InputStream in) throws Exception, ParseException {
+        // set enough counter number as user may build more counters
+        System.setProperty("mapreduce.job.counters.max", MAX_COUNTER_COUNT);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+        try {
+            // Read the meta-info line. Note that this might be a jobinfo line for files written with older format
+            String line = reader.readLine();
+
+            // Check if the file is empty
+            if (line == null) {
+                return;
+            }
+
+            // Get the information required for further processing
+            MetaInfoManager mgr = new MetaInfoManager(line);
+            boolean isEscaped = mgr.isValueEscaped();
+            String lineDelim = String.valueOf(mgr.getLineDelim());
+            String escapedLineDelim = StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR, mgr.getLineDelim());
+
+            StringBuilder buf = new StringBuilder();
+            do {
+                buf.append(line);
+                String trimmed = line.trim();
+                // a record continues on the next physical line until it ends with an unescaped delimiter
+                if (!trimmed.endsWith(lineDelim) || trimmed.endsWith(escapedLineDelim)) {
+                    buf.append("\n");
+                    continue;
+                }
+                parseLine(buf.toString(), m_reader, isEscaped);
+                buf.setLength(0);
+            } while ((line = reader.readLine()) != null);
+
+            // flush to tell listener that we have finished parsing
+            logger.info("finish parsing job history file and close");
+            m_reader.close();
+        } catch (Exception ex) {
+            logger.error("can not parse correctly ", ex);
+            throw ex;
+        } finally {
+            reader.close();
+        }
+    }
+
+    /**
+     * Parses one complete record and forwards it to the listener. The record type
+     * is the text before the first space; the remainder is scanned for KEY="value"
+     * pairs. Unknown keys and unknown record types are logged and skipped so one
+     * unexpected record does not abort the whole file.
+     *
+     * @param line the full record text
+     * @param l listener receiving the record type and its key/value pairs
+     * @param isEscaped whether values must be unescaped before use
+     */
+    private static void parseLine(String line, JHFMRVer1PerLineListener l, boolean isEscaped) throws Exception, ParseException {
+        // extract the record type; a record without a space carries no key/value data
+        int idx = line.indexOf(' ');
+        String recType = idx < 0 ? line : line.substring(0, idx);
+        String data = idx < 0 ? "" : line.substring(idx + 1);
+
+        Matcher matcher = pattern.matcher(data);
+        Map<Keys, String> parseBuffer = new HashMap<>();
+
+        while (matcher.find()) {
+            String tuple = matcher.group(0);
+            String[] parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
+            // strip the surrounding double quotes
+            String value = parts[1].substring(1, parts[1].length() - 1);
+            if (isEscaped) {
+                value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape);
+            }
+            Keys key;
+            try {
+                key = Keys.valueOf(parts[0]);
+            } catch (IllegalArgumentException ex) {
+                // unknown key, skip it but keep parsing the rest of the record
+                logger.warn("key does not exist " + parts[0]);
+                continue;
+            }
+            parseBuffer.put(key, value);
+        }
+
+        try {
+            l.handle(RecordTypes.valueOf(recType), parseBuffer);
+        } catch (IllegalArgumentException ex) {
+            // somehow record type does not exist, but continue to run
+            logger.warn("record type does not exist " + recType, ex);
+        }
+
+        parseBuffer.clear();
+    }
+
+    /**
+     * Reads the format version from the Meta record on the first line of a history
+     * file and derives the line delimiter and the value-escaping mode from it.
+     */
+    static class MetaInfoManager implements JHFMRVer1PerLineListener {
+        private long version = 0L;
+        private final KeyValuePair pairs = new KeyValuePair();
+
+        // Extract the version of the history that was used to write the history
+        public MetaInfoManager(String line) throws Exception, ParseException {
+            if (null != line) {
+                parseLine(line, this, false);
+            }
+        }
+
+        public void close() {
+        }
+
+        // Version 0 files terminate records with '"', later versions with LINE_DELIMITER_CHAR
+        char getLineDelim() {
+            if (version == 0) {
+                return '"';
+            } else {
+                return LINE_DELIMITER_CHAR;
+            }
+        }
+
+        // Checks if the values are escaped or not
+        boolean isValueEscaped() {
+            // Note that the values are not escaped in version 0
+            return version != 0;
+        }
+
+        @Override
+        public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException {
+            // Only the META record carries the version
+            if (RecordTypes.Meta == recType) {
+                pairs.handle(values);
+                version = pairs.getLong(Keys.VERSION); // defaults to 0
+            }
+        }
+    }
+
+    /**
+     * Holds key/value pairs read from history records, keyed by {@link Keys},
+     * with typed accessors.
+     */
+    static class KeyValuePair {
+        private final Map<Keys, String> values = new HashMap<>();
+
+        /**
+         * Get the 'String' value for the given key. Most places use Strings as values,
+         * so the default get method returns 'String'. This method never returns null.
+         *
+         * @param k key
+         * @return the stored value, or the empty string "" if none is present
+         */
+        public String get(Keys k) {
+            String s = values.get(k);
+            return s == null ? "" : s;
+        }
+
+        /**
+         * Convert the value from history to int and return it.
+         *
+         * @param k key
+         * @return the parsed value, or 0 if no value is present
+         */
+        public int getInt(Keys k) {
+            String s = values.get(k);
+            if (null != s) {
+                return Integer.parseInt(s);
+            }
+            return 0;
+        }
+
+        /**
+         * Convert the value from history to long and return it.
+         *
+         * @param k key
+         * @return the parsed value, or 0 if no value is present
+         */
+        public long getLong(Keys k) {
+            String s = values.get(k);
+            if (null != s) {
+                return Long.parseLong(s);
+            }
+            return 0;
+        }
+
+        /**
+         * Set the value for the key.
+         *
+         * @param k key
+         * @param s value
+         */
+        public void set(Keys k, String s) {
+            values.put(k, s);
+        }
+
+        /**
+         * Adds all values in the Map argument to its own values.
+         *
+         * @param m values to copy in
+         */
+        public void set(Map<Keys, String> m) {
+            values.putAll(m);
+        }
+
+        /**
+         * Copies the key/value pairs of one parsed record into this holder.
+         *
+         * @param values pairs as passed to the listener by parseLine
+         */
+        public synchronized void handle(Map<Keys, String> values) {
+            set(values);
+        }
+
+        /**
+         * Returns the (live, mutable) map containing all key-values.
+         */
+        public Map<Keys, String> getValues() {
+            return values;
+        }
+    }
+
+    /**
+     * Job history files contain key="value" pairs, where keys belong to this enum.
+     * It acts as a global namespace for all keys.
+     */
+    public enum Keys {
+        JOBTRACKERID,
+        START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
+        LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+        FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
+        ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
+        SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
+        TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
+        VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
+
+        UBERISED, SPLIT_LOCATIONS, FAILED_DUE_TO_ATTEMPT, MAP_FINISH_TIME, PORT, RACK_NAME,
+
+        //For Artemis
+        WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES, WORKFLOW_TAGS,
+        SHUFFLE_PORT, LOCALITY, AVATAAR, FAIL_REASON
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java new file mode 100644 index 0000000..1c096fc --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java @@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; + +import java.io.IOException; +import java.util.Map; + +/** + * Callback interface for reading back log events from JobHistory. This interface + * should be implemented and passed to JobHistory.parseHistory() + * + */ +public interface JHFMRVer1PerLineListener{ + /** + * Callback method for history parser. + * @param recType type of record, which is the first entry in the line. + * @param values a map of key-value pairs as thry appear in history. + * @throws IOException + */ + void handle(RecordTypes recType, Map values) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java new file mode 100644 index 0000000..6fd2c18 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.jobhistory.*; +import org.apache.hadoop.util.StringInterner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class JHFMRVer2EventReader extends JHFEventReaderBase { + private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2EventReader.class); + + /** + * Create a new Event Reader + * @throws IOException + */ + public JHFMRVer2EventReader(Map baseTags, Configuration configuration, JobHistoryContentFilter filter) { + super(baseTags, configuration, filter); + } + + @SuppressWarnings("deprecation") + public void handleEvent(Event wrapper) throws Exception { + switch (wrapper.type) { + case JOB_SUBMITTED: + handleJobSubmitted(wrapper); break; + 
case JOB_INITED: + handleJobInited(wrapper); break; + case JOB_FINISHED: + handleJobFinished(wrapper); break; + case JOB_PRIORITY_CHANGED: + handleJobPriorityChanged(); break; + case JOB_STATUS_CHANGED: + handleJobStatusChanged(); break; + case JOB_FAILED: + case JOB_KILLED: + case JOB_ERROR: + handleJobUnsuccessfulCompletion(wrapper); break; + case JOB_INFO_CHANGED: + handleJobInfoChanged(); break; + case JOB_QUEUE_CHANGED: + handleJobQueueChanged();break; + case TASK_STARTED: + handleTaskStarted(wrapper); break; + case TASK_FINISHED: + handleTaskFinished(wrapper); break; + case TASK_FAILED: + handleTaskFailed(wrapper); break; + case TASK_UPDATED: + handleTaskUpdated(); break; + + // map task + case MAP_ATTEMPT_STARTED: + handleMapAttemptStarted(wrapper); break; + case MAP_ATTEMPT_FINISHED: + handleMapAttemptFinished(wrapper); break; + case MAP_ATTEMPT_FAILED: + handleMapAttemptFailed(wrapper); break; + case MAP_ATTEMPT_KILLED: + handleMapAttemptKilled(wrapper); break; + + // reduce task + case REDUCE_ATTEMPT_STARTED: + handleReduceAttemptStarted(wrapper); break; + case REDUCE_ATTEMPT_FINISHED: + handleReduceAttemptFinished(wrapper); break; + case REDUCE_ATTEMPT_FAILED: + handleReduceAttemptFailed(wrapper); break; + case REDUCE_ATTEMPT_KILLED: + handleReduceAttemptKilled(wrapper); break; + + // set up task + case SETUP_ATTEMPT_STARTED: + break; + case SETUP_ATTEMPT_FINISHED: + handleSetupAttemptFinished(); break; + case SETUP_ATTEMPT_FAILED: + handleSetupAttemptFailed(); break; + case SETUP_ATTEMPT_KILLED: + handleSetupAttemptKilled(); break; + + // clean up task + case CLEANUP_ATTEMPT_STARTED: + break; + case CLEANUP_ATTEMPT_FINISHED: + handleCleanupAttemptFinished(); break; + case CLEANUP_ATTEMPT_FAILED: + handleCleanupAttemptFailed(); break; + case CLEANUP_ATTEMPT_KILLED: + handleCleanupAttemptKilled(); break; + + case AM_STARTED: + handleAMStarted(); break; + default: + logger.warn("unexpected event type: " + wrapper.type); + } + } + + private void 
handleJobPriorityChanged() throws Exception { + return; + } + private void handleJobStatusChanged() throws Exception { + return; + } + private void handleJobInfoChanged() throws Exception { + return; + } + + private void handleJobQueueChanged() throws Exception { + return; + } + + private void handleJobSubmitted(Event wrapper) throws Exception { + JobSubmitted js = ((JobSubmitted)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString()); + if (js.getJobName() != null) values.put(Keys.JOBNAME, js.getJobName().toString()); + if (js.getUserName() != null) values.put(Keys.USER, js.getUserName().toString()); + if (js.getSubmitTime() != null) values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString()); + if (js.getJobQueueName() != null) values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString()); + handleJob(wrapper.getType(), values, null); + } + + private void handleJobInited(Event wrapper) throws Exception { + JobInited js = ((JobInited)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString()); + if (js.getLaunchTime() != null) values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString()); + if (js.getTotalMaps() != null) values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString()); + if (js.getTotalReduces() != null) values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString()); + if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString()); + if (js.getUberized() != null) values.put(Keys.UBERISED, js.getUberized().toString()); + handleJob(wrapper.getType(), values, null); + } + + private void handleJobFinished(Event wrapper) throws Exception { + JobFinished js = ((JobFinished)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString()); + if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, 
js.getFinishTime().toString()); + if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString()); + if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString()); + if (js.getFailedMaps() != null) values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString()); + if (js.getFailedReduces() != null) values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString()); + values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name()); + handleJob(wrapper.getType(), values, js.getTotalCounters()); + } + + private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception { + JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString()); + if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString()); + if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString()); + if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString()); + if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString()); + handleJob(wrapper.getType(), values, null); + } + + private void handleTaskStarted(Event wrapper) throws Exception { + TaskStarted js = ((TaskStarted)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString()); + if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString()); + if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString()); + if (js.getSplitLocations() != null) values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString()); + handleTask(RecordTypes.Task, wrapper.getType(), values, null); + } + + private void handleTaskFinished(Event wrapper) throws Exception { + TaskFinished js = 
((TaskFinished)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString()); + if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString()); + if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString()); + if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString())); + handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters()); + } + + private void handleTaskFailed(Event wrapper) throws Exception { + TaskFailed js = ((TaskFailed)wrapper.getEvent()); + Map values = new HashMap<>(); + + if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString()); + if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString()); + if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString()); + if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString())); + if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString()); + if (js.getFailedDueToAttempt() != null) values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString()); + handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters()); + } + + private String normalizeTaskStatus(String taskStatus) { + if (taskStatus.equals("SUCCEEDED")) + return EagleTaskStatus.SUCCESS.name(); + return taskStatus; + } + + private void handleTaskUpdated(){ + return; + } + + private void handleMapAttemptStarted(Event wrapper) throws Exception { + handleTaskAttemptStarted(wrapper, RecordTypes.MapAttempt); + } + private void handleReduceAttemptStarted(Event wrapper) throws Exception { + handleTaskAttemptStarted(wrapper, RecordTypes.ReduceAttempt); + } + + private void handleTaskAttemptStarted(Event wrapper, RecordTypes recordType) throws Exception { + TaskAttemptStarted js = ((TaskAttemptStarted)wrapper.getEvent()); 
+ Map values = new HashMap<>(); + if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString()); + if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString()); + if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString()); + if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString()); + if (js.getTrackerName() != null) values.put(Keys.TRACKER_NAME, js.getTrackerName().toString()); + if (js.getHttpPort() != null) values.put(Keys.HTTP_PORT, js.getHttpPort().toString()); + if (js.getShufflePort() != null) values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString()); + if (js.getLocality() != null) values.put(Keys.LOCALITY, js.getLocality().toString()); + if (js.getAvataar() != null) values.put(Keys.AVATAAR, js.getAvataar().toString()); + handleTask(recordType,wrapper.getType(), values, null); + } + + private void handleMapAttemptFinished(Event wrapper) throws Exception { + MapAttemptFinished js = ((MapAttemptFinished)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString()); + if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString()); + if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString())); + if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString()); + if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString()); + if (js.getMapFinishTime() != null) values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString()); + if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString()); + if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString()); + if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString()); + if (js.getState() != null) values.put(Keys.STATE_STRING, 
js.getState().toString()); + + ensureRackAfterAttemptFinish(js.getRackname().toString(), values); + handleTask(RecordTypes.MapAttempt,wrapper.getType(), values, js.getCounters()); + } + private void handleReduceAttemptFinished(Event wrapper) throws Exception { + ReduceAttemptFinished js = ((ReduceAttemptFinished)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString()); + if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString()); + if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString())); + if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString()); + if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString()); + if (js.getShuffleFinishTime() != null) values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString()); + if (js.getSortFinishTime() != null) values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString()); + if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString()); + if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString()); + if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString()); + if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString()); + ensureRackAfterAttemptFinish(js.getRackname().toString(), values); + handleTask(RecordTypes.ReduceAttempt,wrapper.getType(), values, js.getCounters()); + } + + private void ensureRackAfterAttemptFinish(String rackname, Map values) { + // rack name has the format like /default/rack13 + String[] tmp = rackname.split("/"); + String rack = tmp[tmp.length - 1]; + values.put(Keys.RACK, rack); + m_host2RackMapping.put(values.get(Keys.HOSTNAME), rack); + } + + private void handleMapAttemptFailed(Event wrapper) throws Exception { + handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt); + } + 
private void handleReduceAttemptFailed(Event wrapper) throws Exception { + handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt); + } + private void handleMapAttemptKilled(Event wrapper)throws Exception { + handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt); + } + private void handleReduceAttemptKilled(Event wrapper)throws Exception { + handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt); + } + + private void handleTaskAttemptFailed(Event wrapper, RecordTypes recordType) throws Exception { + TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion)wrapper.getEvent()); + Map values = new HashMap<>(); + if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString()); + if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString()); + if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString()); + if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString()); + if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString()); + if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString()); + if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString()); + if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString()); + if (js.getStatus() != null) values.put(Keys.TASK_STATUS, js.getStatus().toString()); + ensureRackAfterAttemptFinish(js.getRackname().toString(), values); + handleTask(recordType,wrapper.getType(), values, js.getCounters()); + } + private void handleSetupAttemptFinished(){ + return; + } + private void handleSetupAttemptFailed(){ + return; + } + private void handleSetupAttemptKilled(){ + return; + } + private void handleCleanupAttemptFinished(){ + return; + } + private void handleCleanupAttemptFailed(){ + return; + } + private void handleCleanupAttemptKilled(){ + return; + } + private void handleAMStarted(){ + return; + } + + protected JobCounters 
parseCounters(Object value) throws IOException{ + JobCounters jc = new JobCounters(); + Map> groups = new HashMap<>(); + JhCounters counters = (JhCounters)value; + List list = counters.getGroups(); + for (JhCounterGroup cg : list) { + String cgName = cg.getName().toString(); + if(!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter") + && !cgName.equals("org.apache.hadoop.mapreduce.TaskCounter") + && !cgName.equals("org.apache.hadoop.mapreduce.JobCounter") + && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter") + && !cgName.equals("org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter") + && !cgName.equals("FileSystemCounters") // for artemis + && !cgName.equals("org.apache.hadoop.mapred.Task$Counter") // for artemis + && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis + && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter") // for artemis + ) continue; + + groups.put(cgName, new HashMap()); + Map counterValues = groups.get(cgName); + logger.debug("groupname:" + cg.getName() + "(" + cg.getDisplayName() + ")"); + + for (JhCounter c : cg.getCounts()) { + counterValues.put(c.getName().toString(), c.getValue()); + logger.debug(c.getName() + "=" + c.getValue() + "(" + c.getDisplayName() + ")"); + } + } + jc.setCounters(groups); + return jc; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java new file mode 100644 index 0000000..c289e4d --- /dev/null +++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.avro.Schema; +import org.apache.avro.io.*; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.hadoop.mapreduce.jobhistory.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class JHFMRVer2Parser implements JHFParserBase { + private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2Parser.class); + private JHFMRVer2EventReader _reader; + + public JHFMRVer2Parser(JHFMRVer2EventReader reader){ + this._reader = reader; + } + + /** + * Parses an Avro-Json job history stream: the first line must be the "Avro-Json" version marker, + * the second line the Avro schema, followed by JSON-encoded events that are passed to the reader. + * The stream is closed before returning, whether parsing succeeds or fails. + * @param is job history file content + * @throws IOException if the version marker is wrong or the schema line is missing + * @throws Exception rethrown from decoding or event handling after being logged + */ + @SuppressWarnings({ "rawtypes", "deprecation" }) + @Override + public void parse(InputStream is) throws Exception { + int eventCtr = 0; + try { + long start = System.currentTimeMillis(); + DataInputStream in = new DataInputStream(is); + String version = in.readLine(); + if (!"Avro-Json".equals(version)) { + throw new IOException("Incompatible event log 
version: " + version); + } + + // a truncated file may end right after the version line; fail clearly instead of with an NPE + String schemaLine = in.readLine(); + if (schemaLine == null) { + throw new IOException("Missing event schema after version header"); + } + Schema schema = new Schema.Parser().parse(schemaLine); + SpecificDatumReader datumReader = new SpecificDatumReader(schema); + JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in); + + Event wrapper; + while ((wrapper = getNextEvent(datumReader, decoder)) != null) { + ++eventCtr; + _reader.handleEvent(wrapper); + } + _reader.parseConfiguration(); + // don't need put to finally as it's a kind of flushing data + _reader.close(); + logger.info("reader used {}ms", System.currentTimeMillis() - start); + } catch (Exception ioe) { + logger.error("Caught exception parsing history file after {} events", eventCtr, ioe); + throw ioe; + } finally { + if (is != null) { + is.close(); + } + } + } + + /** + * Reads the next event from the decoder. + * @return the next event, or null once the decoder hits end of input + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public Event getNextEvent(DatumReader datumReader, JsonDecoder decoder) throws Exception { + try { + return (Event)datumReader.read(null, decoder); + } catch (java.io.EOFException e) { // at EOF + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java new file mode 100644 index 0000000..e4f437f --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import java.io.InputStream; + +/** + * Parser for a job history file read from an {@link InputStream}. + * + * @author yonzhang + * + */ +public interface JHFParserBase { + /** + * Parses the job history content of {@code is}. + * Implementations must close {@code is} before returning, whether parsing succeeds or fails. + * @param is job history file content; closed by this method + * @throws Exception if reading or parsing the stream fails + */ + void parse(InputStream is) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java new file mode 100755 index 0000000..dd640c2 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class JHFParserFactory { + + private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class); + + /** + * Creates the parser for the job history format named by the job extractor's mrVersion setting. + * MRVer2 gets the Avro-Json parser, which additionally registers a JobConfigurationCreationServiceListener; + * any other value, including a missing or unrecognised one, gets the version 1 parser. + */ + public static JHFParserBase getParser(JHFConfigManager configManager, Map baseTags, Configuration configuration, JobHistoryContentFilter filter) { + JHFFormat format = resolveFormat(configManager.getJobExtractorConfig().mrVersion); + + if (format == JHFFormat.MRVer2) { + JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter); + reader2.addListener(new JobEntityCreationEagleServiceListener(configManager)); + reader2.addListener(new TaskFailureListener(configManager)); + reader2.addListener(new TaskAttemptCounterListener(configManager)); + reader2.addListener(new JobConfigurationCreationServiceListener(configManager)); + reader2.register(new JobEntityLifecycleAggregator()); + return new JHFMRVer2Parser(reader2); + } + + // MRVer1 and every other format value are handled by the version 1 parser + JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, 
filter); + reader1.addListener(new JobEntityCreationEagleServiceListener(configManager)); + reader1.addListener(new TaskFailureListener(configManager)); + reader1.addListener(new TaskAttemptCounterListener(configManager)); + reader1.register(new JobEntityLifecycleAggregator()); + return new JHFMRVer1Parser(reader1); + } + + /** + * Maps the configured version string to a JHFFormat constant; null or unrecognised values yield MRVer1. + */ + private static JHFFormat resolveFormat(String mrVersion) { + if (mrVersion == null) { + return JHFFormat.MRVer1; + } + try { + return JHFFormat.valueOf(mrVersion); + } catch (IllegalArgumentException ex) { + // fall back to version 1 unless it's specified as version 2 + return JHFFormat.MRVer1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java new file mode 100644 index 0000000..4529f8b --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.parser; + +/** + * Signals that a job history file has not been completely written to hdfs yet. + * This happens when the feeder catches up with a job whose history file is still being written. + * @author yonzhang + */ +public class JHFWriteNotCompletedException extends Exception { + private static final long serialVersionUID = -3060175780718218490L; + + /** + * @param message description of the incompletely written history file + */ + public JHFWriteNotCompletedException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java new file mode 100644 index 0000000..a748565 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; +import org.apache.eagle.jpm.mr.history.entities.JobConfigurationAPIEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Keeps the most recent JobConfigurationAPIEntity seen for a job and writes it to the Eagle service on flush. + */ +public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener { + private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class); + private static final int MAX_RETRY_TIMES = 3; + private JHFConfigManager configManager; + private JobConfigurationAPIEntity m_jobConfigurationEntity; + + public JobConfigurationCreationServiceListener(JHFConfigManager configManager) { + this.configManager = configManager; + } + + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + // instanceof is false for null, so null and non-configuration entities are ignored + if (entity instanceof JobConfigurationAPIEntity) { + this.m_jobConfigurationEntity = (JobConfigurationAPIEntity)entity; + } + } + + @Override + public void jobFinish() { + // nothing to do here: the configuration entity is written in flush() + } + + /** + * Writes the captured configuration entity, retrying up to MAX_RETRY_TIMES times after the first attempt. + * Failures are logged, not rethrown. The entity and the client are released once, after the last attempt. + */ + @Override + public void flush() throws Exception { + if (m_jobConfigurationEntity == null) { + // no configuration entity was captured, so there is nothing to send + return; + } + JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); + JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); + + client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + List list = new ArrayList<>(); + 
list.add(m_jobConfigurationEntity); + + try { + int tried = 0; + while (tried <= MAX_RETRY_TIMES) { + try { + logger.info("start flushing JobConfigurationAPIEntity entities of total number " + list.size()); + GenericServiceAPIResponseEntity result = client.create(list); + // an unsuccessful response is treated like an exception so that it is retried as well + if (!result.isSuccess()) { + throw new IllegalStateException("Entity creation fails going to EagleService: " + result.getException()); + } + logger.info("finish flushing entities of total number " + list.size()); + break; + } catch (Exception ex) { + if (tried < MAX_RETRY_TIMES) { + logger.error("Got exception to flush, retry as " + (tried + 1) +" times",ex); + } else { + logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex); + } + } + tried ++; + } + } finally { + // cleanup runs once after all attempts, so every retry still has the entity and an open client + list.clear(); + m_jobConfigurationEntity = null; + client.getJerseyClient().destroy(); + client.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java new file mode 100644 index 0000000..15c932d --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.entities.*; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Buffers created job entities and writes them to the Eagle service every {@code batchSize} entities + * (and on explicit flush), issuing one create call per entity type. + */ +public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCreationListener { + private static final Logger logger = LoggerFactory.getLogger(JobEntityCreationEagleServiceListener.class); + private static final int BATCH_SIZE = 1000; + private int batchSize; + private List list = new ArrayList<>(); + private JHFConfigManager configManager; + List jobs = new ArrayList<>(); + List jobEvents = new ArrayList<>(); + List taskExecs = new ArrayList<>(); + List taskAttemptExecs = new ArrayList<>(); + + public JobEntityCreationEagleServiceListener(JHFConfigManager configManager){ + this(configManager, BATCH_SIZE); + } + + public JobEntityCreationEagleServiceListener(JHFConfigManager configManager, int batchSize) { + this.configManager = configManager; + if (batchSize <= 0) + throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided"); + this.batchSize = batchSize; + } + + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + list.add(entity); + if (list.size() % batchSize == 0) { + // flush() empties the buffer itself once every entity has been written + flush(); + 
} + } + + /** + * Writes all buffered entities to the Eagle service, grouped by entity type, then empties the buffer. + * An empty buffer returns immediately without creating a service client. + * @throws Exception if the service reports a failed creation; the buffer is then kept for the next flush + */ + @Override + public void flush() throws Exception { + if (list.isEmpty()) { + return; + } + JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); + JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); + + try { + client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + logger.info("start flushing entities of total number " + list.size()); + for (int i = 0; i < list.size(); i++) { + JobBaseAPIEntity entity = list.get(i); + if (entity instanceof JobExecutionAPIEntity) { + jobs.add((JobExecutionAPIEntity)entity); + } else if(entity instanceof JobEventAPIEntity) { + jobEvents.add((JobEventAPIEntity)entity); + } else if(entity instanceof TaskExecutionAPIEntity) { + taskExecs.add((TaskExecutionAPIEntity)entity); + } else if(entity instanceof TaskAttemptExecutionAPIEntity) { + taskAttemptExecs.add((TaskAttemptExecutionAPIEntity)entity); + } + } + GenericServiceAPIResponseEntity result; + if (jobs.size() > 0) { + logger.info("flush JobExecutionAPIEntity of number " + jobs.size()); + result = client.create(jobs); + checkResult(result); + jobs.clear(); + } + if (jobEvents.size() > 0) { + logger.info("flush JobEventAPIEntity of number " + jobEvents.size()); + result = client.create(jobEvents); + checkResult(result); + jobEvents.clear(); + } + if (taskExecs.size() > 0) { + logger.info("flush TaskExecutionAPIEntity of number " + taskExecs.size()); + result = client.create(taskExecs); + checkResult(result); + taskExecs.clear(); + } + if (taskAttemptExecs.size() > 0) { + logger.info("flush TaskAttemptExecutionAPIEntity of number " + taskAttemptExecs.size()); + result = client.create(taskAttemptExecs); + checkResult(result); + 
taskAttemptExecs.clear(); + } + logger.info("finish flushing entities of total number " + list.size()); + list.clear(); + } finally { + // after a failure the per-type buckets still hold entities; they are rebuilt from the buffer on the + // next flush, so clear them here to avoid sending duplicates, and always release the client + jobs.clear(); + jobEvents.clear(); + taskExecs.clear(); + taskAttemptExecs.clear(); + client.getJerseyClient().destroy(); + client.close(); + } + } + + private void checkResult(GenericServiceAPIResponseEntity result) throws Exception { + if (!result.isSuccess()) { + logger.error(result.getException()); + throw new Exception("Entity creation fails going to EagleService"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java new file mode 100644 index 0000000..b9dc13b --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; + +import java.util.ArrayList; +import java.util.List; + +/** + * Forwards job entity creation events and flush requests to the registered listeners, in registration order. + * not thread safe + * @author yonzhang + * + */ +public class JobEntityCreationPublisher { + // a plain list: the class is documented as not thread safe, so Vector's per-call locking bought nothing + private final List<HistoryJobEntityCreationListener> listeners = new ArrayList<>(2); + public void addListener(HistoryJobEntityCreationListener l){ + listeners.add(l); + } + + public void notifiyListeners(JobBaseAPIEntity entity) throws Exception { + for (HistoryJobEntityCreationListener l : listeners) { + l.jobEntityCreated(entity); + } + } + + public void flush() throws Exception { + for (HistoryJobEntityCreationListener l : listeners) { + l.flush(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java new file mode 100644 index 0000000..e485cc8 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.eagle.jpm.mr.history.common.JPAConstants; +import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; +import org.apache.eagle.jpm.mr.history.entities.JobExecutionAPIEntity; +import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity; +import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Sums map and reduce task attempt counters and durations, and merges the totals into the job execution + * entity's counters when the job finishes (or on flush, if jobFinish has not completed successfully). + */ +public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleListener { + private final static Logger LOG = LoggerFactory.getLogger(JobEntityLifecycleAggregator.class); + // shared instance: only used to render unknown entities in a warning + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private JobExecutionAPIEntity m_jobExecutionAPIEntity; + private final JobCounterAggregateFunction m_mapTaskAttemptCounterAgg; + private final JobCounterAggregateFunction m_reduceTaskAttemptCounterAgg; + + private final JobCounterAggregateFunction m_mapFileSystemCounterAgg; + private final JobCounterAggregateFunction m_reduceFileSystemTaskCounterAgg; + + private long m_mapAttemptDuration = 0; + private long m_reduceAttemptDuration = 0; + private boolean jobFinished = false; + + public JobEntityLifecycleAggregator() { + this.m_mapTaskAttemptCounterAgg = new JobCounterSumFunction(); + this.m_reduceTaskAttemptCounterAgg = new JobCounterSumFunction(); + this.m_mapFileSystemCounterAgg = new JobCounterSumFunction(); + 
this.m_reduceFileSystemTaskCounterAgg = new JobCounterSumFunction(); + } + + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + if (entity != null ) { + if (entity instanceof TaskAttemptExecutionAPIEntity) { + taskAttemptEntityCreated((TaskAttemptExecutionAPIEntity) entity); + } else if(entity instanceof JobExecutionAPIEntity) { + this.m_jobExecutionAPIEntity = (JobExecutionAPIEntity) entity; + } + } + } + + /** + * Writes the aggregated map/reduce attempt counters and durations into the job execution entity. + * Errors are logged rather than thrown; jobFinished stays false when the update fails. + */ + @Override + public void jobFinish() { + try { + if (m_jobExecutionAPIEntity == null) { + throw new IOException("No JobExecutionAPIEntity found before flushing"); + } + + LOG.debug("Updating aggregated task attempts to job level counters"); + + JobCounters jobCounters = m_jobExecutionAPIEntity.getJobCounters(); + + if (jobCounters == null) { + LOG.warn("no job counter found for "+this.m_jobExecutionAPIEntity); + jobCounters = new JobCounters(); + } + + Map> counters = jobCounters.getCounters(); + // getCounters() may return null (taskAttemptEntityCreated guards against this as well) + if (counters == null) { + counters = new HashMap<>(); + } + + Map mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result(); + if (mapTaskAttemptCounter == null) mapTaskAttemptCounter = new HashMap<>(); + mapTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration); + counters.put(JPAConstants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter); + + Map reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result(); + if (reduceTaskAttemptCounter == null) reduceTaskAttemptCounter = new HashMap<>(); + reduceTaskAttemptCounter.put(JPAConstants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration); + counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter); + + counters.put(JPAConstants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result()); + counters.put(JPAConstants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result()); + + jobCounters.setCounters(counters); + + m_jobExecutionAPIEntity.setJobCounters(jobCounters); + 
jobFinished = true; + } catch (Exception e) { + // concatenate instead of calling toString(): the entity is null when the IOException above is thrown + LOG.error("Failed to update job execution entity: " + this.m_jobExecutionAPIEntity + ", due to " + e.getMessage(), e); + } + } + + /** + * Adds one task attempt's duration and task/file-system counters to the map or reduce totals. + * Attempts whose type is neither map nor reduce, or that carry no counters, are logged and skipped. + */ + private void taskAttemptEntityCreated(TaskAttemptExecutionAPIEntity entity) { + JobCounters jobCounters = entity.getJobCounters(); + String taskType = entity.getTags().get(JPAConstants.JOB_TASK_TYPE_TAG); + + if (taskType != null && jobCounters != null && jobCounters.getCounters() != null) { + // equalsIgnoreCase is locale-independent, unlike toUpperCase() with the default locale + if (JPAConstants.TaskType.MAP.toString().equalsIgnoreCase(taskType)) { + m_mapAttemptDuration += entity.getDuration(); + this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER)); + this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER)); + return; + } else if (JPAConstants.TaskType.REDUCE.toString().equalsIgnoreCase(taskType)) { + m_reduceAttemptDuration += entity.getDuration(); + this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.TASK_COUNTER)); + this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(JPAConstants.FILE_SYSTEM_COUNTER)); + return; + } + } + + try { + LOG.warn("Unknown task type of task attempt execution entity: "+OBJECT_MAPPER.writeValueAsString(entity)); + } catch (JsonProcessingException e) { + LOG.error(e.getMessage(),e); + } + } + + @Override + public void flush() throws Exception { + if (!this.jobFinished) { + this.jobFinish(); + } + } + + interface JobCounterAggregateFunction { + void accumulate(Map mapCounters); + Map result(); + } + + static class JobCounterSumFunction implements JobCounterAggregateFunction { + final Map result; + + public JobCounterSumFunction(){ + result = new HashMap<>(); + } + + /** + * Adds each counter value to the running total for that counter name; null input is ignored. + * @param counters counter name to value, as taken from one task attempt entity + */ + @Override + public void accumulate(Map counters) { + if (counters != null) { + for (Map.Entry taskEntry: counters.entrySet()) { 
+ String counterName = taskEntry.getKey(); + long counterValue = taskEntry.getValue(); + // single lookup: a missing total starts from this attempt's value + Long preValue = result.get(counterName); + result.put(counterName, preValue == null ? counterValue : preValue + counterValue); + } + } + } + + @Override + public Map result() { + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java new file mode 100644 index 0000000..a2f4c3a --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public final class MRErrorClassifier { + + private final List categories = new ArrayList(); + + public MRErrorClassifier(InputStream configStream) throws IOException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(configStream)); + String line; + while((line = reader.readLine()) != null) { + line = line.trim(); + if (line.isEmpty() || line.startsWith("#")) { + continue; + } + int index = line.indexOf("="); + if (index == -1) { + throw new IllegalArgumentException("Invalid config, line: " + line); + } + final String key = line.substring(0, index).trim(); + final String value = line.substring(index + 1).trim(); + index = value.indexOf("|"); + if (index == -1) { + throw new IllegalArgumentException("Invalid config, line: " + line); + } + final boolean needTransform = Boolean.valueOf(value.substring(0, index)); + final Pattern pattern = Pattern.compile(value.substring(index + 1)); + final ErrorCategory category = new ErrorCategory(); + category.setName(key); + category.setNeedTransform(needTransform); + category.setPattern(pattern); + categories.add(category); + } + } + + public List getErrorCategories() { + return categories; + } + + public String classifyError(String error) throws IOException { + for (ErrorCategory category : categories) { + final String result = category.classify(error); + if (result != null) { + return result; + } + } + return "UNKNOWN"; + } + + public static class ErrorCategory { + private String name; + private Pattern pattern; + private boolean needTransform; + + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public Pattern getPattern() { + return pattern; + } 
+ public void setPattern(Pattern pattern) { + this.pattern = pattern; + } + public boolean isNeedTransform() { + return needTransform; + } + public void setNeedTransform(boolean needTransform) { + this.needTransform = needTransform; + } + + public String classify(String error) { + Matcher matcher = pattern.matcher(error); + if (matcher.find()) { + if (!needTransform) { + return name; + } else { + return matcher.group(1); + } + } + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java new file mode 100644 index 0000000..d2ac944 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.parser; +/** + * Record types are identifiers for each line of log in history files. + * A record type appears as the first token in a single line of log. + */ +public enum RecordTypes { + Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java new file mode 100755 index 0000000..23e7072 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; +import org.apache.eagle.jpm.mr.history.entities.TaskAttemptCounterAPIEntity; +import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public class TaskAttemptCounterListener implements HistoryJobEntityCreationListener { + private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class); + private static final int BATCH_SIZE = 1000; + private Map counters = new HashMap<>(); + private JHFConfigManager configManager; + + public TaskAttemptCounterListener(JHFConfigManager configManager) { + this.configManager = configManager; + } + + private static class CounterKey { + Map tags = new HashMap<>(); + long timestamp; + + @Override + public boolean equals(Object thatKey) { + if (!(thatKey instanceof CounterKey)) + return false; + CounterKey that = (CounterKey)thatKey; + if (that.tags.equals(this.tags) && that.timestamp == this.timestamp) + return true; + return false; + } + + @Override + public int hashCode(){ + return tags.hashCode() ^ Long.valueOf(timestamp).hashCode(); + } + } + + private static class CounterValue { + int totalCount; + int failedCount; + int killedCount; + } + + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + if (!(entity instanceof TaskAttemptExecutionAPIEntity)) { + return; + } + + TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity; + + Map tags = new HashMap<>(); + tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString())); + tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), 
e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString())); + tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString())); + tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString())); + tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString())); + tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString())); + + CounterKey key = new CounterKey(); + key.tags = tags; + key.timestamp = roundToMinute(e.getEndTime()); + + CounterValue value = counters.get(key); + if (value == null) { + value = new CounterValue(); + counters.put(key, value); + } + + if (e.getTaskStatus().equals(EagleTaskStatus.FAILED.name())) { + value.failedCount++; + } else if(e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) { + value.killedCount++; + } + value.totalCount++; + } + + private long roundToMinute(long timestamp) { + GregorianCalendar cal = new GregorianCalendar(); + cal.setTimeInMillis(timestamp); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + return cal.getTimeInMillis(); + } + + @Override + public void flush() throws Exception { + JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); + JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); + + client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + List list = new ArrayList<>(); + logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size()); + // create entity + for (Map.Entry entry : counters.entrySet()) { + CounterKey key = entry.getKey(); + CounterValue value = entry.getValue(); + TaskAttemptCounterAPIEntity entity 
= new TaskAttemptCounterAPIEntity(); + entity.setTags(key.tags); + entity.setTimestamp(key.timestamp); + entity.setTotalCount(value.totalCount); + entity.setFailedCount(value.failedCount); + entity.setKilledCount(value.killedCount); + list.add(entity); + + if (list.size() >= BATCH_SIZE) { + logger.info("start flushing TaskAttemptCounter " + list.size()); + client.create(list); + logger.info("end flushing TaskAttemptCounter " + list.size()); + list.clear(); + } + } + + logger.info("start flushing rest of TaskAttemptCounter " + list.size()); + client.create(list); + logger.info("end flushing rest of TaskAttemptCounter " + list.size()); + logger.info("end flushing TaskAttemptCounter entities of total number " + counters.size()); + counters.clear(); + list.clear(); + client.getJerseyClient().destroy(); + client.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java new file mode 100755 index 0000000..14cc882 --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.mr.history.parser; + +import org.apache.eagle.jpm.mr.history.common.JHFConfigManager; +import org.apache.eagle.jpm.mr.history.entities.JobBaseAPIEntity; +import org.apache.eagle.jpm.mr.history.entities.TaskAttemptExecutionAPIEntity; +import org.apache.eagle.jpm.mr.history.entities.TaskFailureCountAPIEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TaskFailureListener implements HistoryJobEntityCreationListener { + private static final Logger logger = LoggerFactory.getLogger(TaskFailureListener.class); + private static final String MR_ERROR_CATEGORY_CONFIG_FILE_NAME = "MRErrorCategory.config"; + private static final int BATCH_SIZE = 1000; + private static final int MAX_RETRY_TIMES = 3; + + private final List failureTasks = new ArrayList(); + private final MRErrorClassifier classifier; + private JHFConfigManager configManager; + + public TaskFailureListener(JHFConfigManager configManager) { + this.configManager = configManager; + InputStream is = null; + try { + is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME); + URL url = TaskFailureListener.class.getClassLoader().getResource(MR_ERROR_CATEGORY_CONFIG_FILE_NAME); + if (url 
!= null) { + logger.info("Feeder is going to load configuration file: " + url.toString()); + } + classifier = new MRErrorClassifier(is); + } catch (IOException ex) { + throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory"); + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + } + } + } + } + + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + if (!(entity instanceof TaskAttemptExecutionAPIEntity)) + return; + + TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity; + // only store those killed or failed tasks + if (!e.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) && !e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) + return; + + TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity(); + Map tags = new HashMap<>(); + failureTask.setTags(tags); + tags.put(EagleJobTagName.SITE.toString(), e.getTags().get(EagleJobTagName.SITE.toString())); + tags.put(EagleJobTagName.NORM_JOB_NAME.toString(), e.getTags().get(EagleJobTagName.NORM_JOB_NAME.toString())); + tags.put(EagleJobTagName.RACK.toString(), e.getTags().get(EagleJobTagName.RACK.toString())); + tags.put(EagleJobTagName.HOSTNAME.toString(), e.getTags().get(EagleJobTagName.HOSTNAME.toString())); + tags.put(EagleJobTagName.JOB_ID.toString(), e.getTags().get(EagleJobTagName.JOB_ID.toString())); + tags.put(EagleJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID()); + tags.put(EagleJobTagName.TASK_TYPE.toString(), e.getTags().get(EagleJobTagName.TASK_TYPE.toString())); + + //TODO need optimize, match and then capture the data + final String errCategory = classifier.classifyError(e.getError()); + tags.put(EagleJobTagName.ERROR_CATEGORY.toString(), errCategory); + + failureTask.setError(e.getError()); + failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future + failureTask.setTimestamp(e.getTimestamp()); + 
failureTask.setTaskStatus(e.getTaskStatus()); + failureTasks.add(failureTask); + + if (failureTasks.size() >= BATCH_SIZE) flush(); + } + + @Override + public void flush() throws Exception { + JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig(); + JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig(); + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.eagleServiceHost, + eagleServiceConfig.eagleServicePort, + eagleServiceConfig.username, + eagleServiceConfig.password); + + client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000); + + int tried = 0; + while (tried <= MAX_RETRY_TIMES) { + try { + logger.info("start flushing entities of total number " + failureTasks.size()); + client.create(failureTasks); + logger.info("finish flushing entities of total number " + failureTasks.size()); + failureTasks.clear(); + break; + } catch (Exception ex) { + if (tried < MAX_RETRY_TIMES) { + logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex); + } else { + logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex); + throw ex; + } + } + tried ++; + } + client.getJerseyClient().destroy(); + client.close(); + } +}