From: asrabkin@apache.org
To: chukwa-commits@hadoop.apache.org
Reply-To: chukwa-dev@hadoop.apache.org
Subject: svn commit: r752666 [8/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...
Date: Wed, 11 Mar 2009 22:39:32 -0000
Message-Id: <20090311223943.B410E2388D70@eris.apache.org>

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java Wed Mar 11 22:39:26 2009 @@ -18,12 +18,12 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.chukwa.extraction.engine.Record; @@ -31,386 +31,375 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class JobLogHistoryProcessor extends AbstractProcessor -{ - static Logger log = Logger.getLogger(JobLogHistoryProcessor.class); - - private static final String recordType = "JobLogHistory"; - private static String internalRegex = null; - private static Pattern ip = null; - - private Matcher internalMatcher = null; - - public JobLogHistoryProcessor() - { - internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?"; - ip =
Pattern.compile(internalRegex); - internalMatcher = ip.matcher("-"); - } - - @Override - protected void parse(String recordEntry, - OutputCollector output, - Reporter reporter) - throws Throwable - { - -// log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type[" -// + chunk.getDataType() + "]"); - - try - { - - HashMap keys = new HashMap(); - ChukwaRecord record = null; - - int firstSep = recordEntry.indexOf(" "); - keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep)); -// log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE][" -// + keys.get("RECORD_TYPE") + "]"); - - String body = recordEntry.substring(firstSep); - - internalMatcher.reset(body); - -// String fieldName = null; -// String fieldValue = null; - - while (internalMatcher.matches()) - { - - keys.put(internalMatcher.group(1).trim(), internalMatcher - .group(2).trim()); - - // TODO Remove debug info before production -// fieldName = internalMatcher.group(1).trim(); -// fieldValue = internalMatcher.group(2).trim(); -// log.info("JobLogHistoryProcessor Add field: [" + fieldName + -// "][" + fieldValue +"]" ); -// log.info("EOL : [" + internalMatcher.group(3) + "]" ); - internalMatcher.reset(internalMatcher.group(3)); - } - - if (!keys.containsKey("JOBID")) - { - // Extract JobID from taskID - // JOBID = "job_200804210403_0005" - // TASKID = "tip_200804210403_0005_m_000018" - String jobId = keys.get("TASKID"); - int idx1 = jobId.indexOf('_',0); - int idx2 = jobId.indexOf('_', idx1+1); - idx2 = jobId.indexOf('_', idx2+1); - keys.put("JOBID",jobId.substring(idx1+1,idx2)); -// log.info("JobLogHistoryProcessor Add field: [JOBID][" -// + keys.get("JOBID") + "]"); - } - else - { - String jobId = keys.get("JOBID").replace("_", "").substring(3); - keys.put("JOBID",jobId); - } - // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && - // keys.containsKey("SUBMIT_TIME")) - // { - // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB" - // USER="userxxx" - // // SUBMIT_TIME="1208760436751" - // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml" - // - // - // } - // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && - // keys.containsKey("LAUNCH_TIME")) - // { - // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110" - // TOTAL_MAPS="5912" TOTAL_REDUCES="739" - // - // } - // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && - // keys.containsKey("FINISH_TIME")) - // { - // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816" - // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739" - // FAILED_MAPS="0" FAILED_REDUCES="0" - // // COUNTERS="File Systems.Local bytes read:1735053407244,File - // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes - // read:801605644910,File Systems.HDFS bytes written:44135800, - // // Job Counters .Launched map tasks:5912,Job Counters .Launched - // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job - // Counters .Rack-local map tasks:316,Map-Reduce Framework. 
- // // Map input records:9410696067,Map-Reduce Framework.Map output - // records:9410696067,Map-Reduce Framework.Map input - // bytes:801599188816,Map-Reduce Framework.Map output - // bytes:784427968116, - // // Map-Reduce Framework.Combine input records:0,Map-Reduce - // Framework.Combine output records:0,Map-Reduce Framework.Reduce - // input groups:477265,Map-Reduce Framework.Reduce input - // records:739000, - // // Map-Reduce Framework.Reduce output records:739000" - // - // } - // else - if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") - && keys.containsKey("START_TIME")) - { - // MapAttempt TASK_TYPE="MAP" - // TASKID="tip_200804210403_0005_m_000018" - // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0" - // START_TIME="1208760437531" - // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("START_TIME"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("START_TIME")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/Map/S"); - output.collect(key, record); - - } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") - && keys.containsKey("FINISH_TIME")) - { - // MapAttempt TASK_TYPE="MAP" - // TASKID="tip_200804210403_0005_m_005494" - // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0" - // TASK_STATUS="SUCCESS" - // FINISH_TIME="1208760624124" - // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); - record.add("JOBID",keys.get("JOBID")); - record.add("FINISH_TIME", keys.get("FINISH_TIME")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/Map/E"); - output.collect(key, record); - } - - else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt") - && keys.containsKey("START_TIME")) - { - // ReduceAttempt TASK_TYPE="REDUCE" - // TASKID="tip_200804210403_0005_r_000138" - // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" - // START_TIME="1208760454885" - // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("START_TIME"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("START_TIME")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SHUFFLE/S"); - output.collect(key, record); - - } else if (keys.get("RECORD_TYPE") - .equalsIgnoreCase("ReduceAttempt") - && keys.containsKey("FINISH_TIME")) - { - // ReduceAttempt TASK_TYPE="REDUCE" - // TASKID="tip_200804210403_0005_r_000138" - // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" - // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167" - // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395" - // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + 
keys.get("SHUFFLE_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SHUFFLE/E"); - output.collect(key, record); - - // SORT - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("SHUFFLE_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SORT/S"); - output.collect(key, record); - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("SORT_FINISHED", keys.get("SORT_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SORT/E"); - output.collect(key, record); - - // Reduce - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("SORT_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/REDUCE/S"); - output.collect(key, record); - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("FINISH_TIME", keys.get("SORT_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/REDUCE/E"); - output.collect(key, record); - - } else if ( keys.get("RECORD_TYPE").equalsIgnoreCase("Job") ) - { - // 1 - // Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx" - // SUBMIT_TIME="1208760906812" - // JOBCONF="/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml" - - // 2 - // Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816" TOTAL_MAPS="3" TOTAL_REDUCES="7" - - // 3 - // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826" - // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" - // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0" - // COUNTERS="File Systems.Local bytes read:1735053407244,File - // Systems.Local bytes written:2610106384012,File Systems.HDFS - // bytes read:801605644910,File Systems.HDFS bytes - // written:44135800, - // Job Counters .Launched map tasks:5912,Job Counters .Launched - // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job - // Counters .Rack-local map tasks:316,Map-Reduce Framework. 
- // Map input records:9410696067,Map-Reduce Framework.Map output - // records:9410696067,Map-Reduce Framework.Map input - // bytes:801599188816,Map-Reduce Framework.Map output - // bytes:784427968116, - // Map-Reduce Framework.Combine input records:0,Map-Reduce - // Framework.Combine output records:0,Map-Reduce - // Framework.Reduce input groups:477265,Map-Reduce - // Framework.Reduce input records:739000, - // Map-Reduce Framework.Reduce output records:739000" - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJob"); - if (keys.containsKey("COUNTERS")) - { - extractCounters(record, keys.get("COUNTERS")); - } - - key = new ChukwaRecordKey(); - key.setKey("MRJob/" + keys.get("JOBID") ); - key.setReduceType("MRJobReduceProcessor"); - - record = new ChukwaRecord(); - record.add(Record.tagsField, chunk.getTags()); - if (keys.containsKey("SUBMIT_TIME")) - { record.setTime(Long.parseLong(keys.get("SUBMIT_TIME"))); } - else if (keys.containsKey("LAUNCH_TIME")) - { record.setTime(Long.parseLong(keys.get("LAUNCH_TIME"))); } - else if (keys.containsKey("FINISH_TIME")) - { record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); } - - Iterator it = keys.keySet().iterator(); - while(it.hasNext()) - { - String field = it.next(); - record.add(field, keys.get(field)); - } - - - output.collect(key, record); - } - - if (keys.containsKey("TASK_TYPE") - && keys.containsKey("COUNTERS") - && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys - .get("TASK_TYPE").equalsIgnoreCase("MAP"))) - { - // MAP - // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP" - // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883" - // COUNTERS="File Systems.Local bytes read:159265655,File - // Systems.Local bytes written:318531310, - // File Systems.HDFS bytes read:145882417,Map-Reduce - // Framework.Map input records:1706604, - // Map-Reduce Framework.Map output records:1706604,Map-Reduce - // Framework.Map input bytes:145882057, - // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce - // Framework.Combine input records:0,Map-Reduce - // Framework.Combine output records:0" - - // REDUCE - // Task TASKID="tip_200804210403_0005_r_000524" - // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS" - // FINISH_TIME="1208760877072" - // COUNTERS="File Systems.Local bytes read:1179319677,File - // Systems.Local bytes written:1184474889,File Systems.HDFS - // bytes written:59021, - // Map-Reduce Framework.Reduce input groups:684,Map-Reduce - // Framework.Reduce input records:1000,Map-Reduce - // Framework.Reduce output records:1000" - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime"); - extractCounters(record, keys.get("COUNTERS")); - record.add("JOBID",keys.get("JOBID")); - record.add("TASKID", keys.get("TASKID")); - record.add("TASK_TYPE", keys.get("TASK_TYPE")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("MR_Graph +1"); - output.collect(key, record); - - } - } - catch (IOException e) - { - log.warn("Unable to collect output in JobLogHistoryProcessor [" - + recordEntry + "]", e); - e.printStackTrace(); - throw e; - } - - } - - protected void extractCounters(ChukwaRecord record, String input) - { - - String[] data = null; - String[] counters = input.split(","); - - for (String counter : counters) - { - data = counter.split(":"); - record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_") - 
.toUpperCase(), data[1]); - } - } - - public String getDataType() - { - return JobLogHistoryProcessor.recordType; - } +public class JobLogHistoryProcessor extends AbstractProcessor { + static Logger log = Logger.getLogger(JobLogHistoryProcessor.class); + + private static final String recordType = "JobLogHistory"; + private static String internalRegex = null; + private static Pattern ip = null; + + private Matcher internalMatcher = null; + + public JobLogHistoryProcessor() { + internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?"; + ip = Pattern.compile(internalRegex); + internalMatcher = ip.matcher("-"); + } + + @Override + protected void parse(String recordEntry, + OutputCollector output, Reporter reporter) + throws Throwable { + + // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type[" + // + chunk.getDataType() + "]"); + + try { + + HashMap keys = new HashMap(); + ChukwaRecord record = null; + + int firstSep = recordEntry.indexOf(" "); + keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep)); + // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE][" + // + keys.get("RECORD_TYPE") + "]"); + + String body = recordEntry.substring(firstSep); + + internalMatcher.reset(body); + + // String fieldName = null; + // String fieldValue = null; + + while (internalMatcher.matches()) { + + keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2) + .trim()); + + // TODO Remove debug info before production + // fieldName = internalMatcher.group(1).trim(); + // fieldValue = internalMatcher.group(2).trim(); + // log.info("JobLogHistoryProcessor Add field: [" + fieldName + + // "][" + fieldValue +"]" ); + // log.info("EOL : [" + internalMatcher.group(3) + "]" ); + internalMatcher.reset(internalMatcher.group(3)); + } + + if (!keys.containsKey("JOBID")) { + // Extract JobID from taskID + // JOBID = "job_200804210403_0005" + // TASKID = "tip_200804210403_0005_m_000018" + String jobId = keys.get("TASKID"); + int idx1 = jobId.indexOf('_', 0); + int idx2 = jobId.indexOf('_', idx1 + 1); + idx2 = jobId.indexOf('_', idx2 + 1); + keys.put("JOBID", jobId.substring(idx1 + 1, idx2)); + // log.info("JobLogHistoryProcessor Add field: [JOBID][" + // + keys.get("JOBID") + "]"); + } else { + String jobId = keys.get("JOBID").replace("_", "").substring(3); + keys.put("JOBID", jobId); + } + // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && + // keys.containsKey("SUBMIT_TIME")) + // { + // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB" + // USER="userxxx" + // // SUBMIT_TIME="1208760436751" + // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml" + // + // + // } + // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && + // keys.containsKey("LAUNCH_TIME")) + // { + // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110" + // TOTAL_MAPS="5912" TOTAL_REDUCES="739" + // + // } + // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && + // keys.containsKey("FINISH_TIME")) + // { + // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816" + // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739" + // FAILED_MAPS="0" FAILED_REDUCES="0" + // // COUNTERS="File Systems.Local bytes read:1735053407244,File + // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes + // read:801605644910,File Systems.HDFS bytes written:44135800, + // // Job Counters .Launched map tasks:5912,Job Counters .Launched + // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job + // Counters .Rack-local map tasks:316,Map-Reduce 
Framework. + // // Map input records:9410696067,Map-Reduce Framework.Map output + // records:9410696067,Map-Reduce Framework.Map input + // bytes:801599188816,Map-Reduce Framework.Map output + // bytes:784427968116, + // // Map-Reduce Framework.Combine input records:0,Map-Reduce + // Framework.Combine output records:0,Map-Reduce Framework.Reduce + // input groups:477265,Map-Reduce Framework.Reduce input + // records:739000, + // // Map-Reduce Framework.Reduce output records:739000" + // + // } + // else + if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") + && keys.containsKey("START_TIME")) { + // MapAttempt TASK_TYPE="MAP" + // TASKID="tip_200804210403_0005_m_000018" + // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0" + // START_TIME="1208760437531" + // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + + keys.get("START_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("START_TIME"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("START_TIME")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/Map/S"); + output.collect(key, record); + + } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") + && keys.containsKey("FINISH_TIME")) { + // MapAttempt TASK_TYPE="MAP" + // TASKID="tip_200804210403_0005_m_005494" + // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0" + // TASK_STATUS="SUCCESS" + // FINISH_TIME="1208760624124" + // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + + keys.get("FINISH_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); + record.add("JOBID", keys.get("JOBID")); + record.add("FINISH_TIME", keys.get("FINISH_TIME")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/Map/E"); + output.collect(key, record); + } + + else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt") + && keys.containsKey("START_TIME")) { + // ReduceAttempt TASK_TYPE="REDUCE" + // TASKID="tip_200804210403_0005_r_000138" + // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" + // START_TIME="1208760454885" + // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + + keys.get("START_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("START_TIME"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("START_TIME")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/SHUFFLE/S"); + output.collect(key, record); + + } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt") + && keys.containsKey("FINISH_TIME")) { + // ReduceAttempt TASK_TYPE="REDUCE" + // TASKID="tip_200804210403_0005_r_000138" + // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" + // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167" + // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395" + // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + + 
keys.get("SHUFFLE_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/SHUFFLE/E"); + output.collect(key, record); + + // SORT + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + + keys.get("SHUFFLE_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("SHUFFLE_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/SORT/S"); + output.collect(key, record); + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + + keys.get("SORT_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("SORT_FINISHED", keys.get("SORT_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/SORT/E"); + output.collect(key, record); + + // Reduce + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + + keys.get("SORT_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("SORT_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/REDUCE/S"); + output.collect(key, record); + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + + keys.get("FINISH_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("FINISH_TIME", keys.get("SORT_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/REDUCE/E"); + output.collect(key, record); + + } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")) { + // 1 + // Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx" + // SUBMIT_TIME="1208760906812" + // JOBCONF= + // "/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml" + + // 2 + // Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816" + // TOTAL_MAPS="3" TOTAL_REDUCES="7" + + // 3 + // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826" + // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" + // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0" + // COUNTERS="File Systems.Local bytes read:1735053407244,File + // Systems.Local bytes written:2610106384012,File Systems.HDFS + // bytes read:801605644910,File Systems.HDFS bytes + // written:44135800, + // Job Counters .Launched map tasks:5912,Job Counters .Launched + // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job + // Counters .Rack-local map tasks:316,Map-Reduce Framework. 
+ // Map input records:9410696067,Map-Reduce Framework.Map output + // records:9410696067,Map-Reduce Framework.Map input + // bytes:801599188816,Map-Reduce Framework.Map output + // bytes:784427968116, + // Map-Reduce Framework.Combine input records:0,Map-Reduce + // Framework.Combine output records:0,Map-Reduce + // Framework.Reduce input groups:477265,Map-Reduce + // Framework.Reduce input records:739000, + // Map-Reduce Framework.Reduce output records:739000" + + record = new ChukwaRecord(); + key = new ChukwaRecordKey(); + buildGenericRecord(record, null, Long + .parseLong(keys.get("FINISH_TIME")), "MRJob"); + if (keys.containsKey("COUNTERS")) { + extractCounters(record, keys.get("COUNTERS")); + } + + key = new ChukwaRecordKey(); + key.setKey("MRJob/" + keys.get("JOBID")); + key.setReduceType("MRJobReduceProcessor"); + + record = new ChukwaRecord(); + record.add(Record.tagsField, chunk.getTags()); + if (keys.containsKey("SUBMIT_TIME")) { + record.setTime(Long.parseLong(keys.get("SUBMIT_TIME"))); + } else if (keys.containsKey("LAUNCH_TIME")) { + record.setTime(Long.parseLong(keys.get("LAUNCH_TIME"))); + } else if (keys.containsKey("FINISH_TIME")) { + record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); + } + + Iterator it = keys.keySet().iterator(); + while (it.hasNext()) { + String field = it.next(); + record.add(field, keys.get(field)); + } + + output.collect(key, record); + } + + if (keys.containsKey("TASK_TYPE") + && keys.containsKey("COUNTERS") + && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get( + "TASK_TYPE").equalsIgnoreCase("MAP"))) { + // MAP + // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP" + // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883" + // COUNTERS="File Systems.Local bytes read:159265655,File + // Systems.Local bytes written:318531310, + // File Systems.HDFS bytes read:145882417,Map-Reduce + // Framework.Map input records:1706604, + // Map-Reduce Framework.Map output records:1706604,Map-Reduce + // Framework.Map input bytes:145882057, + // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce + // Framework.Combine input records:0,Map-Reduce + // Framework.Combine output records:0" + + // REDUCE + // Task TASKID="tip_200804210403_0005_r_000524" + // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS" + // FINISH_TIME="1208760877072" + // COUNTERS="File Systems.Local bytes read:1179319677,File + // Systems.Local bytes written:1184474889,File Systems.HDFS + // bytes written:59021, + // Map-Reduce Framework.Reduce input groups:684,Map-Reduce + // Framework.Reduce input records:1000,Map-Reduce + // Framework.Reduce output records:1000" + + record = new ChukwaRecord(); + key = new ChukwaRecordKey(); + buildGenericRecord(record, null, Long + .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime"); + extractCounters(record, keys.get("COUNTERS")); + record.add("JOBID", keys.get("JOBID")); + record.add("TASKID", keys.get("TASKID")); + record.add("TASK_TYPE", keys.get("TASK_TYPE")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("MR_Graph +1"); + output.collect(key, record); + + } + } catch (IOException e) { + log.warn("Unable to collect output in JobLogHistoryProcessor [" + + recordEntry + "]", e); + e.printStackTrace(); + throw e; + } + + } + + protected void extractCounters(ChukwaRecord record, String input) { + + String[] data = null; + String[] counters = input.split(","); + + for (String counter : counters) { + data = counter.split(":"); + record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_") + .toUpperCase(), 
data[1]); + } + } + + public String getDataType() { + return JobLogHistoryProcessor.recordType; + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java Wed Mar 11 22:39:26 2009 @@ -18,11 +18,11 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.io.IOException; import java.util.Hashtable; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.chukwa.extraction.engine.Record; @@ -30,367 +30,359 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class Log4jJobHistoryProcessor extends AbstractProcessor -{ - static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class); - - private static final String recordType = "JobLogHistory"; - private static String internalRegex = null; - private static Pattern ip = null; - - private Matcher internalMatcher = null; - - public Log4jJobHistoryProcessor() - { - internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?"; - ip = Pattern.compile(internalRegex); - internalMatcher = ip.matcher("-"); - } - - @Override - protected void parse(String recordEntry, - OutputCollector output, - Reporter reporter) - throws Throwable - { - -// log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type[" -// + chunk.getDataType() + "]"); - - try - { - - //String dStr = recordEntry.substring(0, 23); - int start = 24; - int idx = recordEntry.indexOf(' ', start); - // String level = recordEntry.substring(start, idx); - start = idx + 1; - idx = recordEntry.indexOf(' ', start); - // String className = recordEntry.substring(start, idx-1); - String body = recordEntry.substring(idx + 1); - - Hashtable keys = new Hashtable(); - ChukwaRecord record = null; - - int firstSep = body.indexOf(" "); - keys.put("RECORD_TYPE", body.substring(0, firstSep)); -// log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE][" -// + keys.get("RECORD_TYPE") + "]"); - - body = body.substring(firstSep); - - internalMatcher.reset(body); - -// String fieldName = null; -// String fieldValue = null; - - while (internalMatcher.matches()) - { - - keys.put(internalMatcher.group(1).trim(), internalMatcher - .group(2).trim()); - - // TODO Remove debug info before production -// fieldName = internalMatcher.group(1).trim(); -// fieldValue = internalMatcher.group(2).trim(); -// log.info("JobLogHistoryProcessor Add field: [" + fieldName + -// "][" + fieldValue +"]" ); -// log.info("EOL : [" + internalMatcher.group(3) + "]" ); - internalMatcher.reset(internalMatcher.group(3)); - } - - if (!keys.containsKey("JOBID")) - { - // Extract JobID from taskID - // JOBID = "job_200804210403_0005" - // TASKID = "tip_200804210403_0005_m_000018" - String jobId = keys.get("TASKID"); - int idx1 = jobId.indexOf('_',0); - int idx2 = jobId.indexOf('_', idx1+1); - idx2 = jobId.indexOf('_', idx2+1); - 
keys.put("JOBID","job" + jobId.substring(idx1,idx2)); -// log.info("JobLogHistoryProcessor Add field: [JOBID][" -// + keys.get("JOBID") + "]"); - } - - // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && - // keys.containsKey("SUBMIT_TIME")) - // { - // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB" - // USER="userxxx" - // // SUBMIT_TIME="1208760436751" - // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml" - // - // - // } - // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && - // keys.containsKey("LAUNCH_TIME")) - // { - // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110" - // TOTAL_MAPS="5912" TOTAL_REDUCES="739" - // - // } - // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && - // keys.containsKey("FINISH_TIME")) - // { - // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816" - // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739" - // FAILED_MAPS="0" FAILED_REDUCES="0" - // // COUNTERS="File Systems.Local bytes read:1735053407244,File - // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes - // read:801605644910,File Systems.HDFS bytes written:44135800, - // // Job Counters .Launched map tasks:5912,Job Counters .Launched - // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job - // Counters .Rack-local map tasks:316,Map-Reduce Framework. - // // Map input records:9410696067,Map-Reduce Framework.Map output - // records:9410696067,Map-Reduce Framework.Map input - // bytes:801599188816,Map-Reduce Framework.Map output - // bytes:784427968116, - // // Map-Reduce Framework.Combine input records:0,Map-Reduce - // Framework.Combine output records:0,Map-Reduce Framework.Reduce - // input groups:477265,Map-Reduce Framework.Reduce input - // records:739000, - // // Map-Reduce Framework.Reduce output records:739000" - // - // } - // else - if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") - && keys.containsKey("START_TIME")) - { - // MapAttempt TASK_TYPE="MAP" - // TASKID="tip_200804210403_0005_m_000018" - // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0" - // START_TIME="1208760437531" - // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("START_TIME"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("START_TIME")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/Map/S"); - output.collect(key, record); - - } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") - && keys.containsKey("FINISH_TIME")) - { - // MapAttempt TASK_TYPE="MAP" - // TASKID="tip_200804210403_0005_m_005494" - // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0" - // TASK_STATUS="SUCCESS" - // FINISH_TIME="1208760624124" - // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); - record.add("JOBID",keys.get("JOBID")); - record.add("FINISH_TIME", keys.get("FINISH_TIME")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/Map/E"); - output.collect(key, record); - } - - else if 
(keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt") - && keys.containsKey("START_TIME")) - { - // ReduceAttempt TASK_TYPE="REDUCE" - // TASKID="tip_200804210403_0005_r_000138" - // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" - // START_TIME="1208760454885" - // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("START_TIME"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("START_TIME")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SHUFFLE/S"); - output.collect(key, record); - - } else if (keys.get("RECORD_TYPE") - .equalsIgnoreCase("ReduceAttempt") - && keys.containsKey("FINISH_TIME")) - { - // ReduceAttempt TASK_TYPE="REDUCE" - // TASKID="tip_200804210403_0005_r_000138" - // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" - // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167" - // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395" - // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SHUFFLE/E"); - output.collect(key, record); - - // SORT - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("SHUFFLE_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SORT/S"); - output.collect(key, record); - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("SORT_FINISHED", keys.get("SORT_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/SORT/E"); - output.collect(key, record); - - // Reduce - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - record.add("START_TIME", keys.get("SORT_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/REDUCE/S"); - output.collect(key, record); - - key = new ChukwaRecordKey(); - key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME")); - key.setReduceType("JobLogHistoryReduceProcessor"); - record = new ChukwaRecord(); - record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); - record.add("JOBID",keys.get("JOBID")); - 
record.add("FINISH_TIME", keys.get("SORT_FINISHED")); - record.add(Record.tagsField, chunk.getTags()); -// log.info("JobLogHist/REDUCE/E"); - output.collect(key, record); - - } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") - && keys.containsKey("COUNTERS")) - { - // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816" - // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" - // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0" - // COUNTERS="File Systems.Local bytes read:1735053407244,File - // Systems.Local bytes written:2610106384012,File Systems.HDFS - // bytes read:801605644910,File Systems.HDFS bytes - // written:44135800, - // Job Counters .Launched map tasks:5912,Job Counters .Launched - // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job - // Counters .Rack-local map tasks:316,Map-Reduce Framework. - // Map input records:9410696067,Map-Reduce Framework.Map output - // records:9410696067,Map-Reduce Framework.Map input - // bytes:801599188816,Map-Reduce Framework.Map output - // bytes:784427968116, - // Map-Reduce Framework.Combine input records:0,Map-Reduce - // Framework.Combine output records:0,Map-Reduce - // Framework.Reduce input groups:477265,Map-Reduce - // Framework.Reduce input records:739000, - // Map-Reduce Framework.Reduce output records:739000" - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJobCounters"); - extractCounters(record, keys.get("COUNTERS")); - - String jobId = keys.get("JOBID").replace("_", "").substring(3); - record.add("JobId", "" + jobId); - - // FIXME validate this when HodId will be available - if (keys.containsKey("HODID")) - { record.add("HodId", keys.get("HODID")); } - -// log.info("MRJobCounters +1"); - output.collect(key, record); - } - - if (keys.containsKey("TASK_TYPE") - && keys.containsKey("COUNTERS") - && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys - .get("TASK_TYPE").equalsIgnoreCase("MAP"))) - { - // MAP - // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP" - // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883" - // COUNTERS="File Systems.Local bytes read:159265655,File - // Systems.Local bytes written:318531310, - // File Systems.HDFS bytes read:145882417,Map-Reduce - // Framework.Map input records:1706604, - // Map-Reduce Framework.Map output records:1706604,Map-Reduce - // Framework.Map input bytes:145882057, - // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce - // Framework.Combine input records:0,Map-Reduce - // Framework.Combine output records:0" - - // REDUCE - // Task TASKID="tip_200804210403_0005_r_000524" - // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS" - // FINISH_TIME="1208760877072" - // COUNTERS="File Systems.Local bytes read:1179319677,File - // Systems.Local bytes written:1184474889,File Systems.HDFS - // bytes written:59021, - // Map-Reduce Framework.Reduce input groups:684,Map-Reduce - // Framework.Reduce input records:1000,Map-Reduce - // Framework.Reduce output records:1000" - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime"); - extractCounters(record, keys.get("COUNTERS")); - record.add("JOBID",keys.get("JOBID")); - record.add("TASKID", keys.get("TASKID")); - record.add("TASK_TYPE", keys.get("TASK_TYPE")); - -// log.info("MR_Graph +1"); - output.collect(key, record); - - } - } - catch (IOException e) - { - log.warn("Unable to collect output in 
JobLogHistoryProcessor [" - + recordEntry + "]", e); - e.printStackTrace(); - throw e; - } - - } - - protected void extractCounters(ChukwaRecord record, String input) - { - - String[] data = null; - String[] counters = input.split(","); - - for (String counter : counters) - { - data = counter.split(":"); - record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_") - .toUpperCase(), data[1]); - } - } - - public String getDataType() - { - return Log4jJobHistoryProcessor.recordType; - } +public class Log4jJobHistoryProcessor extends AbstractProcessor { + static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class); + + private static final String recordType = "JobLogHistory"; + private static String internalRegex = null; + private static Pattern ip = null; + + private Matcher internalMatcher = null; + + public Log4jJobHistoryProcessor() { + internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?"; + ip = Pattern.compile(internalRegex); + internalMatcher = ip.matcher("-"); + } + + @Override + protected void parse(String recordEntry, + OutputCollector output, Reporter reporter) + throws Throwable { + + // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type[" + // + chunk.getDataType() + "]"); + + try { + + // String dStr = recordEntry.substring(0, 23); + int start = 24; + int idx = recordEntry.indexOf(' ', start); + // String level = recordEntry.substring(start, idx); + start = idx + 1; + idx = recordEntry.indexOf(' ', start); + // String className = recordEntry.substring(start, idx-1); + String body = recordEntry.substring(idx + 1); + + Hashtable keys = new Hashtable(); + ChukwaRecord record = null; + + int firstSep = body.indexOf(" "); + keys.put("RECORD_TYPE", body.substring(0, firstSep)); + // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE][" + // + keys.get("RECORD_TYPE") + "]"); + + body = body.substring(firstSep); + + internalMatcher.reset(body); + + // String fieldName = null; + // String fieldValue = null; + + while (internalMatcher.matches()) { + + keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2) + .trim()); + + // TODO Remove debug info before production + // fieldName = internalMatcher.group(1).trim(); + // fieldValue = internalMatcher.group(2).trim(); + // log.info("JobLogHistoryProcessor Add field: [" + fieldName + + // "][" + fieldValue +"]" ); + // log.info("EOL : [" + internalMatcher.group(3) + "]" ); + internalMatcher.reset(internalMatcher.group(3)); + } + + if (!keys.containsKey("JOBID")) { + // Extract JobID from taskID + // JOBID = "job_200804210403_0005" + // TASKID = "tip_200804210403_0005_m_000018" + String jobId = keys.get("TASKID"); + int idx1 = jobId.indexOf('_', 0); + int idx2 = jobId.indexOf('_', idx1 + 1); + idx2 = jobId.indexOf('_', idx2 + 1); + keys.put("JOBID", "job" + jobId.substring(idx1, idx2)); + // log.info("JobLogHistoryProcessor Add field: [JOBID][" + // + keys.get("JOBID") + "]"); + } + + // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && + // keys.containsKey("SUBMIT_TIME")) + // { + // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB" + // USER="userxxx" + // // SUBMIT_TIME="1208760436751" + // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml" + // + // + // } + // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && + // keys.containsKey("LAUNCH_TIME")) + // { + // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110" + // TOTAL_MAPS="5912" TOTAL_REDUCES="739" + // + // } + // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") && + // 
keys.containsKey("FINISH_TIME")) + // { + // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816" + // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739" + // FAILED_MAPS="0" FAILED_REDUCES="0" + // // COUNTERS="File Systems.Local bytes read:1735053407244,File + // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes + // read:801605644910,File Systems.HDFS bytes written:44135800, + // // Job Counters .Launched map tasks:5912,Job Counters .Launched + // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job + // Counters .Rack-local map tasks:316,Map-Reduce Framework. + // // Map input records:9410696067,Map-Reduce Framework.Map output + // records:9410696067,Map-Reduce Framework.Map input + // bytes:801599188816,Map-Reduce Framework.Map output + // bytes:784427968116, + // // Map-Reduce Framework.Combine input records:0,Map-Reduce + // Framework.Combine output records:0,Map-Reduce Framework.Reduce + // input groups:477265,Map-Reduce Framework.Reduce input + // records:739000, + // // Map-Reduce Framework.Reduce output records:739000" + // + // } + // else + if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") + && keys.containsKey("START_TIME")) { + // MapAttempt TASK_TYPE="MAP" + // TASKID="tip_200804210403_0005_m_000018" + // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0" + // START_TIME="1208760437531" + // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + + keys.get("START_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("START_TIME"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("START_TIME")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/Map/S"); + output.collect(key, record); + + } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt") + && keys.containsKey("FINISH_TIME")) { + // MapAttempt TASK_TYPE="MAP" + // TASKID="tip_200804210403_0005_m_005494" + // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0" + // TASK_STATUS="SUCCESS" + // FINISH_TIME="1208760624124" + // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + + keys.get("FINISH_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); + record.add("JOBID", keys.get("JOBID")); + record.add("FINISH_TIME", keys.get("FINISH_TIME")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/Map/E"); + output.collect(key, record); + } + + else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt") + && keys.containsKey("START_TIME")) { + // ReduceAttempt TASK_TYPE="REDUCE" + // TASKID="tip_200804210403_0005_r_000138" + // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" + // START_TIME="1208760454885" + // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + + keys.get("START_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("START_TIME"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("START_TIME")); + record.add(Record.tagsField, chunk.getTags()); + // 
log.info("JobLogHist/SHUFFLE/S"); + output.collect(key, record); + + } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt") + && keys.containsKey("FINISH_TIME")) { + // ReduceAttempt TASK_TYPE="REDUCE" + // TASKID="tip_200804210403_0005_r_000138" + // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0" + // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167" + // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395" + // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947" + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + + keys.get("SHUFFLE_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/SHUFFLE/E"); + output.collect(key, record); + + // SORT + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + + keys.get("SHUFFLE_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("SHUFFLE_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/SORT/S"); + output.collect(key, record); + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + + keys.get("SORT_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("SORT_FINISHED", keys.get("SORT_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/SORT/E"); + output.collect(key, record); + + // Reduce + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + + keys.get("SORT_FINISHED")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("START_TIME", keys.get("SORT_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/REDUCE/S"); + output.collect(key, record); + + key = new ChukwaRecordKey(); + key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + + keys.get("FINISH_TIME")); + key.setReduceType("JobLogHistoryReduceProcessor"); + record = new ChukwaRecord(); + record.setTime(Long.parseLong(keys.get("SORT_FINISHED"))); + record.add("JOBID", keys.get("JOBID")); + record.add("FINISH_TIME", keys.get("SORT_FINISHED")); + record.add(Record.tagsField, chunk.getTags()); + // log.info("JobLogHist/REDUCE/E"); + output.collect(key, record); + + } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") + && keys.containsKey("COUNTERS")) { + // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816" + // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" + // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0" + // COUNTERS="File Systems.Local bytes read:1735053407244,File + // Systems.Local bytes written:2610106384012,File Systems.HDFS + // bytes read:801605644910,File Systems.HDFS bytes + // written:44135800, + // Job Counters .Launched map tasks:5912,Job Counters .Launched + // reduce tasks:739,Job 
Counters .Data-local map tasks:5573,Job + // Counters .Rack-local map tasks:316,Map-Reduce Framework. + // Map input records:9410696067,Map-Reduce Framework.Map output + // records:9410696067,Map-Reduce Framework.Map input + // bytes:801599188816,Map-Reduce Framework.Map output + // bytes:784427968116, + // Map-Reduce Framework.Combine input records:0,Map-Reduce + // Framework.Combine output records:0,Map-Reduce + // Framework.Reduce input groups:477265,Map-Reduce + // Framework.Reduce input records:739000, + // Map-Reduce Framework.Reduce output records:739000" + + record = new ChukwaRecord(); + key = new ChukwaRecordKey(); + buildGenericRecord(record, null, Long + .parseLong(keys.get("FINISH_TIME")), "MRJobCounters"); + extractCounters(record, keys.get("COUNTERS")); + + String jobId = keys.get("JOBID").replace("_", "").substring(3); + record.add("JobId", "" + jobId); + + // FIXME validate this when HodId will be available + if (keys.containsKey("HODID")) { + record.add("HodId", keys.get("HODID")); + } + + // log.info("MRJobCounters +1"); + output.collect(key, record); + } + + if (keys.containsKey("TASK_TYPE") + && keys.containsKey("COUNTERS") + && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get( + "TASK_TYPE").equalsIgnoreCase("MAP"))) { + // MAP + // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP" + // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883" + // COUNTERS="File Systems.Local bytes read:159265655,File + // Systems.Local bytes written:318531310, + // File Systems.HDFS bytes read:145882417,Map-Reduce + // Framework.Map input records:1706604, + // Map-Reduce Framework.Map output records:1706604,Map-Reduce + // Framework.Map input bytes:145882057, + // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce + // Framework.Combine input records:0,Map-Reduce + // Framework.Combine output records:0" + + // REDUCE + // Task TASKID="tip_200804210403_0005_r_000524" + // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS" + // FINISH_TIME="1208760877072" + // COUNTERS="File Systems.Local bytes read:1179319677,File + // Systems.Local bytes written:1184474889,File Systems.HDFS + // bytes written:59021, + // Map-Reduce Framework.Reduce input groups:684,Map-Reduce + // Framework.Reduce input records:1000,Map-Reduce + // Framework.Reduce output records:1000" + + record = new ChukwaRecord(); + key = new ChukwaRecordKey(); + buildGenericRecord(record, null, Long + .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime"); + extractCounters(record, keys.get("COUNTERS")); + record.add("JOBID", keys.get("JOBID")); + record.add("TASKID", keys.get("TASKID")); + record.add("TASK_TYPE", keys.get("TASK_TYPE")); + + // log.info("MR_Graph +1"); + output.collect(key, record); + + } + } catch (IOException e) { + log.warn("Unable to collect output in JobLogHistoryProcessor [" + + recordEntry + "]", e); + e.printStackTrace(); + throw e; + } + + } + + protected void extractCounters(ChukwaRecord record, String input) { + + String[] data = null; + String[] counters = input.split(","); + + for (String counter : counters) { + data = counter.split(":"); + record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_") + .toUpperCase(), data[1]); + } + } + + public String getDataType() { + return Log4jJobHistoryProcessor.recordType; + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java URL: 
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -25,7 +26,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-public interface MapProcessor
-{
-  public void process(ChukwaArchiveKey archiveKey,Chunk chunk,OutputCollector output, Reporter reporter);
+public interface MapProcessor {
+  public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
+      OutputCollector output, Reporter reporter);
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,49 +18,38 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.util.HashMap;
+import java.util.HashMap;
 import org.apache.log4j.Logger;
 
+public class MapProcessorFactory {
+  static Logger log = Logger.getLogger(MapProcessorFactory.class);
+
+  private static HashMap processors = new HashMap(); // registry
+
+  private MapProcessorFactory() {
+  }
 
-public class MapProcessorFactory
-{
-  static Logger log = Logger.getLogger(MapProcessorFactory.class);
-
-  private static HashMap processors =
-    new HashMap(); // registry
-
-  private MapProcessorFactory()
-  {}
-
-  public static MapProcessor getProcessor(String parserClass)
-    throws UnknownRecordTypeException
-  {
-    if (processors.containsKey(parserClass))
-    {
-      return processors.get(parserClass);
-    }
-    else
-    {
-      MapProcessor processor = null;
-      try
-      {
-        processor = (MapProcessor)Class.forName(parserClass).getConstructor().newInstance();
-      }
-      catch(ClassNotFoundException e)
-      {
-        throw new UnknownRecordTypeException("Unknown parserClass:" + parserClass, e);
-      }
-      catch(Exception e)
-      {
-        throw new UnknownRecordTypeException("error constructing processor", e);
-      }
-
-      //TODO using a ThreadSafe/reuse flag to actually decide if we want
-      // to reuse the same processor again and again
-      processors.put(parserClass,processor);
-      return processor;
-    }
-  }
+  public static MapProcessor getProcessor(String parserClass)
+      throws UnknownRecordTypeException {
+    if (processors.containsKey(parserClass)) {
+      return processors.get(parserClass);
+    } else {
+      MapProcessor processor = null;
+      try {
+        processor = (MapProcessor) Class.forName(parserClass).getConstructor()
+            .newInstance();
+      } catch (ClassNotFoundException e) {
+        throw new UnknownRecordTypeException("Unknown parserClass:"
+            + parserClass, e);
+      } catch (Exception e) {
+        throw new UnknownRecordTypeException("error constructing processor", e);
+      }
+
+      // TODO using a ThreadSafe/reuse flag to actually decide if we want
+      // to reuse the same processor again and again
+      processors.put(parserClass, processor);
+      return processor;
+    }
+  }
 }
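For context on the factory above: getProcessor() instantiates the named
parser class by reflection on first use, then serves the cached instance from
the registry on later calls. A minimal usage sketch (not part of this commit;
the parser class name is hypothetical, so running this as-is would throw
UnknownRecordTypeException):

import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;

public class MapProcessorFactorySketch {
  public static void main(String[] args) throws Exception {
    // First call: Class.forName() + newInstance(), then cached. In a real
    // deployment the name would come from the demux configuration.
    MapProcessor first = MapProcessorFactory
        .getProcessor("org.example.MyMapProcessor"); // hypothetical class
    // A second call with the same name returns the same cached instance.
    MapProcessor second = MapProcessorFactory
        .getProcessor("org.example.MyMapProcessor");
    System.out.println("same instance: " + (first == second));
  }
}

As the TODO in the factory notes, the cache assumes processor instances are
reusable across chunks; a per-call instantiation policy would need the
ThreadSafe/reuse flag the comment anticipates.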
UnknownRecordTypeException("Unknown parserClass:" + + parserClass, e); + } catch (Exception e) { + throw new UnknownRecordTypeException("error constructing processor", e); + } + + // TODO using a ThreadSafe/reuse flag to actually decide if we want + // to reuse the same processor again and again + processors.put(parserClass, processor); + return processor; + } + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java Wed Mar 11 22:39:26 2009 @@ -18,30 +18,27 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; -public class PbsInvalidEntry extends Exception -{ - /** +public class PbsInvalidEntry extends Exception { + + /** * */ - private static final long serialVersionUID = 9154096600390233023L; + private static final long serialVersionUID = 9154096600390233023L; - public PbsInvalidEntry() - {} + public PbsInvalidEntry() { + } - public PbsInvalidEntry(String message) - { - super(message); - } - - public PbsInvalidEntry(Throwable cause) - { - super(cause); - } - - public PbsInvalidEntry(String message, Throwable cause) - { - super(message, cause); - } + public PbsInvalidEntry(String message) { + super(message); + } + + public PbsInvalidEntry(Throwable cause) { + super(cause); + } + + public PbsInvalidEntry(String message, Throwable cause) { + super(message, cause); + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java Wed Mar 11 22:39:26 2009 @@ -18,215 +18,181 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class PbsNodes extends AbstractProcessor -{ - static Logger log = Logger.getLogger(PbsNodes.class); - - private static final String rawPBSRecordType = "PbsNodes"; - private static final String machinePBSRecordType = "MachinePbsNodes"; - private SimpleDateFormat sdf = null; - - public PbsNodes() - { - //TODO move that to config - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - } - - @Override - protected void parse(String recordEntry, OutputCollector output, - Reporter reporter) - throws Throwable - { - -// log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" + 
-    // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-
-    StringBuilder sb = new StringBuilder();
-    int i = 0;
-    String nodeActivityStatus = null;
-    StringBuilder sbFreeMachines = new StringBuilder();
-    StringBuilder sbUsedMachines = new StringBuilder();
-    StringBuilder sbDownMachines = new StringBuilder();
-
-    int totalFreeNode = 0;
-    int totalUsedNode = 0;
-    int totalDownNode = 0;
-
-    String body = null;
-    ChukwaRecord record = null;
-
-    try
-    {
-
-      String dStr = recordEntry.substring(0, 23);
-      int start = 24;
-      int idx = recordEntry.indexOf(' ', start);
-      //String level = recordEntry.substring(start, idx);
-      start = idx+1;
-      idx = recordEntry.indexOf(' ', start );
-      //String className = recordEntry.substring(start, idx-1);
-      body = recordEntry.substring(idx+1);
-
-      Date d = sdf.parse(dStr );
-
-      String[] lines = body.split("\n");
-      while (i < lines.length)
-      {
-        while ((i < lines.length) && (lines[i].trim().length() > 0))
-        {
-          sb.append(lines[i].trim()).append("\n");
-          i++;
-        }
-
-        if ( (i< lines.length) && (lines[i].trim().length() > 0) )
-        {
-          throw new PbsInvalidEntry(recordEntry);
-        }
-
-        // Empty line
-        i++;
-
-        if (sb.length() > 0)
-        {
-          body = sb.toString();
-          // Process all entries for a machine
-          //System.out.println("=========>>> Record [" + body+ "]");
-
-          record = new ChukwaRecord();
-          key = new ChukwaRecordKey();
-
-          buildGenericRecord(record,null,d.getTime(),machinePBSRecordType);
-          parsePbsRecord(body, record);
-
-          //Output PbsNode record for 1 machine
-          output.collect(key, record);
-          //log.info("PbsNodeProcessor output 1 sub-record");
-
-          //compute Node Activity information
-          nodeActivityStatus = record.getValue("state");
-          if (nodeActivityStatus != null)
-          {
-            if (nodeActivityStatus.equals("free"))
-            {
-              totalFreeNode ++;
-              sbFreeMachines.append(record.getValue("Machine")).append(",");
-            }
-            else if (nodeActivityStatus.equals("job-exclusive"))
-            {
-              totalUsedNode ++;
-              sbUsedMachines.append(record.getValue("Machine")).append(",");
-            }
-            else
-            {
-              totalDownNode ++;
-              sbDownMachines.append(record.getValue("Machine")).append(",");
-            }
-          }
-          sb = new StringBuilder();
-        }
-      }
-
-      // End of parsing
-
-      record = new ChukwaRecord();
-      key = new ChukwaRecordKey();
-      buildGenericRecord(record,null,d.getTime(),"NodeActivity");
-
-      record.setTime(d.getTime());
-      record.add("used", ""+totalUsedNode);
-      record.add("free", ""+totalFreeNode);
-      record.add("down", ""+totalDownNode);
-      record.add("usedMachines", sbUsedMachines.toString());
-      record.add("freeMachines", sbFreeMachines.toString());
-      record.add("downMachines", sbDownMachines.toString());
-
-      output.collect(key,record);
-      //log.info("PbsNodeProcessor output 1 NodeActivity");
-    }
-    catch (ParseException e)
-    {
-      e.printStackTrace();
-      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
-      throw e;
-    }
-    catch (IOException e)
-    {
-      log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry + "]", e);
-      e.printStackTrace();
-      throw e;
-    }
-    catch (PbsInvalidEntry e)
-    {
-      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
-      e.printStackTrace();
-      throw e;
-    }
-
-  }
-
-  protected static void parsePbsRecord(String recordLine, ChukwaRecord record)
-  {
-    int i = 0;
-    String[] lines = recordLine.split("\n");
-    record.add("Machine", lines[0]);
-
-    i++;
-    String[] data = null;
-    while (i < lines.length)
-    {
-      data = extractFields(lines[i]);
-      record.add(data[0].trim(), data[1].trim());
-      if (data[0].trim().equalsIgnoreCase("status"))
-      {
-        parseStatusField(data[1].trim(), record);
-      }
-      i++;
-    }
-  }
-
-  protected static void parseStatusField(String statusField,
-      ChukwaRecord record)
-  {
-    String[] data = null;
-    String[] subFields = statusField.trim().split(",");
-    for (String subflied : subFields)
-    {
-      data = extractFields(subflied);
-      record.add("status-"+data[0].trim(), data[1].trim());
-    }
-  }
-
-  static String[] extractFields(String line)
-  {
-    String[] args = new String[2];
-    int index = line.indexOf("=");
-    args[0] = line.substring(0,index );
-    args[1] = line.substring(index + 1);
-
-    return args;
-  }
-
-  public String getDataType()
-  {
-    return PbsNodes.rawPBSRecordType;
-  }
-
+public class PbsNodes extends AbstractProcessor {
+  static Logger log = Logger.getLogger(PbsNodes.class);
+
+  private static final String rawPBSRecordType = "PbsNodes";
+  private static final String machinePBSRecordType = "MachinePbsNodes";
+  private SimpleDateFormat sdf = null;
+
+  public PbsNodes() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector output, Reporter reporter)
+      throws Throwable {
+
+    // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" +
+    // chunk.getDataType() + "]");
+
+    StringBuilder sb = new StringBuilder();
+    int i = 0;
+    String nodeActivityStatus = null;
+    StringBuilder sbFreeMachines = new StringBuilder();
+    StringBuilder sbUsedMachines = new StringBuilder();
+    StringBuilder sbDownMachines = new StringBuilder();
+
+    int totalFreeNode = 0;
+    int totalUsedNode = 0;
+    int totalDownNode = 0;
+
+    String body = null;
+    ChukwaRecord record = null;
+
+    try {
+
+      String dStr = recordEntry.substring(0, 23);
+      int start = 24;
+      int idx = recordEntry.indexOf(' ', start);
+      // String level = recordEntry.substring(start, idx);
+      start = idx + 1;
+      idx = recordEntry.indexOf(' ', start);
+      // String className = recordEntry.substring(start, idx-1);
+      body = recordEntry.substring(idx + 1);
+
+      Date d = sdf.parse(dStr);
+
+      String[] lines = body.split("\n");
+      while (i < lines.length) {
+        while ((i < lines.length) && (lines[i].trim().length() > 0)) {
+          sb.append(lines[i].trim()).append("\n");
+          i++;
+        }
+
+        if ((i < lines.length) && (lines[i].trim().length() > 0)) {
+          throw new PbsInvalidEntry(recordEntry);
+        }
+
+        // Empty line
+        i++;
+
+        if (sb.length() > 0) {
+          body = sb.toString();
+          // Process all entries for a machine
+          // System.out.println("=========>>> Record [" + body+ "]");
+
+          record = new ChukwaRecord();
+          key = new ChukwaRecordKey();
+
+          buildGenericRecord(record, null, d.getTime(), machinePBSRecordType);
+          parsePbsRecord(body, record);
+
+          // Output PbsNode record for 1 machine
+          output.collect(key, record);
+          // log.info("PbsNodeProcessor output 1 sub-record");
+
+          // compute Node Activity information
+          nodeActivityStatus = record.getValue("state");
+          if (nodeActivityStatus != null) {
+            if (nodeActivityStatus.equals("free")) {
+              totalFreeNode++;
+              sbFreeMachines.append(record.getValue("Machine")).append(",");
+            } else if (nodeActivityStatus.equals("job-exclusive")) {
+              totalUsedNode++;
+              sbUsedMachines.append(record.getValue("Machine")).append(",");
+            } else {
+              totalDownNode++;
+              sbDownMachines.append(record.getValue("Machine")).append(",");
+            }
+          }
+          sb = new StringBuilder();
+        }
+      }
+
+      // End of parsing
+
+      record = new ChukwaRecord();
+      key = new ChukwaRecordKey();
+      buildGenericRecord(record, null, d.getTime(), "NodeActivity");
+
+      record.setTime(d.getTime());
+      record.add("used", "" + totalUsedNode);
+      record.add("free", "" + totalFreeNode);
+      record.add("down", "" + totalDownNode);
+      record.add("usedMachines", sbUsedMachines.toString());
+      record.add("freeMachines", sbFreeMachines.toString());
+      record.add("downMachines", sbDownMachines.toString());
+
+      output.collect(key, record);
+      // log.info("PbsNodeProcessor output 1 NodeActivity");
+    } catch (ParseException e) {
+      e.printStackTrace();
+      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+      throw e;
+    } catch (IOException e) {
+      log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry
+          + "]", e);
+      e.printStackTrace();
+      throw e;
+    } catch (PbsInvalidEntry e) {
+      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+      e.printStackTrace();
+      throw e;
+    }
+
+  }
+
+  protected static void parsePbsRecord(String recordLine, ChukwaRecord record) {
+    int i = 0;
+    String[] lines = recordLine.split("\n");
+    record.add("Machine", lines[0]);
+
+    i++;
+    String[] data = null;
+    while (i < lines.length) {
+      data = extractFields(lines[i]);
+      record.add(data[0].trim(), data[1].trim());
+      if (data[0].trim().equalsIgnoreCase("status")) {
+        parseStatusField(data[1].trim(), record);
+      }
+      i++;
+    }
+  }
+
+  protected static void parseStatusField(String statusField, ChukwaRecord record) {
+    String[] data = null;
+    String[] subFields = statusField.trim().split(",");
+    for (String subField : subFields) {
+      data = extractFields(subField);
+      record.add("status-" + data[0].trim(), data[1].trim());
+    }
+  }
+
+  static String[] extractFields(String line) {
+    String[] args = new String[2];
+    int index = line.indexOf("=");
+    args[0] = line.substring(0, index);
+    args[1] = line.substring(index + 1);
+
+    return args;
+  }
+
+  public String getDataType() {
+    return PbsNodes.rawPBSRecordType;
+  }
+
 }
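For context on the PbsNodes parser above: each machine block in pbsnodes
output carries a "status" attribute, itself a comma-separated key=value list,
which parseStatusField() explodes into "status-"-prefixed record fields via
extractFields(). A minimal standalone sketch (not part of this commit; the
class name and sample status line are made up):

public class PbsStatusSketch {
  // Same splitting rule as PbsNodes.extractFields(): split at the first '='.
  static String[] extractFields(String line) {
    int index = line.indexOf("=");
    return new String[] { line.substring(0, index),
        line.substring(index + 1) };
  }

  public static void main(String[] args) {
    String status = "opsys=linux,loadave=0.10,state=free";
    for (String subField : status.split(",")) {
      String[] data = extractFields(subField);
      // PbsNodes stores these as record.add("status-" + key, value).
      System.out.println("status-" + data[0].trim() + " = " + data[1].trim());
    }
  }
}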
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,65 +18,63 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.util.HashMap;
+import java.util.HashMap;
 import org.apache.log4j.Logger;
 
+public class ProcessorFactory {
+  static Logger log = Logger.getLogger(ProcessorFactory.class);
 
-
-public class ProcessorFactory
-{
-  static Logger log = Logger.getLogger(ProcessorFactory.class);
-
-  // TODO
-  //    add new mapper package at the end.
-  //    We should have a more generic way to do this.
-  //    Ex: read from config
-  //    list of alias
-  //    and
-  //    alias -> processor class
-
-  private static HashMap processors =
-    new HashMap(); // registry
-
-  private ProcessorFactory()
-  {}
-
-  public static ChunkProcessor getProcessor(String recordType)
-    throws UnknownRecordTypeException
-  {
-    String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper"+recordType;
-    if (processors.containsKey(recordType)) {
-      return processors.get(recordType);
-    } else {
-      ChunkProcessor processor = null;
-      try {
-        processor = (ChunkProcessor)Class.forName(path).getConstructor().newInstance();
-      } catch(ClassNotFoundException e) {
-        throw new UnknownRecordTypeException("Unknown recordType:" + recordType, e);
-      } catch(Exception e) {
-        throw new UnknownRecordTypeException("error constructing processor", e);
-      }
-
-      //TODO using a ThreadSafe/reuse flag to actually decide if we want
-      // to reuse the same processor again and again
-      register(recordType,processor);
-      return processor;
-    }
-  }
-
-  /** Register a specific parser for a {@link ChunkProcessor}
-   * implementation. */
-  public static synchronized void register(String recordType,
-      ChunkProcessor processor)
-  {
-    log.info("register " + processor.getClass().getName() + " for this recordType :" + recordType);
-    if (processors.containsKey(recordType))
-    {
-      throw new DuplicateProcessorException("Duplicate processor for recordType:" + recordType);
-    }
-    ProcessorFactory.processors.put(recordType, processor);
-  }
+  // TODO
+  // add new mapper package at the end.
+  // We should have a more generic way to do this.
+  // Ex: read from config
+  // list of alias
+  // and
+  // alias -> processor class
+
+  private static HashMap processors = new HashMap(); // registry
+
+  private ProcessorFactory() {
+  }
+
+  public static ChunkProcessor getProcessor(String recordType)
+      throws UnknownRecordTypeException {
+    String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper"
+        + recordType;
+    if (processors.containsKey(recordType)) {
+      return processors.get(recordType);
+    } else {
+      ChunkProcessor processor = null;
+      try {
+        processor = (ChunkProcessor) Class.forName(path).getConstructor()
+            .newInstance();
+      } catch (ClassNotFoundException e) {
+        throw new UnknownRecordTypeException(
+            "Unknown recordType:" + recordType, e);
+      } catch (Exception e) {
+        throw new UnknownRecordTypeException("error constructing processor", e);
+      }
+
+      // TODO using a ThreadSafe/reuse flag to actually decide if we want
+      // to reuse the same processor again and again
+      register(recordType, processor);
+      return processor;
+    }
+  }
+
+  /**
+   * Register a specific parser for a {@link ChunkProcessor} implementation.
+   */
+  public static synchronized void register(String recordType,
+      ChunkProcessor processor) {
+    log.info("register " + processor.getClass().getName()
+        + " for this recordType :" + recordType);
+    if (processors.containsKey(recordType)) {
+      throw new DuplicateProcessorException(
+          "Duplicate processor for recordType:" + recordType);
+    }
+    ProcessorFactory.processors.put(recordType, processor);
+  }
 }
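For context on ProcessorFactory above: unlike MapProcessorFactory, which is
handed a fully-qualified class name, getProcessor() here derives the class
name by appending the record type to the mapper package prefix (exactly as
committed), and only falls back to reflection when the registry has no cached
instance. A minimal standalone sketch of that resolution step (not part of
this commit; the class name and record type are hypothetical):

public class LookupSketch {
  public static void main(String[] args) {
    String recordType = "SysLog"; // hypothetical record type
    // Same concatenation as the committed getProcessor().
    String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper"
        + recordType;
    try {
      Class<?> c = Class.forName(path);
      System.out.println("resolved " + c.getName());
    } catch (ClassNotFoundException e) {
      // getProcessor() wraps this in UnknownRecordTypeException.
      System.out.println("unknown recordType: " + recordType);
    }
  }
}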