Return-Path: Delivered-To: apmail-hadoop-chukwa-commits-archive@minotaur.apache.org Received: (qmail 4197 invoked from network); 11 Mar 2009 22:40:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 11 Mar 2009 22:40:48 -0000 Received: (qmail 21619 invoked by uid 500); 11 Mar 2009 22:40:48 -0000 Delivered-To: apmail-hadoop-chukwa-commits-archive@hadoop.apache.org Received: (qmail 21608 invoked by uid 500); 11 Mar 2009 22:40:48 -0000 Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: chukwa-dev@hadoop.apache.org Delivered-To: mailing list chukwa-commits@hadoop.apache.org Received: (qmail 21572 invoked by uid 99); 11 Mar 2009 22:40:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Mar 2009 15:40:48 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Mar 2009 22:40:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A8F022388D22; Wed, 11 Mar 2009 22:39:43 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r752666 [7/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop... Date: Wed, 11 Mar 2009 22:39:32 -0000 To: chukwa-commits@hadoop.apache.org From: asrabkin@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090311223943.A8F022388D22@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Wed Mar 11 22:39:26 2009 @@ -18,11 +18,11 @@ package org.apache.hadoop.chukwa.extraction.demux; + import java.io.IOException; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Calendar; - import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -34,251 +34,237 @@ // First version of the Spill // need some polishing -public class MoveToRepository -{ - static Logger log = Logger.getLogger(MoveToRepository.class); - - static ChukwaConfiguration conf = null; - static FileSystem fs = null; - static final String HadoopLogDir = "_logs"; - static final String hadoopTempDir = "_temporary"; - static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd"); - static Calendar calendar = Calendar.getInstance(); - - static void processClutserDirectory(Path srcDir,String destDir) throws Exception - { - log.info("processClutserDirectory (" + srcDir.getName() + "," + destDir +")"); - FileStatus fstat = fs.getFileStatus(srcDir); - - if (!fstat.isDir()) - { - throw new IOException(srcDir + " is 
not a directory!"); - } - else - { - FileStatus[] datasourceDirectories = fs.listStatus(srcDir); - - for(FileStatus datasourceDirectory : datasourceDirectories) - { - log.info(datasourceDirectory.getPath() + " isDir?" +datasourceDirectory.isDir()); - if (!datasourceDirectory.isDir()) - { - throw new IOException("Top level datasource directory should be a directory :" + datasourceDirectory.getPath()); - } - - String dirName = datasourceDirectory.getPath().getName(); - Path destPath = new Path(destDir + "/" + dirName); - log.info("dest directory path: " + destPath); - log.info("processClutserDirectory processing Datasource: (" + dirName +")"); - processDatasourceDirectory(srcDir.getName(),datasourceDirectory.getPath(),destDir + "/" + dirName); - } - } - } - - static void processDatasourceDirectory(String cluster,Path srcDir,String destDir) throws Exception - { - String fileName = null; - int fileDay = 0; - int fileHour = 0; - int fileMin = 0; - - FileStatus[] recordFiles = fs.listStatus(srcDir); - for(FileStatus recordFile : recordFiles) - { - // dataSource_20080915_18_15.1.evt - // _.1.evt - - fileName = recordFile.getPath().getName(); - log.info("processDatasourceDirectory processing RecordFile: (" + fileName +")"); - log.info("fileName: " + fileName); - - - int l = fileName.length(); - String dataSource = srcDir.getName(); - log.info("Datasource: " + dataSource); - - if (fileName.endsWith(".D.evt")) - { - // Hadoop_dfs_datanode_20080919.D.evt - - fileDay = Integer.parseInt(fileName.substring(l-14,l-6)); - writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(),dataSource + "_" +fileDay); - // mark this directory for Daily rotate (re-process) - addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource); - } - else if (fileName.endsWith(".H.evt")) - { - // Hadoop_dfs_datanode_20080925_1.H.evt - // Hadoop_dfs_datanode_20080925_12.H.evt - - String day = null; - String hour = null; - if (fileName.charAt(l-8) == '_') - { - day = fileName.substring(l-16,l-8); - log.info("day->" + day); - hour = "" +fileName.charAt(l-7); - log.info("hour->" +hour); - } - else - { - day = fileName.substring(l-17,l-9); - log.info("day->" +day); - hour = fileName.substring(l-8,l-6); - log.info("hour->" +hour); - } - fileDay = Integer.parseInt(day); - fileHour = Integer.parseInt(hour); - // rotate there so spill - writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/", recordFile.getPath(),dataSource + "_" +fileDay+ "_" + fileHour ); - // mark this directory for daily rotate - addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource); - } - else if (fileName.endsWith(".R.evt")) - { - if (fileName.charAt(l-11) == '_') - { - fileDay = Integer.parseInt(fileName.substring(l-19,l-11)); - fileHour = Integer.parseInt(""+fileName.charAt(l-10)); - fileMin = Integer.parseInt(fileName.substring(l-8,l-6)); - } - else - { - fileDay = Integer.parseInt(fileName.substring(l-20,l-12)); - fileHour = Integer.parseInt(fileName.substring(l-11,l-9)); - fileMin = Integer.parseInt(fileName.substring(l-8,l-6)); - } - - log.info("fileDay: " + fileDay); - log.info("fileHour: " + fileHour); - log.info("fileMin: " + fileMin); - writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin, recordFile.getPath(),dataSource + "_" +fileDay+ "_" + fileHour +"_" +fileMin); - // mark this directory for hourly rotate - addDirectory4Rolling( false,fileDay , fileHour, cluster , dataSource); - } - else - { - throw new RuntimeException("Wrong fileName format! 
[" + fileName+"]"); - } - } - } - - static void addDirectory4Rolling(boolean isDailyOnly, int day,int hour,String cluster, String dataSource) throws IOException - { - // TODO get root directory from config - String rollingDirectory = "/chukwa/rolling/"; - - Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster +"/" + dataSource); - if (!fs.exists(path)) - { fs.mkdirs(path);} - - if (!isDailyOnly) - { - path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/" + cluster +"/" + dataSource); - if (!fs.exists(path)) - { fs.mkdirs(path);} - } - } - - static void writeRecordFile(String destDir,Path recordFile,String fileName) throws IOException - { - boolean done = false; - int count = 1; - do - { - Path destDirPath = new Path(destDir ); - Path destFilePath = new Path(destDir + "/" + fileName + "." + count + ".evt" ); - - if (!fs.exists(destDirPath)) - { - fs.mkdirs(destDirPath); - log.info(">>>>>>>>>>>> create Dir" + destDirPath); - } - - if (!fs.exists(destFilePath)) - { - log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "+ destFilePath); - //fs.rename(recordFile,destFilePath); - FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf); - //FileUtil.replaceFile(new File(recordFile.toUri()), new File(destFilePath.toUri())); - done = true; - log.info(">>>>>>>>>>>> after Rename" + destFilePath); - } - else - { - log.info("Start MoveToRepository main()"); - } - count ++; - // Just put a limit here - // TODO read from config - if (count > 1000) - { - throw new IOException("too many files in this directory: " + destDir); - } - } while (!done); - } - - static boolean checkRotate(String directoryAsString, boolean createDirectoryIfNotExist) throws IOException - { - Path directory = new Path(directoryAsString); - boolean exist = fs.exists(directory); - - if (! 
exist ) - { - if (createDirectoryIfNotExist== true) - { fs.mkdirs(directory); } - return false; - } - else - { - return fs.exists(new Path(directoryAsString + "/rotateDone")); - } - } - - /** - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception - { - conf = new ChukwaConfiguration(); - String fsName = conf.get("writer.hdfs.filesystem"); - fs = FileSystem.get(new URI(fsName), conf); - - Path srcDir = new Path(args[0]); - String destDir = args[1]; - - log.info("Start MoveToRepository main()"); - - FileStatus fstat = fs.getFileStatus(srcDir); - - if (!fstat.isDir()) - { - throw new IOException(srcDir + " is not a directory!"); - } - else - { - FileStatus[] clusters = fs.listStatus(srcDir); - // Run a moveOrMerge on all clusters - String name = null; - for(FileStatus cluster : clusters) - { - name = cluster.getPath().getName(); - // Skip hadoop M/R outputDir - if ( (name.intern() == HadoopLogDir.intern() ) || (name.intern() == hadoopTempDir.intern()) ) - { - continue; - } - log.info("main procesing Cluster (" + cluster.getPath().getName() +")"); - processClutserDirectory(cluster.getPath(),destDir + "/" + cluster.getPath().getName()); - - // Delete the demux's cluster dir - FileUtil.fullyDelete(fs,cluster.getPath()); - } - } - - log.info("Done with MoveToRepository main()"); +public class MoveToRepository { + static Logger log = Logger.getLogger(MoveToRepository.class); + + static ChukwaConfiguration conf = null; + static FileSystem fs = null; + static final String HadoopLogDir = "_logs"; + static final String hadoopTempDir = "_temporary"; + static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd"); + static Calendar calendar = Calendar.getInstance(); + + static void processClutserDirectory(Path srcDir, String destDir) + throws Exception { + log.info("processClutserDirectory (" + srcDir.getName() + "," + destDir + + ")"); + FileStatus fstat = fs.getFileStatus(srcDir); + + if (!fstat.isDir()) { + throw new IOException(srcDir + " is not a directory!"); + } else { + FileStatus[] datasourceDirectories = fs.listStatus(srcDir); + + for (FileStatus datasourceDirectory : datasourceDirectories) { + log.info(datasourceDirectory.getPath() + " isDir?" 
+ + datasourceDirectory.isDir()); + if (!datasourceDirectory.isDir()) { + throw new IOException( + "Top level datasource directory should be a directory :" + + datasourceDirectory.getPath()); + } + + String dirName = datasourceDirectory.getPath().getName(); + Path destPath = new Path(destDir + "/" + dirName); + log.info("dest directory path: " + destPath); + log.info("processClutserDirectory processing Datasource: (" + dirName + + ")"); + processDatasourceDirectory(srcDir.getName(), datasourceDirectory + .getPath(), destDir + "/" + dirName); + } + } + } + + static void processDatasourceDirectory(String cluster, Path srcDir, + String destDir) throws Exception { + String fileName = null; + int fileDay = 0; + int fileHour = 0; + int fileMin = 0; + + FileStatus[] recordFiles = fs.listStatus(srcDir); + for (FileStatus recordFile : recordFiles) { + // dataSource_20080915_18_15.1.evt + // _.1.evt + + fileName = recordFile.getPath().getName(); + log.info("processDatasourceDirectory processing RecordFile: (" + fileName + + ")"); + log.info("fileName: " + fileName); + + int l = fileName.length(); + String dataSource = srcDir.getName(); + log.info("Datasource: " + dataSource); + + if (fileName.endsWith(".D.evt")) { + // Hadoop_dfs_datanode_20080919.D.evt + + fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6)); + writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(), + dataSource + "_" + fileDay); + // mark this directory for Daily rotate (re-process) + addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource); + } else if (fileName.endsWith(".H.evt")) { + // Hadoop_dfs_datanode_20080925_1.H.evt + // Hadoop_dfs_datanode_20080925_12.H.evt + + String day = null; + String hour = null; + if (fileName.charAt(l - 8) == '_') { + day = fileName.substring(l - 16, l - 8); + log.info("day->" + day); + hour = "" + fileName.charAt(l - 7); + log.info("hour->" + hour); + } else { + day = fileName.substring(l - 17, l - 9); + log.info("day->" + day); + hour = fileName.substring(l - 8, l - 6); + log.info("hour->" + hour); + } + fileDay = Integer.parseInt(day); + fileHour = Integer.parseInt(hour); + // rotate there so spill + writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/", + recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour); + // mark this directory for daily rotate + addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource); + } else if (fileName.endsWith(".R.evt")) { + if (fileName.charAt(l - 11) == '_') { + fileDay = Integer.parseInt(fileName.substring(l - 19, l - 11)); + fileHour = Integer.parseInt("" + fileName.charAt(l - 10)); + fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6)); + } else { + fileDay = Integer.parseInt(fileName.substring(l - 20, l - 12)); + fileHour = Integer.parseInt(fileName.substring(l - 11, l - 9)); + fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6)); + } + + log.info("fileDay: " + fileDay); + log.info("fileHour: " + fileHour); + log.info("fileMin: " + fileMin); + writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/" + + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_" + + fileHour + "_" + fileMin); + // mark this directory for hourly rotate + addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource); + } else { + throw new RuntimeException("Wrong fileName format! 
[" + fileName + "]"); + } + } + } + + static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour, + String cluster, String dataSource) throws IOException { + // TODO get root directory from config + String rollingDirectory = "/chukwa/rolling/"; + + Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster + + "/" + dataSource); + if (!fs.exists(path)) { + fs.mkdirs(path); + } + + if (!isDailyOnly) { + path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/" + + cluster + "/" + dataSource); + if (!fs.exists(path)) { + fs.mkdirs(path); + } + } + } + + static void writeRecordFile(String destDir, Path recordFile, String fileName) + throws IOException { + boolean done = false; + int count = 1; + do { + Path destDirPath = new Path(destDir); + Path destFilePath = new Path(destDir + "/" + fileName + "." + count + + ".evt"); + + if (!fs.exists(destDirPath)) { + fs.mkdirs(destDirPath); + log.info(">>>>>>>>>>>> create Dir" + destDirPath); + } + + if (!fs.exists(destFilePath)) { + log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- " + + destFilePath); + // fs.rename(recordFile,destFilePath); + FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf); + // FileUtil.replaceFile(new File(recordFile.toUri()), new + // File(destFilePath.toUri())); + done = true; + log.info(">>>>>>>>>>>> after Rename" + destFilePath); + } else { + log.info("Start MoveToRepository main()"); + } + count++; + // Just put a limit here + // TODO read from config + if (count > 1000) { + throw new IOException("too many files in this directory: " + destDir); + } + } while (!done); + } + + static boolean checkRotate(String directoryAsString, + boolean createDirectoryIfNotExist) throws IOException { + Path directory = new Path(directoryAsString); + boolean exist = fs.exists(directory); + + if (!exist) { + if (createDirectoryIfNotExist == true) { + fs.mkdirs(directory); + } + return false; + } else { + return fs.exists(new Path(directoryAsString + "/rotateDone")); + } + } + + /** + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + conf = new ChukwaConfiguration(); + String fsName = conf.get("writer.hdfs.filesystem"); + fs = FileSystem.get(new URI(fsName), conf); + + Path srcDir = new Path(args[0]); + String destDir = args[1]; + + log.info("Start MoveToRepository main()"); + + FileStatus fstat = fs.getFileStatus(srcDir); + + if (!fstat.isDir()) { + throw new IOException(srcDir + " is not a directory!"); + } else { + FileStatus[] clusters = fs.listStatus(srcDir); + // Run a moveOrMerge on all clusters + String name = null; + for (FileStatus cluster : clusters) { + name = cluster.getPath().getName(); + // Skip hadoop M/R outputDir + if ((name.intern() == HadoopLogDir.intern()) + || (name.intern() == hadoopTempDir.intern())) { + continue; + } + log + .info("main procesing Cluster (" + cluster.getPath().getName() + + ")"); + processClutserDirectory(cluster.getPath(), destDir + "/" + + cluster.getPath().getName()); + + // Delete the demux's cluster dir + FileUtil.fullyDelete(fs, cluster.getPath()); + } + } + + log.info("Done with MoveToRepository main()"); - } + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- 
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java Wed Mar 11 22:39:26 2009 @@ -18,8 +18,8 @@ package org.apache.hadoop.chukwa.extraction.demux; -import java.io.IOException; +import java.io.IOException; import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -29,115 +29,105 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; -public class RecordMerger extends Thread -{ - static Logger log = Logger.getLogger(RecordMerger.class); - ChukwaConfiguration conf = null; - FileSystem fs = null; - String[] mergeArgs = null; - Tool tool = null; - boolean deleteRawData = false; - - public RecordMerger(ChukwaConfiguration conf,FileSystem fs,Tool tool,String[] mergeArgs,boolean deleteRawData) - { - this.conf = conf; - this.fs = fs; - this.tool = tool; - this.mergeArgs = mergeArgs; - this.deleteRawData = deleteRawData; - } - @Override - public void run() - { - System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]"); - int res; - try - { - res = ToolRunner.run(conf,tool, mergeArgs); - System.out.println("MR exit status: " + res); - if (res == 0) - { - writeRecordFile(mergeArgs[1]+"/part-00000",mergeArgs[2],mergeArgs[3]); - - // delete input - if (deleteRawData) - { - FileUtil.fullyDelete(fs,new Path(mergeArgs[0])); - - Path hours = new Path(mergeArgs[2]) ; - FileStatus[] hoursOrMinutesFS = fs.listStatus(hours); - for(FileStatus hourOrMinuteFS : hoursOrMinutesFS) - { - String dirName = hourOrMinuteFS.getPath().getName(); - - try - { - Integer.parseInt(dirName); - FileUtil.fullyDelete(fs,new Path(mergeArgs[2] + "/" + dirName)); - if (log.isDebugEnabled() ) - { log.debug("Deleting Hour directory: " + mergeArgs[2] + "/" + dirName); } - } - catch(NumberFormatException e) { /* Not an Hour or Minutes directory- Do nothing */ } - } - } - - // delete rolling tag - FileUtil.fullyDelete(fs, new Path(mergeArgs[3])); - // delete M/R temp directory - FileUtil.fullyDelete(fs, new Path(mergeArgs[1])); - } - else - { - throw new RuntimeException("Error in M/R merge operation!"); - } - - } - catch (Exception e) - { - e.printStackTrace(); - throw new RuntimeException("Error in M/R merge operation!",e); - } - } - - - void writeRecordFile(String input,String outputDir,String fileName) throws IOException - { - boolean done = false; - int count = 1; - Path recordFile = new Path(input); - do - { - Path destDirPath = new Path(outputDir ); - Path destFilePath = new Path(outputDir + "/" + fileName + "." 
+ count + ".evt" ); - - if (!fs.exists(destDirPath)) - { - fs.mkdirs(destDirPath); - log.info(">>>>>>>>>>>> create Dir" + destDirPath); - } - - if (!fs.exists(destFilePath)) - { - boolean res = fs.rename(recordFile,destFilePath); - - if (res == false) - { - log.info(">>>>>>>>>>>> Use standard copy rename failded"); - FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf); - } - done = true; - } - else - { - log.info("Start MoveToRepository main()"); - } - count ++; - // Just put a limit here - // TODO read from config - if (count > 1000) - { - throw new IOException("too many files in this directory: " + destDirPath); - } - } while (!done); - } +public class RecordMerger extends Thread { + static Logger log = Logger.getLogger(RecordMerger.class); + ChukwaConfiguration conf = null; + FileSystem fs = null; + String[] mergeArgs = null; + Tool tool = null; + boolean deleteRawData = false; + + public RecordMerger(ChukwaConfiguration conf, FileSystem fs, Tool tool, + String[] mergeArgs, boolean deleteRawData) { + this.conf = conf; + this.fs = fs; + this.tool = tool; + this.mergeArgs = mergeArgs; + this.deleteRawData = deleteRawData; + } + + @Override + public void run() { + System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]"); + int res; + try { + res = ToolRunner.run(conf, tool, mergeArgs); + System.out.println("MR exit status: " + res); + if (res == 0) { + writeRecordFile(mergeArgs[1] + "/part-00000", mergeArgs[2], + mergeArgs[3]); + + // delete input + if (deleteRawData) { + FileUtil.fullyDelete(fs, new Path(mergeArgs[0])); + + Path hours = new Path(mergeArgs[2]); + FileStatus[] hoursOrMinutesFS = fs.listStatus(hours); + for (FileStatus hourOrMinuteFS : hoursOrMinutesFS) { + String dirName = hourOrMinuteFS.getPath().getName(); + + try { + Integer.parseInt(dirName); + FileUtil.fullyDelete(fs, new Path(mergeArgs[2] + "/" + dirName)); + if (log.isDebugEnabled()) { + log.debug("Deleting Hour directory: " + mergeArgs[2] + "/" + + dirName); + } + } catch (NumberFormatException e) { /* + * Not an Hour or Minutes + * directory- Do nothing + */ + } + } + } + + // delete rolling tag + FileUtil.fullyDelete(fs, new Path(mergeArgs[3])); + // delete M/R temp directory + FileUtil.fullyDelete(fs, new Path(mergeArgs[1])); + } else { + throw new RuntimeException("Error in M/R merge operation!"); + } + + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException("Error in M/R merge operation!", e); + } + } + + void writeRecordFile(String input, String outputDir, String fileName) + throws IOException { + boolean done = false; + int count = 1; + Path recordFile = new Path(input); + do { + Path destDirPath = new Path(outputDir); + Path destFilePath = new Path(outputDir + "/" + fileName + "." 
+ count + + ".evt"); + + if (!fs.exists(destDirPath)) { + fs.mkdirs(destDirPath); + log.info(">>>>>>>>>>>> create Dir" + destDirPath); + } + + if (!fs.exists(destFilePath)) { + boolean res = fs.rename(recordFile, destFilePath); + + if (res == false) { + log.info(">>>>>>>>>>>> Use standard copy rename failded"); + FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf); + } + done = true; + } else { + log.info("Start MoveToRepository main()"); + } + count++; + // Just put a limit here + // TODO read from config + if (count > 1000) { + throw new IOException("too many files in this directory: " + + destDirPath); + } + } while (!done); + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java Wed Mar 11 22:39:26 2009 @@ -18,9 +18,9 @@ package org.apache.hadoop.chukwa.extraction.demux; + import org.apache.hadoop.chukwa.extraction.engine.Record; -public interface TaggerPlugin -{ - public void tag(String line, Record record); +public interface TaggerPlugin { + public void tag(String line, Record record); } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java Wed Mar 11 22:39:26 2009 @@ -1,20 +1,22 @@ package org.apache.hadoop.chukwa.extraction.demux.processor; -import java.io.IOException; +import java.io.IOException; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; -public class ChukwaOutputCollector implements OutputCollector -{ +public class ChukwaOutputCollector implements + OutputCollector { private OutputCollector outputCollector = null; private Reporter reporter = null; private String groupName = null; - - public ChukwaOutputCollector(String groupName,OutputCollector outputCollector,Reporter reporter) - { + + public ChukwaOutputCollector( + String groupName, + OutputCollector outputCollector, + Reporter reporter) { this.reporter = reporter; this.outputCollector = outputCollector; this.groupName = groupName; @@ -22,12 +24,10 @@ @Override public void collect(ChukwaRecordKey key, ChukwaRecord value) - throws IOException - { + throws IOException { this.outputCollector.collect(key, value); reporter.incrCounter(groupName, "total records", 1); - reporter.incrCounter(groupName, key.getReduceType() +" records" , 1); + reporter.incrCounter(groupName, key.getReduceType() + " records", 1); } - } Modified: 
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java Wed Mar 11 22:39:26 2009 @@ -18,72 +18,58 @@ package org.apache.hadoop.chukwa.extraction.demux.processor; + import java.text.SimpleDateFormat; import java.util.Calendar; -public class Util -{ - static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd"); - - static Calendar calendar = Calendar.getInstance(); - static int currentDay = 0; - static int currentHour = 0; - - static - { - synchronized(calendar) - { - calendar.setTimeInMillis( System.currentTimeMillis()); - currentDay = Integer.parseInt(day.format(calendar.getTime())); - currentHour = calendar.get(Calendar.HOUR_OF_DAY); - } - } - - public static String generateTimeOutput(long timestamp) - { - int workingDay = 0; - int workingHour = 0; - - String output = null; - - int minutes = 0; - synchronized(calendar) - { - calendar.setTimeInMillis( timestamp); - workingDay = Integer.parseInt(day.format(calendar.getTime())); - workingHour = calendar.get(Calendar.HOUR_OF_DAY); - minutes = calendar.get(Calendar.MINUTE); - } - - if (workingDay != currentDay) - { - output = "_" + workingDay + ".D.evt"; - } - else - { - if (workingHour != currentHour) - { - output = "_" +workingDay + "_" + workingHour + ".H.evt"; - } - else - { - output = "_" + workingDay + "_" + workingHour + "_"; - int dec = minutes/10; - output += dec ; - - int m = minutes - (dec*10); - if (m < 5) - { - output += "0.R.evt"; - } - else - { - output += "5.R.evt"; - } - } - } +public class Util { + static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd"); + static Calendar calendar = Calendar.getInstance(); + static int currentDay = 0; + static int currentHour = 0; + + static { + synchronized (calendar) { + calendar.setTimeInMillis(System.currentTimeMillis()); + currentDay = Integer.parseInt(day.format(calendar.getTime())); + currentHour = calendar.get(Calendar.HOUR_OF_DAY); + } + } + + public static String generateTimeOutput(long timestamp) { + int workingDay = 0; + int workingHour = 0; + + String output = null; + + int minutes = 0; + synchronized (calendar) { + calendar.setTimeInMillis(timestamp); + workingDay = Integer.parseInt(day.format(calendar.getTime())); + workingHour = calendar.get(Calendar.HOUR_OF_DAY); + minutes = calendar.get(Calendar.MINUTE); + } + + if (workingDay != currentDay) { + output = "_" + workingDay + ".D.evt"; + } else { + if (workingHour != currentHour) { + output = "_" + workingDay + "_" + workingHour + ".H.evt"; + } else { + output = "_" + workingDay + "_" + workingHour + "_"; + int dec = minutes / 10; + output += dec; + + int m = minutes - (dec * 10); + if (m < 5) { + output += "0.R.evt"; + } else { + output += "5.R.evt"; + } + } + } - return output; - } + return output; + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java?rev=752666&r1=752665&r2=752666&view=diff 
============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java Wed Mar 11 22:39:26 2009 @@ -18,8 +18,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; -import java.util.Calendar; +import java.util.Calendar; import org.apache.hadoop.chukwa.ChukwaArchiveKey; import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; @@ -30,10 +30,9 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public abstract class AbstractProcessor implements MapProcessor -{ +public abstract class AbstractProcessor implements MapProcessor { static Logger log = Logger.getLogger(AbstractProcessor.class); - + Calendar calendar = Calendar.getInstance(); byte[] bytes; int[] recordOffsets; @@ -48,24 +47,19 @@ OutputCollector output = null; Reporter reporter = null; - public AbstractProcessor() - { + public AbstractProcessor() { } protected abstract void parse(String recordEntry, OutputCollector output, Reporter reporter) throws Throwable; - protected void saveChunkInError(Throwable throwable) - { - if (chunkInErrorSaved == false) - { - try - { + protected void saveChunkInError(Throwable throwable) { + if (chunkInErrorSaved == false) { + try { ChunkSaver.saveChunk(chunk, throwable, output, reporter); chunkInErrorSaved = true; - } catch (Exception e) - { + } catch (Exception e) { e.printStackTrace(); } } @@ -73,31 +67,26 @@ } public void process(ChukwaArchiveKey archiveKey, Chunk chunk, - OutputCollector output, Reporter reporter) - { + OutputCollector output, Reporter reporter) { chunkInErrorSaved = false; - + this.archiveKey = archiveKey; this.output = output; this.reporter = reporter; - + reset(chunk); - - while (hasNext()) - { - try - { + + while (hasNext()) { + try { parse(nextLine(), output, reporter); - } catch (Throwable e) - { + } catch (Throwable e) { saveChunkInError(e); } } } protected void buildGenericRecord(ChukwaRecord record, String body, - long timestamp, String dataSource) - { + long timestamp, String dataSource) { calendar.setTimeInMillis(timestamp); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); @@ -107,8 +96,7 @@ + timestamp); key.setReduceType(dataSource); - if (body != null) - { + if (body != null) { record.add(Record.bodyField, body); } record.setTime(timestamp); @@ -119,8 +107,7 @@ } - protected void reset(Chunk chunk) - { + protected void reset(Chunk chunk) { this.chunk = chunk; this.bytes = chunk.getData(); this.recordOffsets = chunk.getRecordOffsets(); @@ -128,13 +115,11 @@ startOffset = 0; } - protected boolean hasNext() - { + protected boolean hasNext() { return (currentPos < recordOffsets.length); } - protected String nextLine() - { + protected String nextLine() { String log = new String(bytes, startOffset, (recordOffsets[currentPos] - startOffset + 1)); startOffset = recordOffsets[currentPos] + 1; Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- 
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java Wed Mar 11 22:39:26 2009 @@ -18,14 +18,16 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; -public interface ChunkProcessor -{ - public String getDataType(); - public void process(Chunk chunk,OutputCollector output, Reporter reporter); +public interface ChunkProcessor { + public String getDataType(); + + public void process(Chunk chunk, OutputCollector output, + Reporter reporter); } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java Wed Mar 11 22:39:26 2009 @@ -18,8 +18,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; -import java.util.Calendar; +import java.util.Calendar; import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; @@ -30,17 +30,15 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class ChunkSaver -{ +public class ChunkSaver { static Logger log = Logger.getLogger(ChunkSaver.class); + public static ChukwaRecord saveChunk(Chunk chunk, Throwable throwable, - OutputCollector output, Reporter reporter) - { - try - { + OutputCollector output, Reporter reporter) { + try { reporter.incrCounter("DemuxError", "count", 1); reporter.incrCounter("DemuxError", chunk.getDataType() + "Count", 1); - + ChukwaRecord record = new ChukwaRecord(); long ts = System.currentTimeMillis(); Calendar calendar = Calendar.getInstance(); @@ -68,22 +66,15 @@ output.collect(key, record); return record; - } - catch (Throwable e) - { + } catch (Throwable e) { e.printStackTrace(); - try - { - log.warn("Unable to save a chunk: tags: " - + chunk.getTags() + " - source:" - + chunk.getSource() + " - dataType: " - + chunk.getDataType() + " - Stream: " - + chunk.getStreamName() + " - SeqId: " - + chunk.getSeqID() + " - Data: " - + new String(chunk.getData()) ); - } - catch (Throwable e1) - { + try { + log.warn("Unable to save a chunk: tags: " + chunk.getTags() + + " - source:" + chunk.getSource() + " - dataType: " + + chunk.getDataType() + " - Stream: " + chunk.getStreamName() + + " - SeqId: " + chunk.getSeqID() + " - Data: " + + new String(chunk.getData())); + } catch (Throwable e1) { e.printStackTrace(); } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java?rev=752666&r1=752665&r2=752666&view=diff 
============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java Wed Mar 11 22:39:26 2009 @@ -18,31 +18,27 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; -public class DFInvalidRecord extends Exception -{ - /** +public class DFInvalidRecord extends Exception { + + /** * */ - private static final long serialVersionUID = 1254238125122522523L; + private static final long serialVersionUID = 1254238125122522523L; + + public DFInvalidRecord() { + } - public DFInvalidRecord() - { - } - - public DFInvalidRecord(String arg0) - { - super(arg0); - } - - public DFInvalidRecord(Throwable arg0) - { - super(arg0); - } - - public DFInvalidRecord(String arg0, Throwable arg1) - { - super(arg0, arg1); - } + public DFInvalidRecord(String arg0) { + super(arg0); + } + + public DFInvalidRecord(Throwable arg0) { + super(arg0); + } + + public DFInvalidRecord(String arg0, Throwable arg1) { + super(arg0, arg1); + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java Wed Mar 11 22:39:26 2009 @@ -18,39 +18,34 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; -import java.io.IOException; +import java.io.IOException; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class DebugOutputProcessor extends AbstractProcessor -{ - static Logger log = Logger.getLogger(DebugOutputProcessor.class); - public static final String recordType = "Debug"; - - @Override - public void parse(String line, OutputCollector output, - Reporter reporter) - { - log.info("record: [" + line + "] type[" + chunk.getDataType() + "]"); - - ChukwaRecord record = new ChukwaRecord(); - buildGenericRecord(record,line, System.currentTimeMillis(),recordType); - key.setKey("" + chunk.getSeqID()); - try - { - output.collect(key, record); - } catch (IOException e) - { - e.printStackTrace(); - } - } - - public String getDataType() - { - return DebugOutputProcessor.recordType; - } +public class DebugOutputProcessor extends AbstractProcessor { + static Logger log = Logger.getLogger(DebugOutputProcessor.class); + public static final String recordType = "Debug"; + + @Override + public void parse(String line, + OutputCollector output, Reporter reporter) { + log.info("record: [" + line + "] type[" + chunk.getDataType() + "]"); + + ChukwaRecord record = new ChukwaRecord(); + buildGenericRecord(record, line, System.currentTimeMillis(), recordType); + key.setKey("" + chunk.getSeqID()); + try { + output.collect(key, record); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public 
String getDataType() { + return DebugOutputProcessor.recordType; + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java Wed Mar 11 22:39:26 2009 @@ -1,32 +1,28 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; -import java.io.IOException; +import java.io.IOException; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class DefaultProcessor extends AbstractProcessor -{ - static Logger log = Logger.getLogger(DefaultProcessor.class); - @Override - protected void parse(String recordEntry, - OutputCollector output, - Reporter reporter) - { - try - { - ChukwaRecord record = new ChukwaRecord(); - this.buildGenericRecord(record, recordEntry, archiveKey.getTimePartition(), chunk.getDataType()); - output.collect(key, record); - } - catch (IOException e) - { - log.warn("Unable to collect output in DefaultProcessor [" - + recordEntry + "]", e); - e.printStackTrace(); - } - } +public class DefaultProcessor extends AbstractProcessor { + static Logger log = Logger.getLogger(DefaultProcessor.class); + + @Override + protected void parse(String recordEntry, + OutputCollector output, Reporter reporter) { + try { + ChukwaRecord record = new ChukwaRecord(); + this.buildGenericRecord(record, recordEntry, archiveKey + .getTimePartition(), chunk.getDataType()); + output.collect(key, record); + } catch (IOException e) { + log.warn("Unable to collect output in DefaultProcessor [" + recordEntry + + "]", e); + e.printStackTrace(); + } + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java Wed Mar 11 22:39:26 2009 @@ -18,107 +18,97 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class Df extends AbstractProcessor -{ - static Logger log = Logger.getLogger(Df.class); - private static final String[] headerSplitCols = { "Filesystem", "1K-blocks", - "Used", "Available", "Use%", "Mounted", "on" }; - private static final 
String[] headerCols = { "Filesystem", "1K-blocks", - "Used", "Available", "Use%", "Mounted on" }; - private SimpleDateFormat sdf = null; - - public Df() - { - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - } - - @Override - protected void parse(String recordEntry, - OutputCollector output, - Reporter reporter) - throws Throwable - { - - try - { - String dStr = recordEntry.substring(0, 23); - int start = 24; - int idx = recordEntry.indexOf(' ', start); - // String level = recordEntry.substring(start, idx); - start = idx + 1; - idx = recordEntry.indexOf(' ', start); - // String className = recordEntry.substring(start, idx-1); - String body = recordEntry.substring(idx + 1); - - Date d = sdf.parse(dStr); - String[] lines = body.split("\n"); - - String[] outputCols = lines[0].split("[\\s]++"); - - if (outputCols.length != headerSplitCols.length - || outputCols[0].intern() != headerSplitCols[0].intern() - || outputCols[1].intern() != headerSplitCols[1].intern() - || outputCols[2].intern() != headerSplitCols[2].intern() - || outputCols[3].intern() != headerSplitCols[3].intern() - || outputCols[4].intern() != headerSplitCols[4].intern() - || outputCols[5].intern() != headerSplitCols[5].intern() - || outputCols[6].intern() != headerSplitCols[6].intern() - ) - { - throw new DFInvalidRecord("Wrong output format (header) [" - + recordEntry + "]"); - } - - String[] values = null; - - // Data - ChukwaRecord record = null; - - for (int i = 1; i < lines.length; i++) - { - values = lines[i].split("[\\s]++"); - key = new ChukwaRecordKey(); - record = new ChukwaRecord(); - this.buildGenericRecord(record, null, d.getTime(), "Df"); - - record.add(headerCols[0], values[0]); - record.add(headerCols[1], values[1]); - record.add(headerCols[2], values[2]); - record.add(headerCols[3], values[3]); - record.add(headerCols[4], values[4].substring(0, values[4].length()-1)); // Remove % - record.add(headerCols[5], values[5]); - - output.collect(key, record); - } - - //log.info("DFProcessor output 1 DF record"); - } catch (ParseException e) - { - e.printStackTrace(); - log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e); - throw e; - } catch (IOException e) - { - e.printStackTrace(); - log.warn("Unable to collect output in DFProcessor [" + recordEntry - + "]", e); - throw e; - } catch (DFInvalidRecord e) - { - e.printStackTrace(); - log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e); - throw e; - } - } +public class Df extends AbstractProcessor { + static Logger log = Logger.getLogger(Df.class); + private static final String[] headerSplitCols = { "Filesystem", "1K-blocks", + "Used", "Available", "Use%", "Mounted", "on" }; + private static final String[] headerCols = { "Filesystem", "1K-blocks", + "Used", "Available", "Use%", "Mounted on" }; + private SimpleDateFormat sdf = null; + + public Df() { + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); + } + + @Override + protected void parse(String recordEntry, + OutputCollector output, Reporter reporter) + throws Throwable { + + try { + String dStr = recordEntry.substring(0, 23); + int start = 24; + int idx = recordEntry.indexOf(' ', start); + // String level = recordEntry.substring(start, idx); + start = idx + 1; + idx = recordEntry.indexOf(' ', start); + // String className = recordEntry.substring(start, idx-1); + String body = recordEntry.substring(idx + 1); + + Date d = sdf.parse(dStr); + String[] lines = body.split("\n"); + + String[] outputCols = lines[0].split("[\\s]++"); + + if (outputCols.length != headerSplitCols.length + || 
outputCols[0].intern() != headerSplitCols[0].intern() + || outputCols[1].intern() != headerSplitCols[1].intern() + || outputCols[2].intern() != headerSplitCols[2].intern() + || outputCols[3].intern() != headerSplitCols[3].intern() + || outputCols[4].intern() != headerSplitCols[4].intern() + || outputCols[5].intern() != headerSplitCols[5].intern() + || outputCols[6].intern() != headerSplitCols[6].intern()) { + throw new DFInvalidRecord("Wrong output format (header) [" + + recordEntry + "]"); + } + + String[] values = null; + + // Data + ChukwaRecord record = null; + + for (int i = 1; i < lines.length; i++) { + values = lines[i].split("[\\s]++"); + key = new ChukwaRecordKey(); + record = new ChukwaRecord(); + this.buildGenericRecord(record, null, d.getTime(), "Df"); + + record.add(headerCols[0], values[0]); + record.add(headerCols[1], values[1]); + record.add(headerCols[2], values[2]); + record.add(headerCols[3], values[3]); + record.add(headerCols[4], values[4] + .substring(0, values[4].length() - 1)); // Remove % + record.add(headerCols[5], values[5]); + + output.collect(key, record); + } + + // log.info("DFProcessor output 1 DF record"); + } catch (ParseException e) { + e.printStackTrace(); + log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e); + throw e; + } catch (IOException e) { + e.printStackTrace(); + log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]", + e); + throw e; + } catch (DFInvalidRecord e) { + e.printStackTrace(); + log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e); + throw e; + } + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java Wed Mar 11 22:39:26 2009 @@ -18,30 +18,27 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; -public class DuplicateProcessorException extends RuntimeException -{ - /** +public class DuplicateProcessorException extends RuntimeException { + + /** * */ - private static final long serialVersionUID = 3890267797961057789L; + private static final long serialVersionUID = 3890267797961057789L; - public DuplicateProcessorException() - {} + public DuplicateProcessorException() { + } - public DuplicateProcessorException(String message) - { - super(message); - } - - public DuplicateProcessorException(Throwable cause) - { - super(cause); - } - - public DuplicateProcessorException(String message, Throwable cause) - { - super(message, cause); - } + public DuplicateProcessorException(String message) { + super(message); + } + + public DuplicateProcessorException(Throwable cause) { + super(cause); + } + + public DuplicateProcessorException(String message, Throwable cause) { + super(message, cause); + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java URL: 
http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java Wed Mar 11 22:39:26 2009 @@ -18,79 +18,66 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; +public class HadoopLogProcessor extends AbstractProcessor { + static Logger log = Logger.getLogger(HadoopLogProcessor.class); - -public class HadoopLogProcessor extends AbstractProcessor -{ - static Logger log = Logger.getLogger(HadoopLogProcessor.class); - - private static final String recordType = "HadoopLog"; - private static final String nameNodeType = "NameNode"; - private static final String dataNodeType = "DataNode"; - private static final String auditType = "Audit"; - private SimpleDateFormat sdf = null; - - - public HadoopLogProcessor() - { - //TODO move that to config - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); - } - - @Override - public void parse(String recordEntry, OutputCollector output, - Reporter reporter) - throws Throwable - { - try - { - String dStr = recordEntry.substring(0, 23); - Date d = sdf.parse(dStr); - ChukwaRecord record = new ChukwaRecord(); - - if (this.chunk.getStreamName().indexOf("datanode") > 0) { - buildGenericRecord(record,recordEntry,d.getTime(),dataNodeType); - } else if (this.chunk.getStreamName().indexOf("namenode") > 0) { - buildGenericRecord(record,recordEntry,d.getTime(),nameNodeType); - } else if (this.chunk.getStreamName().indexOf("audit") > 0) { - buildGenericRecord(record,recordEntry,d.getTime(),auditType); - } else { - buildGenericRecord(record,recordEntry,d.getTime(),recordType); - } - - - output.collect(key, record); - } - catch (ParseException e) - { - log.warn("Unable to parse the date in DefaultProcessor [" - + recordEntry + "]", e); - e.printStackTrace(); - throw e; - } - catch (IOException e) - { - log.warn("Unable to collect output in DefaultProcessor [" - + recordEntry + "]", e); - e.printStackTrace(); - throw e; - } - } - - public String getDataType() - { - return HadoopLogProcessor.recordType; - } + private static final String recordType = "HadoopLog"; + private static final String nameNodeType = "NameNode"; + private static final String dataNodeType = "DataNode"; + private static final String auditType = "Audit"; + private SimpleDateFormat sdf = null; + + public HadoopLogProcessor() { + // TODO move that to config + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); + } + + @Override + public void parse(String recordEntry, + OutputCollector output, Reporter reporter) + throws Throwable { + try { + String dStr = recordEntry.substring(0, 23); + Date d = sdf.parse(dStr); + ChukwaRecord record = new ChukwaRecord(); + + if (this.chunk.getStreamName().indexOf("datanode") > 0) { + buildGenericRecord(record, recordEntry, d.getTime(), 
dataNodeType); + } else if (this.chunk.getStreamName().indexOf("namenode") > 0) { + buildGenericRecord(record, recordEntry, d.getTime(), nameNodeType); + } else if (this.chunk.getStreamName().indexOf("audit") > 0) { + buildGenericRecord(record, recordEntry, d.getTime(), auditType); + } else { + buildGenericRecord(record, recordEntry, d.getTime(), recordType); + } + + output.collect(key, record); + } catch (ParseException e) { + log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry + + "]", e); + e.printStackTrace(); + throw e; + } catch (IOException e) { + log.warn("Unable to collect output in DefaultProcessor [" + recordEntry + + "]", e); + e.printStackTrace(); + throw e; + } + } + + public String getDataType() { + return HadoopLogProcessor.recordType; + } } Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java Wed Mar 11 22:39:26 2009 @@ -18,13 +18,13 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.Iterator; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; @@ -35,86 +35,86 @@ public class HadoopMetricsProcessor extends AbstractProcessor { - static Logger log = Logger.getLogger(HadoopMetricsProcessor.class); - static final String chukwaTimestampField = "chukwa_timestamp"; - static final String contextNameField = "contextName"; - static final String recordNameField = "recordName"; - - private SimpleDateFormat sdf = null; - - public HadoopMetricsProcessor() { - // TODO move that to config - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - } - - @SuppressWarnings("unchecked") - @Override - protected void parse(String recordEntry, - OutputCollector output, - Reporter reporter) throws Throwable { - try { - String dStr = recordEntry.substring(0, 23); - int start = 24; - int idx = recordEntry.indexOf(' ', start); - // String level = recordEntry.substring(start, idx); - start = idx + 1; - idx = recordEntry.indexOf(' ', start); - // String className = recordEntry.substring(start, idx-1); - String body = recordEntry.substring(idx + 1); - body.replaceAll("\n", ""); - // log.info("record [" + recordEntry + "] body [" + body +"]"); - Date d = sdf.parse(dStr); - - JSONObject json = new JSONObject(body); - - ChukwaRecord record = new ChukwaRecord(); - String datasource = null; - String recordName = null; - - Iterator ki = json.keys(); - while (ki.hasNext()) { - String keyName = ki.next(); - if (chukwaTimestampField.intern() == keyName.intern()) { - d = new Date(json.getLong(keyName)); - Calendar cal = Calendar.getInstance(); - cal.setTimeInMillis(d.getTime()); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - d.setTime(cal.getTimeInMillis()); - } else if 
(contextNameField.intern() == keyName.intern()) { - datasource = "Hadoop_" + json.getString(keyName); - } else if (recordNameField.intern() == keyName.intern()) { - recordName = json.getString(keyName); - record.add(keyName, json.getString(keyName)); - } else { - record.add(keyName, json.getString(keyName)); - } - } - - datasource = datasource + "_" + recordName; - buildGenericRecord(record, null, d.getTime(), datasource); - output.collect(key, record); - } catch (ParseException e) { - e.printStackTrace(); - log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry - + "]", e); - throw e; - } catch (IOException e) { - e.printStackTrace(); - log.warn("Unable to collect output in HadoopMetricsProcessor [" - + recordEntry + "]", e); - throw e; - } catch (JSONException e) { - e.printStackTrace(); - log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry - + "]", e); - throw e; - } - - } - - public String getDataType() { - return HadoopMetricsProcessor.class.getName(); - } + static Logger log = Logger.getLogger(HadoopMetricsProcessor.class); + static final String chukwaTimestampField = "chukwa_timestamp"; + static final String contextNameField = "contextName"; + static final String recordNameField = "recordName"; + + private SimpleDateFormat sdf = null; + + public HadoopMetricsProcessor() { + // TODO move that to config + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); + } + + @SuppressWarnings("unchecked") + @Override + protected void parse(String recordEntry, + OutputCollector output, Reporter reporter) + throws Throwable { + try { + String dStr = recordEntry.substring(0, 23); + int start = 24; + int idx = recordEntry.indexOf(' ', start); + // String level = recordEntry.substring(start, idx); + start = idx + 1; + idx = recordEntry.indexOf(' ', start); + // String className = recordEntry.substring(start, idx-1); + String body = recordEntry.substring(idx + 1); + body.replaceAll("\n", ""); + // log.info("record [" + recordEntry + "] body [" + body +"]"); + Date d = sdf.parse(dStr); + + JSONObject json = new JSONObject(body); + + ChukwaRecord record = new ChukwaRecord(); + String datasource = null; + String recordName = null; + + Iterator ki = json.keys(); + while (ki.hasNext()) { + String keyName = ki.next(); + if (chukwaTimestampField.intern() == keyName.intern()) { + d = new Date(json.getLong(keyName)); + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(d.getTime()); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + d.setTime(cal.getTimeInMillis()); + } else if (contextNameField.intern() == keyName.intern()) { + datasource = "Hadoop_" + json.getString(keyName); + } else if (recordNameField.intern() == keyName.intern()) { + recordName = json.getString(keyName); + record.add(keyName, json.getString(keyName)); + } else { + record.add(keyName, json.getString(keyName)); + } + } + + datasource = datasource + "_" + recordName; + buildGenericRecord(record, null, d.getTime(), datasource); + output.collect(key, record); + } catch (ParseException e) { + e.printStackTrace(); + log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]", + e); + throw e; + } catch (IOException e) { + e.printStackTrace(); + log.warn("Unable to collect output in HadoopMetricsProcessor [" + + recordEntry + "]", e); + throw e; + } catch (JSONException e) { + e.printStackTrace(); + log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]", + e); + throw e; + } + + } + + public String getDataType() { + return HadoopMetricsProcessor.class.getName(); + } } 
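For reference, a minimal standalone sketch of what HadoopLogProcessor.parse() does with each line: it reads the first 23 characters as a log4j timestamp ("yyyy-MM-dd HH:mm:ss,SSS") and picks a record type from the stream name. The class name, main() harness, and sample inputs below are hypothetical illustrations and are not part of this commit.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimestampPrefixSketch {
  private static final SimpleDateFormat SDF =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

  // First 23 characters of a log4j line hold the timestamp,
  // exactly as HadoopLogProcessor.parse() reads it.
  static Date parsePrefix(String recordEntry) throws ParseException {
    return SDF.parse(recordEntry.substring(0, 23));
  }

  // Same stream-name dispatch as the if/else chain above
  // (it keeps the original "indexOf(...) > 0" test).
  static String recordTypeFor(String streamName) {
    if (streamName.indexOf("datanode") > 0) return "DataNode";
    if (streamName.indexOf("namenode") > 0) return "NameNode";
    if (streamName.indexOf("audit") > 0) return "Audit";
    return "HadoopLog";
  }

  public static void main(String[] args) throws ParseException {
    String line = "2009-03-11 22:39:26,123 INFO org.example.Foo: hello";
    System.out.println(parsePrefix(line));
    System.out.println(recordTypeFor("/var/log/hadoop-datanode.log"));
  }
}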
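Similarly, a small sketch of the one non-obvious step in HadoopMetricsProcessor.parse(): when the JSON carries a chukwa_timestamp field, the date is rounded down to the minute (seconds and milliseconds zeroed) before the record is keyed, so all metrics from the same minute share a timestamp. The class name and sample value are hypothetical.

import java.util.Calendar;
import java.util.Date;

public class MinuteRoundingSketch {
  // Same Calendar-based rounding as the chukwa_timestamp branch above.
  static Date roundToMinute(long epochMillis) {
    Calendar cal = Calendar.getInstance();
    cal.setTimeInMillis(epochMillis);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    return new Date(cal.getTimeInMillis());
  }

  public static void main(String[] args) {
    long ts = 1236811166123L; // 2009-03-11 22:39:26,123 UTC
    System.out.println(roundToMinute(ts)); // seconds and millis dropped
  }
}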
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java Wed Mar 11 22:39:26 2009 @@ -18,124 +18,118 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.text.SimpleDateFormat; import java.util.Date; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.log4j.Logger; -public class Iostat extends AbstractProcessor -{ - static Logger log = Logger.getLogger(Iostat.class); - public final String recordType = this.getClass().getName(); - - private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)"; - private static Pattern p = null; - - private Matcher matcher = null; - private SimpleDateFormat sdf = null; - - public Iostat() - { - //TODO move that to config - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - p = Pattern.compile(regex); - } - - @Override - protected void parse(String recordEntry, OutputCollector output, - Reporter reporter) - throws Throwable - { - - log.debug("Iostat record: [" + recordEntry + "] type[" + chunk.getDataType() + "]"); - int i = 0; - - matcher=p.matcher(recordEntry); - while (matcher.find()) - { - log.debug("Iostat Processor Matches"); - - try - { - Date d = sdf.parse( matcher.group(1).trim()); - - - - String[] lines = recordEntry.split("\n"); - String[] headers = null; - for(int skip=0;skip<2;skip++) { - i++; - while ( i < lines.length && lines[i].indexOf("avg-cpu")<0) { - // Skip the first output because the numbers are averaged from system boot up - log.debug("skip line:"+lines[i]); - i++; - } - } - while (i < lines.length) - { - ChukwaRecord record = null; - - if(lines[i].indexOf("avg-cpu")>=0 || lines[i].indexOf("Device")>=0) { - headers = parseHeader(lines[i]); - i++; - } - String data[] = parseData(lines[i]); - if(headers[0].equals("avg-cpu:")) { - log.debug("Matched CPU-Utilization"); - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, d.getTime(), "SystemMetrics"); - } else if(headers[0].equals("Device:")) { - log.debug("Matched Iostat"); - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, d.getTime(), "SystemMetrics"); - } else { - log.debug("No match:"+headers[0]); - } - if(record!=null) { - int j=0; - log.debug("Data Length: " + data.length); - while(j3) { - output.collect(key, record); - } - } - i++; - } - // End of parsing - } catch (Exception e) - { - e.printStackTrace(); - throw e; - } - } - } - - public String[] parseHeader(String header) { - String[] headers = header.split("\\s+"); - return headers; - } - - public String[] parseData(String dataLine) { - String[] data = dataLine.split("\\s+"); - return data; - } - - public String getDataType() { - return recordType; - } +public class Iostat extends 
AbstractProcessor { + static Logger log = Logger.getLogger(Iostat.class); + public final String recordType = this.getClass().getName(); + + private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)"; + private static Pattern p = null; + + private Matcher matcher = null; + private SimpleDateFormat sdf = null; + + public Iostat() { + // TODO move that to config + sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); + p = Pattern.compile(regex); + } + + @Override + protected void parse(String recordEntry, + OutputCollector output, Reporter reporter) + throws Throwable { + + log.debug("Iostat record: [" + recordEntry + "] type[" + + chunk.getDataType() + "]"); + int i = 0; + + matcher = p.matcher(recordEntry); + while (matcher.find()) { + log.debug("Iostat Processor Matches"); + + try { + Date d = sdf.parse(matcher.group(1).trim()); + + String[] lines = recordEntry.split("\n"); + String[] headers = null; + for (int skip = 0; skip < 2; skip++) { + i++; + while (i < lines.length && lines[i].indexOf("avg-cpu") < 0) { + // Skip the first output because the numbers are averaged from + // system boot up + log.debug("skip line:" + lines[i]); + i++; + } + } + while (i < lines.length) { + ChukwaRecord record = null; + + if (lines[i].indexOf("avg-cpu") >= 0 + || lines[i].indexOf("Device") >= 0) { + headers = parseHeader(lines[i]); + i++; + } + String data[] = parseData(lines[i]); + if (headers[0].equals("avg-cpu:")) { + log.debug("Matched CPU-Utilization"); + record = new ChukwaRecord(); + key = new ChukwaRecordKey(); + buildGenericRecord(record, null, d.getTime(), "SystemMetrics"); + } else if (headers[0].equals("Device:")) { + log.debug("Matched Iostat"); + record = new ChukwaRecord(); + key = new ChukwaRecordKey(); + buildGenericRecord(record, null, d.getTime(), "SystemMetrics"); + } else { + log.debug("No match:" + headers[0]); + } + if (record != null) { + int j = 0; + log.debug("Data Length: " + data.length); + while (j < data.length) { + log.debug("header:" + headers[j] + " data:" + data[j]); + if (!headers[j].equals("avg-cpu:")) { + record.add(headers[j], data[j]); + } + j++; + } + record.setTime(d.getTime()); + if (data.length > 3) { + output.collect(key, record); + } + } + i++; + } + // End of parsing + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + } + + public String[] parseHeader(String header) { + String[] headers = header.split("\\s+"); + return headers; + } + + public String[] parseData(String dataLine) { + String[] data = dataLine.split("\\s+"); + return data; + } + + public String getDataType() { + return recordType; + } } \ No newline at end of file Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java?rev=752666&r1=752665&r2=752666&view=diff ============================================================================== --- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java (original) +++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java Wed Mar 11 22:39:26 2009 @@ -1,15 +1,14 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; + import java.io.File; import java.io.FileOutputStream; import java.util.Calendar; import java.util.Random; import 
java.util.regex.Matcher; import java.util.regex.Pattern; - import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; - import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.mapred.OutputCollector; @@ -22,99 +21,101 @@ import org.w3c.dom.Text; public class JobConfProcessor extends AbstractProcessor { - static Logger log = Logger.getLogger(JobConfProcessor.class); - static Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?"); - static Pattern hodPattern = Pattern.compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)"); - static Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)"); - @Override - protected void parse(String recordEntry, - OutputCollector output, - Reporter reporter) - throws Throwable - { - Long time = 0L; - Random randomNumber = new Random(); - String tags = this.chunk.getTags(); - - Matcher matcher = timePattern.matcher(tags); - if (matcher.matches()) { - time = Long.parseLong(matcher.group(2)); - } - String capp = this.chunk.getApplication(); - String hodID = ""; - String jobID = ""; - matcher = hodPattern.matcher(capp); - if(matcher.matches()) { - hodID=matcher.group(3); + static Logger log = Logger.getLogger(JobConfProcessor.class); + static Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?"); + static Pattern hodPattern = Pattern + .compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)"); + static Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)"); + + @Override + protected void parse(String recordEntry, + OutputCollector output, Reporter reporter) + throws Throwable { + Long time = 0L; + Random randomNumber = new Random(); + String tags = this.chunk.getTags(); + + Matcher matcher = timePattern.matcher(tags); + if (matcher.matches()) { + time = Long.parseLong(matcher.group(2)); + } + String capp = this.chunk.getApplication(); + String hodID = ""; + String jobID = ""; + matcher = hodPattern.matcher(capp); + if (matcher.matches()) { + hodID = matcher.group(3); + } + matcher = jobPattern.matcher(capp); + if (matcher.matches()) { + jobID = matcher.group(2); + } + ChukwaRecord record = new ChukwaRecord(); + DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory + .newInstance(); + // ignore all comments inside the xml file + docBuilderFactory.setIgnoringComments(true); + try { + DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); + Document doc = null; + String fileName = "test_" + randomNumber.nextInt(); + File tmp = new File(fileName); + FileOutputStream out = new FileOutputStream(tmp); + out.write(recordEntry.getBytes()); + out.close(); + doc = builder.parse(fileName); + Element root = doc.getDocumentElement(); + if (!"configuration".equals(root.getTagName())) + log.fatal("bad conf file: top-level element not "); + NodeList props = root.getChildNodes(); + + for (int i = 0; i < props.getLength(); i++) { + Node propNode = props.item(i); + if (!(propNode instanceof Element)) + continue; + Element prop = (Element) propNode; + if (!"property".equals(prop.getTagName())) + log.warn("bad conf file: element not "); + NodeList fields = prop.getChildNodes(); + String attr = null; + String value = null; + boolean finalParameter = false; + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("name".equals(field.getTagName()) && 
field.hasChildNodes()) + attr = ((Text) field.getFirstChild()).getData().trim(); + if ("value".equals(field.getTagName()) && field.hasChildNodes()) + value = ((Text) field.getFirstChild()).getData(); + if ("final".equals(field.getTagName()) && field.hasChildNodes()) + finalParameter = "true".equals(((Text) field.getFirstChild()) + .getData()); } - matcher = jobPattern.matcher(capp); - if(matcher.matches()) { - jobID=matcher.group(2); + + // Ignore this parameter if it has already been marked as 'final' + if (attr != null && value != null) { + record.add(attr, value); } - ChukwaRecord record = new ChukwaRecord(); - DocumentBuilderFactory docBuilderFactory - = DocumentBuilderFactory.newInstance(); - //ignore all comments inside the xml file - docBuilderFactory.setIgnoringComments(true); - try { - DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); - Document doc = null; - String fileName = "test_"+randomNumber.nextInt(); - File tmp = new File(fileName); - FileOutputStream out = new FileOutputStream(tmp); - out.write(recordEntry.getBytes()); - out.close(); - doc = builder.parse(fileName); - Element root = doc.getDocumentElement(); - if (!"configuration".equals(root.getTagName())) - log.fatal("bad conf file: top-level element not "); - NodeList props = root.getChildNodes(); - - for (int i = 0; i < props.getLength(); i++) { - Node propNode = props.item(i); - if (!(propNode instanceof Element)) - continue; - Element prop = (Element)propNode; - if (!"property".equals(prop.getTagName())) - log.warn("bad conf file: element not "); - NodeList fields = prop.getChildNodes(); - String attr = null; - String value = null; - boolean finalParameter = false; - for (int j = 0; j < fields.getLength(); j++) { - Node fieldNode = fields.item(j); - if (!(fieldNode instanceof Element)) - continue; - Element field = (Element)fieldNode; - if ("name".equals(field.getTagName()) && field.hasChildNodes()) - attr = ((Text)field.getFirstChild()).getData().trim(); - if ("value".equals(field.getTagName()) && field.hasChildNodes()) - value = ((Text)field.getFirstChild()).getData(); - if ("final".equals(field.getTagName()) && field.hasChildNodes()) - finalParameter = "true".equals(((Text)field.getFirstChild()).getData()); - } - - // Ignore this parameter if it has already been marked as 'final' - if (attr != null && value != null) { - record.add(attr, value); - } - } - buildGenericRecord(record,null, time,"JobConf"); - calendar.setTimeInMillis(time); - calendar.set(Calendar.MINUTE, 0); - calendar.set(Calendar.SECOND, 0); - calendar.set(Calendar.MILLISECOND, 0); - key.setKey(""+ calendar.getTimeInMillis() + "/" + hodID +"." + jobID + "/" + time); - - output.collect(key,record); - tmp.delete(); - } catch(Exception e) { - e.printStackTrace(); - throw e; - } - } - - public String getDataType() { - return Torque.class.getName(); - } + } + buildGenericRecord(record, null, time, "JobConf"); + calendar.setTimeInMillis(time); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + key.setKey("" + calendar.getTimeInMillis() + "/" + hodID + "." + jobID + + "/" + time); + + output.collect(key, record); + tmp.delete(); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + public String getDataType() { + return Torque.class.getName(); + } }
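For reference, a minimal sketch of the header/column pairing Iostat.parse() performs on each iostat block: header and data rows are split on whitespace and matched up by column index (the "avg-cpu:" label itself is skipped in the processor). The class name and sample rows are hypothetical and not part of this commit.

import java.util.LinkedHashMap;
import java.util.Map;

public class IostatColumnsSketch {
  // Same whitespace split as Iostat.parseHeader() / parseData().
  static String[] split(String line) {
    return line.trim().split("\\s+");
  }

  public static void main(String[] args) {
    String header = "Device:  tps  Blk_read/s  Blk_wrtn/s  Blk_read  Blk_wrtn";
    String data   = "sda     3.50       52.11       71.22   1599170   2185624";

    String[] h = split(header);
    String[] d = split(data);

    // Pair each value with the header in the same column, mirroring the
    // record.add(headers[j], data[j]) loop in the processor.
    Map<String, String> record = new LinkedHashMap<String, String>();
    for (int j = 0; j < d.length && j < h.length; j++) {
      record.put(h[j], d[j]);
    }
    System.out.println(record);
  }
}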
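And a self-contained sketch of the XML walk in JobConfProcessor.parse(): each property element's name and value children become a key/value pair on the record. This version parses from an in-memory stream rather than the temporary file the processor writes out; the class name and inline sample XML are hypothetical.

import java.io.ByteArrayInputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;

public class ConfXmlSketch {
  public static void main(String[] args) throws Exception {
    String xml = "<configuration><property>"
        + "<name>mapred.reduce.tasks</name><value>4</value>"
        + "</property></configuration>";

    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    factory.setIgnoringComments(true); // same setting as JobConfProcessor
    DocumentBuilder builder = factory.newDocumentBuilder();
    Document doc = builder.parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));

    NodeList props = doc.getDocumentElement().getChildNodes();
    for (int i = 0; i < props.getLength(); i++) {
      Node propNode = props.item(i);
      if (!(propNode instanceof Element)) continue;
      NodeList fields = propNode.getChildNodes();
      String name = null, value = null;
      for (int j = 0; j < fields.getLength(); j++) {
        Node fieldNode = fields.item(j);
        if (!(fieldNode instanceof Element)) continue;
        Element field = (Element) fieldNode;
        if ("name".equals(field.getTagName()) && field.hasChildNodes())
          name = ((Text) field.getFirstChild()).getData().trim();
        if ("value".equals(field.getTagName()) && field.hasChildNodes())
          value = ((Text) field.getFirstChild()).getData();
      }
      // Corresponds to record.add(attr, value) in the processor.
      if (name != null && value != null)
        System.out.println(name + " = " + value);
    }
  }
}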