From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r423062 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/tools/ src/java/org/apache/hadoop/tools/Logalyzer.java
Date: Tue, 18 Jul 2006 12:07:59 -0000
Message-Id: <20060718120800.28A711A981A@eris.apache.org>

Author: cutting
Date: Tue Jul 18 05:07:59 2006
New Revision: 423062

URL: http://svn.apache.org/viewvc?rev=423062&view=rev
Log:
HADOOP-242. Initial version of Logalyzer. Contributed by Arun.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=423062&r1=423061&r2=423062&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 18 05:07:59 2006
@@ -47,6 +47,9 @@
     lots of small jobs, in order to determine per-task overheads.
     (Sanjay Dahiya via cutting)
 
+14. HADOOP-342. Add a tool for log analysis: Logalyzer.
+    (Arun C Murthy via cutting)
+
 Release 0.4.0 - 2006-06-28


Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java?rev=423062&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/tools/Logalyzer.java Tue Jul 18 05:07:59 2006
@@ -0,0 +1,308 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import java.io.*;
+
+import java.util.Random;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.CopyFiles;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.lib.LongSumReducer;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+/**
+ * Logalyzer: A utility tool for archiving and analyzing hadoop logs.
+ * <p>
+ * This tool supports archiving and analyzing (sort/grep) of log-files.
+ * It takes as input
+ *  a) Input uri which will serve uris of the logs to be archived.
+ *  b) Output directory (not mandatory).
+ *  c) Directory on dfs to archive the logs.
+ *  d) The sort/grep patterns for analyzing the files and separator for boundaries.
+ * Usage:
+ * Logalyzer -archive -archiveDir <directory to archive logs>
+ *           -analysis <output directory> -logs <log-list uri>
+ *           -grep <pattern> -sort <column1,column2,...> -separator <separator>
+ *
+ * @author Arun C Murthy
+ */
+
+public class Logalyzer {
+  // Constants
+  private static Configuration fsConfig = new Configuration();
+
+  /** A {@link Mapper} that extracts text matching a regular expression. */
+  public static class LogRegexMapper extends MapReduceBase implements Mapper {
+
+    private Pattern pattern;
+
+    public void configure(JobConf job) {
+      pattern = Pattern.compile(job.get("mapred.mapper.regex"));
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector output, Reporter reporter)
+      throws IOException {
+      String text = ((UTF8)value).toString();
+      Matcher matcher = pattern.matcher(text);
+      while (matcher.find()) {
+        output.collect((UTF8)value, new LongWritable(1));
+      }
+    }
+
+  }
+
+  /** A WritableComparator optimized for UTF8 keys of the logs. */
+  public static class LogComparator extends UTF8.Comparator implements Configurable {
+
+    private static Log LOG = LogFactory.getLog("org.apache.hadoop.tools.Logalyzer");
+    private JobConf conf = null;
+    private String[] sortSpec = null;
+    private String columnSeparator = null;
+
+    public void setConf(Configuration conf) {
+      if (conf instanceof JobConf) {
+        this.conf = (JobConf) conf;
+      } else {
+        this.conf = new JobConf(conf);
+      }
+
+      //Initialize the specification for *comparison*
+      String sortColumns = this.conf.get("mapred.reducer.sort", null);
+      if (sortColumns != null) {
+        sortSpec = sortColumns.split(",");
+      }
+
+      //Column-separator
+      columnSeparator = this.conf.get("mapred.reducer.separator", "");
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
+
+      if (sortSpec == null) {
+        return super.compare(b1, s1, l1, b2, s2, l2);
+      }
+
+      try {
+        UTF8 logline1 = new UTF8();
+        logline1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+        String line1 = logline1.toString();
+        String[] logColumns1 = line1.split(columnSeparator);
+
+        UTF8 logline2 = new UTF8();
+        logline2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+        String line2 = logline2.toString();
+        String[] logColumns2 = line2.split(columnSeparator);
+
+        if (logColumns1 == null || logColumns2 == null) {
+          return super.compare(b1, s1, l1, b2, s2, l2);
+        }
+
+        //Compare column-wise according to *sortSpec*
+        for (int i = 0; i < sortSpec.length; ++i) {
+          int column = (Integer.valueOf(sortSpec[i]).intValue());
+          String c1 = logColumns1[column];
+          String c2 = logColumns2[column];
+
+          //Compare columns
+          int comparison = super.compareBytes(
+                                              c1.getBytes(), 0, c1.length(),
+                                              c2.getBytes(), 0, c2.length()
+                                              );
+
+          //They differ!
+          if (comparison != 0) {
+            return comparison;
+          }
+        }
+
+      } catch (IOException ioe) {
+        LOG.fatal("Caught " + ioe);
+        return 0;
+      }
+
+      return 0;
+    }
+
+    static {
+      // register this comparator
+      WritableComparator.define(UTF8.class, new LogComparator());
+    }
+  }
+
+  /**
+   * doArchive: Workhorse function to archive log-files.
+   * @param logListURI : The uri which will serve list of log-files to archive.
+   * @param archiveDirectory : The directory to store archived logfiles.
+   * @throws IOException
+   */
+  public void
+    doArchive(String logListURI, String archiveDirectory)
+    throws IOException
+  {
+    String destURL = new String("dfs://" + fsConfig.get("fs.default.name", "local") +
+                                archiveDirectory);
+    CopyFiles.copy(fsConfig, logListURI, destURL, true, false);
+  }
+
+  /**
+   * doAnalyze: Workhorse function to analyze log-files.
+   * @param inputFilesDirectory : Directory containing the files to be analyzed.
+   * @param outputDirectory : Directory to store analysis (output).
+   * @param grepPattern : Pattern to *grep* for.
+   * @param sortColumns : Sort specification for output.
+   * @param columnSeparator : Column separator.
+   * @throws IOException
+   */
+  public void
+    doAnalyze(String inputFilesDirectory, String outputDirectory,
+              String grepPattern, String sortColumns, String columnSeparator)
+    throws IOException
+  {
+    Path grepInput = new Path(inputFilesDirectory);
+
+    Path analysisOutput = null;
+    if (outputDirectory.equals("")) {
+      analysisOutput = new Path(inputFilesDirectory, "logalyzer_" +
+                                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    } else {
+      analysisOutput = new Path(outputDirectory);
+    }
+
+    JobConf grepJob = new JobConf(fsConfig);
+    grepJob.setJobName("logalyzer-grep-sort");
+
+    grepJob.setInputPath(grepInput);
+    grepJob.setInputFormat(TextInputFormat.class);
+    grepJob.setInputKeyClass(LongWritable.class);
+    grepJob.setInputValueClass(UTF8.class);
+
+    grepJob.setMapperClass(LogRegexMapper.class);
+    grepJob.set("mapred.mapper.regex", grepPattern);
+    grepJob.set("mapred.reducer.sort", sortColumns);
+    grepJob.set("mapred.reducer.separator", columnSeparator);
+
+    grepJob.setCombinerClass(LongSumReducer.class);
+    grepJob.setReducerClass(LongSumReducer.class);
+
+    grepJob.setOutputPath(analysisOutput);
+    grepJob.setOutputFormat(TextOutputFormat.class);
+    grepJob.setOutputKeyClass(UTF8.class);
+    grepJob.setOutputValueClass(LongWritable.class);
+    grepJob.setOutputKeyComparatorClass(LogComparator.class);
+
+    grepJob.setNumReduceTasks(1);                 // write a single file
+
+    JobClient.runJob(grepJob);
+  }
+
+  public static void main(String[] args) {
+
+    Log LOG = LogFactory.getLog("org.apache.hadoop.tools.Logalyzer");
+
+    String version = "Logalyzer.0.0.1";
+    String usage = "Usage: Logalyzer [-archive -logs <urlsFile>] " +
+      "-archiveDir <archiveDirectory> " +
+      "-grep <pattern> -sort <column1,column2,...> -separator <separator> " +
+      "-analysis <outputDirectory>";
+
+    System.out.println(version);
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    //Command line arguments
+    boolean archive = false;
+    boolean grep = false;
+    boolean sort = false;
+
+    String archiveDir = "";
+    String logListURI = "";
+    String grepPattern = ".*";
+    String sortColumns = "";
+    String columnSeparator = " ";
+    String outputDirectory = "";
+
+    for (int i = 0; i < args.length; i++) {       // parse command line
+      if (args[i].equals("-archive")) {
+        archive = true;
+      } else if (args[i].equals("-archiveDir")) {
+        archiveDir = args[++i];
+      } else if (args[i].equals("-grep")) {
+        grep = true;
+        grepPattern = args[++i];
+      } else if (args[i].equals("-logs")) {
+        logListURI = args[++i];
+      } else if (args[i].equals("-sort")) {
+        sort = true;
+        sortColumns = args[++i];
+      } else if (args[i].equals("-separator")) {
+        columnSeparator = args[++i];
+      } else if (args[i].equals("-analysis")) {
+        outputDirectory = args[++i];
+      }
+    }
+
+    LOG.info("analysisDir = " + outputDirectory);
+    LOG.info("archiveDir = " + archiveDir);
+    LOG.info("logListURI = " + logListURI);
+    LOG.info("grepPattern = " + grepPattern);
+    LOG.info("sortColumns = " + sortColumns);
+    LOG.info("separator = " + columnSeparator);
+
+    try {
+      Logalyzer logalyzer = new Logalyzer();
+
+      // Archive?
+      if (archive) {
+        logalyzer.doArchive(logListURI, archiveDir);
+      }
+
+      // Analyze?
+      if (grep || sort) {
+        logalyzer.doAnalyze(archiveDir, outputDirectory, grepPattern, sortColumns, columnSeparator);
+      }
+    } catch (IOException ioe) {
+      ioe.printStackTrace();
+      System.exit(-1);
+    }
+
+  } //main
+
+} //class Logalyzer
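
A minimal sketch of how the flags parsed in main() above might be combined
into a single archive-plus-analysis run. The flag names come from the parser
in main(); every path, uri, and pattern below is hypothetical.

public class LogalyzerExample {
  public static void main(String[] args) {
    org.apache.hadoop.tools.Logalyzer.main(new String[] {
      "-archive",                             // copy the logs onto dfs first
      "-logs", "/user/logs/loglist.txt",      // hypothetical uri serving the log-file list
      "-archiveDir", "/archive/logs",         // dfs directory that receives the copies
      "-grep", "Exception",                   // count lines matching this pattern
      "-sort", "0",                           // order output by column 0
      "-separator", " ",                      // column boundary for the sort
      "-analysis", "/analysis/logalyzer-out"  // hypothetical output directory
    });
  }
}

Note that main() runs doAnalyze() on archiveDir, not on the -analysis
argument: the -analysis directory only receives the job output.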
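
The heart of LogComparator is the column-wise comparison driven by
mapred.reducer.sort. Below is a self-contained sketch of that ordering on
plain JDK strings (no Hadoop types), assuming a space separator and the
sort spec "1,0"; the sample log lines are made up.

import java.util.Arrays;
import java.util.Comparator;

public class ColumnSortSketch {
  public static void main(String[] args) {
    final String separator = " ";
    final int[] sortSpec = {1, 0};   // as if mapred.reducer.sort=1,0

    // Same idea as LogComparator.compare(): split both lines into columns,
    // then compare column by column in sortSpec order until a pair differs.
    Comparator<String> byColumns = new Comparator<String>() {
      public int compare(String line1, String line2) {
        String[] cols1 = line1.split(separator);
        String[] cols2 = line2.split(separator);
        for (int col : sortSpec) {
          int cmp = cols1[col].compareTo(cols2[col]); // byte-wise in the real comparator
          if (cmp != 0) {
            return cmp;
          }
        }
        return 0;
      }
    };

    String[] lines = {"host2 ERROR disk", "host1 ERROR net", "host1 WARN cpu"};
    Arrays.sort(lines, byColumns);
    System.out.println(Arrays.toString(lines));
    // prints [host1 ERROR net, host2 ERROR disk, host1 WARN cpu]
  }
}

Unlike this sketch, the real comparator deserializes UTF8 keys from raw
bytes and falls back to plain byte-order comparison when no sort spec is
configured.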