hadoop-common-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r723855 [7/23] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/lib...
Date Fri, 05 Dec 2008 20:30:21 GMT
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Fri Dec  5 12:30:14 2008
@@ -19,17 +19,23 @@
 package org.apache.hadoop.chukwa.extraction.demux;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -37,9 +43,9 @@
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
@@ -47,25 +53,97 @@
 public class Demux extends Configured implements Tool
 {
 	static Logger log = Logger.getLogger(Demux.class);
-	
+	static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
+	 
 	public static class MapClass extends MapReduceBase implements
-			Mapper<ChukwaArchiveKey, ChunkImpl , Text, ChukwaRecord>
+			Mapper<ChukwaArchiveKey, ChunkImpl , ChukwaRecordKey, ChukwaRecord>
 	{
+		JobConf jobConf = null;
 		
+		@Override
+		public void configure(JobConf jobConf)
+		{
+			super.configure(jobConf);
+			this.jobConf = jobConf;
+		}
+
 		public void map(ChukwaArchiveKey key, ChunkImpl chunk,
-				OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
+				OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
 				throws IOException
 		{
-			try {
-				log.info("Entry: ["+ chunk.getData() + "] EventType: [" + chunk.getDataType() + "]");
+		
+		  ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector("DemuxMapOutput",output,reporter);
+			try 
+			{
+				long duration = System.currentTimeMillis();
+				if (log.isDebugEnabled())
+				{
+					log.debug("Entry: ["+ chunk.getData() + "] EventType: [" + chunk.getDataType() + "]");	
+				}
+				String processorClass = jobConf.get(chunk.getDataType(), 
+							"org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+			 
+				if (!processorClass.equalsIgnoreCase("Drop"))
+				{
+				  reporter.incrCounter("DemuxMapInput", "total chunks", 1);
+				  reporter.incrCounter("DemuxMapInput", chunk.getDataType() + " chunks" , 1);
+          
+					MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
+					processor.process(key,chunk, chukwaOutputCollector, reporter);
+					if (log.isDebugEnabled())
+					{	
+						duration = System.currentTimeMillis() - duration;
+						log.debug("Demux:Map dataType:" + chunk.getDataType() + 
+							" duration:" + duration + " processor:" + processorClass + " recordCount:" + chunk.getRecordOffsets().length );
+					}
+					
+				}
+				else
+				{
+					log.info("action:Demux, dataType:" + chunk.getDataType() +
+							" duration:0 processor:Drop recordCount:" + chunk.getRecordOffsets().length );
+				}
 				
-				ProcessorFactory.getProcessor(chunk.getDataType()).process(chunk, output, reporter);
-			} catch(Exception e) {
+			} 
+			catch(Exception e) 
+			{
+				log.warn("Exception in Demux:MAP", e);
 				e.printStackTrace();
 			}
 		}
 	}
 
+	 public static class ReduceClass extends MapReduceBase implements
+			Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord>
+	{
+		public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+				OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+				Reporter reporter) throws IOException
+		{
+		  ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector("DemuxReduceOutput",output,reporter);
+			try 
+			{
+				long duration = System.currentTimeMillis();
+        reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
+        reporter.incrCounter("DemuxReduceInput",  key.getReduceType() +" total distinct keys" , 1);
+        
+        ReduceProcessorFactory.getProcessor(key.getReduceType()).process(key,values, chukwaOutputCollector, reporter);
+
+				if (log.isDebugEnabled())
+				{	
+					duration = System.currentTimeMillis() - duration;
+					log.debug("Demux:Reduce, dataType:" + key.getReduceType() +" duration:" + duration);
+				}
+				
+			} 
+			catch(Exception e) 
+			{
+				log.warn("Exception in Demux:Reduce", e);
+				e.printStackTrace();
+			}
+		}
+	}
+	
 	static int printUsage() {
 		System.out
 				.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
@@ -76,17 +154,21 @@
 	public int run(String[] args) throws Exception
 	{
 		JobConf conf = new JobConf(getConf(), Demux.class);
-
-		conf.setJobName("Chukwa-Demux");
+		 conf.addResource(new Path("conf/chukwa-demux-conf.xml"));
+		
+		conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
 		conf.setInputFormat(SequenceFileInputFormat.class);
 		conf.setMapperClass(Demux.MapClass.class);
 		conf.setPartitionerClass(ChukwaRecordPartitioner.class);
-		conf.setReducerClass(IdentityReducer.class);
+		conf.setReducerClass(Demux.ReduceClass.class);
 
-		conf.setOutputKeyClass(Text.class);
+		conf.setOutputKeyClass(ChukwaRecordKey.class);
 		conf.setOutputValueClass(ChukwaRecord.class);
 		conf.setOutputFormat(ChukwaRecordOutputFormat.class);
-		//
+
+//    conf.setCompressMapOutput(true);
+ //   conf.setMapOutputCompressorClass(LzoCodec.class);
+		
 		
 		List<String> other_args = new ArrayList<String>();
 		for (int i = 0; i < args.length; ++i) {
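
Note on the change above: the mapper no longer hard-codes its processors. Each chunk's data type is looked up as a key in the job configuration (now loaded from conf/chukwa-demux-conf.xml); the value names a MapProcessor class, DefaultProcessor is the fallback, and the special value "Drop" discards chunks of that type after counting them. A minimal sketch of wiring such a binding follows; the "SysLog" data type, the SysLogProcessor class name, and "NoisyDataType" are illustrative assumptions, not values from this commit.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class DemuxBindingSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.addResource(new Path("conf/chukwa-demux-conf.xml"));

    // Bind a (hypothetical) data type to a mapper-side processor class.
    conf.set("SysLog",
        "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SysLogProcessor");
    // Or drop a noisy data type entirely; the mapper only counts dropped chunks.
    conf.set("NoisyDataType", "Drop");

    // Mirrors the lookup performed in Demux.MapClass.map().
    String processorClass = conf.get("SysLog",
        "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
    System.out.println("SysLog chunks handled by: " + processorClass);
  }
}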

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.log4j.Logger;
+
+// TODO do an abstract class for all rolling 
+public class HourlyChukwaRecordRolling extends Configured implements Tool
+{
+	static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
+	
+	static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+	static ChukwaConfiguration conf = null;
+	static FileSystem fs = null;
+	static final String HadoopLogDir = "_logs";
+	static final String hadoopTempDir = "_temporary";
+	
+	static boolean rollInSequence = true;
+	static boolean deleteRawdata = false;
+
+	public static void usage()
+	{
+		System.err.println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
+		System.exit(-1);
+	}
+	
+	
+	public static void buildHourlyFiles(String chukwaMainRepository, String tempDir,String rollingFolder, int workingDay, int workingHour) throws IOException
+	{
+		// process
+		Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/" + workingHour) ;
+		FileStatus[] clustersFS = fs.listStatus(hourPath);
+		for(FileStatus clusterFs : clustersFS)
+		{
+			String cluster = clusterFs.getPath().getName();
+			
+			Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/" + workingDay + "/" + workingHour + "/" + cluster) ;
+			FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
+			for(FileStatus dataSourceFS : dataSourcesFS)
+			{
+				String dataSource = dataSourceFS.getPath().getName();
+				// Repo path = reposRootDirectory/<cluster>/<day>/<hour>/*/*.evt
+				
+				// put the rotate flag
+				fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "/rotateDone"));
+				
+				// rotate
+				// Merge
+				String[] mergeArgs = new String[5];
+				// input
+				mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "/*/*.evt";
+				// temp dir
+				mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "_" + System.currentTimeMillis() ;
+				// final output dir
+				mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour ;
+				// final output fileName
+				mergeArgs[3] =  dataSource +"_" + workingDay +"_" + workingHour;
+				// delete rolling directory
+				mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/" + workingHour + "/" + cluster + "/" + dataSource; 
+						
+				
+				log.info("HourlyChukwaRecordRolling 0: " +  mergeArgs[0] );
+				log.info("HourlyChukwaRecordRolling 1: " +  mergeArgs[1] );
+				log.info("HourlyChukwaRecordRolling 2: " +  mergeArgs[2] );
+				log.info("HourlyChukwaRecordRolling 3: " +  mergeArgs[3] );
+				log.info("HourlyChukwaRecordRolling 4: " +  mergeArgs[4] );
+				
+				RecordMerger merge = new RecordMerger(conf,fs,new HourlyChukwaRecordRolling(), mergeArgs,deleteRawdata);
+				List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+				if (rollInSequence)
+				{ merge.run(); }
+				else
+				{ 
+					allMerge.add(merge);
+					merge.start();
+				}
+				
+				// join all Threads
+				if (!rollInSequence)
+				{
+					while(allMerge.size() > 0)
+					{
+						RecordMerger m = allMerge.remove(0);
+						try
+						{ m.join(); } 
+						catch (InterruptedException e) {}
+					}
+				} // End if (!rollInSequence)
+				
+				// Delete the processed dataSourceFS
+				FileUtil.fullyDelete(fs,dataSourceFS.getPath());
+				
+			} // End for(FileStatus dataSourceFS : dataSourcesFS)
+			
+			// Delete the processed clusterFs
+			FileUtil.fullyDelete(fs,clusterFs.getPath());
+			
+		} // End for(FileStatus clusterFs : clustersFS)
+		
+		// Delete the processed hour
+		FileUtil.fullyDelete(fs,hourPath);
+	}
+	
+	/**
+	 * @param args
+	 * @throws Exception 
+	 */
+	public static void main(String[] args) throws  Exception
+	{
+		conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		fs = FileSystem.get(new URI(fsName), conf);
+		
+		// TODO read from config
+		String rollingFolder = "/chukwa/rolling/";
+		String chukwaMainRepository = "/chukwa/repos/";
+		String tempDir = "/chukwa/temp/hourlyRolling/";
+		
+		
+		// TODO do a real parameter parsing
+		if (args.length != 4)
+		 { usage(); }
+		
+		if (!args[0].equalsIgnoreCase("rollInSequence"))
+		 { usage(); }
+		
+		if (!args[2].equalsIgnoreCase("deleteRawdata"))
+		 { usage(); }
+			
+		if (args[1].equalsIgnoreCase("true"))
+		 { rollInSequence = true; }
+		else
+		 { rollInSequence = false; }
+		
+		if (args[3].equalsIgnoreCase("true"))
+		 { deleteRawdata = true; }
+		else
+		 { deleteRawdata = false; }
+		
+
+		
+		Calendar calendar = Calendar.getInstance();	
+		int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
+		int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+		log.info("CurrentDay: " + currentDay);
+		log.info("currentHour" + currentHour);
+	
+		Path rootFolder = new Path(rollingFolder + "/hourly/") ;
+		
+		FileStatus[] daysFS = fs.listStatus(rootFolder);
+		for(FileStatus dayFS : daysFS)
+		{
+			try
+			{ 
+				log.info("dayFs:" + dayFS.getPath().getName());
+				int workingDay = Integer.parseInt(dayFS.getPath().getName());
+				
+				Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay) ;
+				FileStatus[] hoursFS = fs.listStatus(hourlySrc);
+				for(FileStatus hourFS : hoursFS)
+				{
+					String workinhHourStr = hourFS.getPath().getName();
+					int workingHour = Integer.parseInt(workinhHourStr);
+					if ( 	
+							(workingDay < currentDay) || // all previous days
+							( (workingDay == currentDay) &&  (workingHour < currentHour) ) // Up to the last hour
+						) 
+					{
+
+						buildHourlyFiles(chukwaMainRepository,tempDir,rollingFolder, workingDay,workingHour);
+						
+					} // End if ( (workingDay < currentDay) || ( (workingDay == currentDay) &&  (intHour < currentHour) ) )
+				} // End for(FileStatus hourFS : hoursFS)		
+			} // End Try workingDay = Integer.parseInt(sdf.format(dayFS.getPath().getName()));
+			catch(NumberFormatException e)
+			{ /* Not a standard Day directory skip */ }
+			
+		} // for(FileStatus dayFS : daysFS)		
+	}
+	
+	
+	public int run(String[] args) throws Exception
+	{
+		JobConf conf = new JobConf(getConf(), HourlyChukwaRecordRolling.class);
+
+		conf.setJobName("HourlyChukwa-Rolling");
+		conf.setInputFormat(SequenceFileInputFormat.class);
+		
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setReducerClass(IdentityReducer.class);
+
+				
+		conf.setOutputKeyClass(ChukwaRecordKey.class);
+		conf.setOutputValueClass(ChukwaRecord.class);
+		conf.setOutputFormat(SequenceFileOutputFormat.class);
+		
+		conf.set("mapred.compress.map.output", "true");
+		conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
+		conf.set("mapred.output.compress", "true");
+		conf.set("mapred.output.compression.type", "BLOCK");
+		
+		
+		log.info("HourlyChukwaRecordRolling input: " +  args[0] );
+		log.info("HourlyChukwaRecordRolling output: " +  args[1] );
+		
+		
+		FileInputFormat.setInputPaths(conf, args[0]);
+		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+		JobClient.runJob(conf);
+		return 0;
+	}
+	
+}
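
Note on the file above: the hourly roller is driven purely by directory layout. It walks /chukwa/rolling/hourly/&lt;day&gt;/&lt;hour&gt;/&lt;cluster&gt;/&lt;dataSource&gt;, builds the five-element argument array RecordMerger expects for each leaf, and runs the merges either sequentially or as parallel threads. The sketch below only restates that argument contract using the hard-coded paths from main(); the cluster and data-source names are hypothetical.

public class HourlyMergeArgsSketch {
  // Assembles the same five arguments as buildHourlyFiles() above.
  public static String[] hourlyMergeArgs(String cluster, String dataSource, int day, int hour) {
    String repos = "/chukwa/repos/";             // chukwaMainRepository in main()
    String temp = "/chukwa/temp/hourlyRolling/"; // tempDir in main()
    String rolling = "/chukwa/rolling/";         // rollingFolder in main()
    String[] args = new String[5];
    args[0] = repos + cluster + "/" + dataSource + "/" + day + "/" + hour + "/*/*.evt";  // input glob
    args[1] = temp + cluster + "/" + dataSource + "/" + day + "/" + hour + "_"
        + System.currentTimeMillis();                                                    // M/R temp dir
    args[2] = repos + cluster + "/" + dataSource + "/" + day + "/" + hour;               // final output dir
    args[3] = dataSource + "_" + day + "_" + hour;                                       // final output file name
    args[4] = rolling + "hourly/" + day + "/" + hour + "/" + cluster + "/" + dataSource; // rolling dir to delete
    return args;
  }

  public static void main(String[] unused) {
    for (String arg : hourlyMergeArgs("demoCluster", "Hadoop_dfs_datanode", 20080919, 14)) {
      System.out.println(arg);
    }
  }
}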

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java Fri Dec  5 12:30:14 2008
@@ -23,17 +23,19 @@
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.util.Tool;
@@ -41,6 +43,11 @@
 
 public class MoveOrMergeRecordFile extends Configured implements Tool
 {
+	static ChukwaConfiguration conf = null;
+	static FileSystem fs = null;
+	static final String HadoopLogDir = "_logs";
+	static final String hadoopTempDir = "_temporary";
+	
 	public int run(String[] args) throws Exception
 	{
 		JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
@@ -54,9 +61,9 @@
 		//conf.setPartitionerClass(ChukwaPartitioner.class);
 		//conf.setOutputFormat(ChukwaOutputFormat.class);
 		
-		conf.setOutputKeyClass(Text.class);
+		conf.setOutputKeyClass(ChukwaRecordKey.class);
 		conf.setOutputValueClass(ChukwaRecord.class);
-		
+		conf.setOutputFormat(SequenceFileOutputFormat.class);
 		
 		FileInputFormat.setInputPaths(conf, args[0]);
 		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
@@ -65,23 +72,15 @@
 		return 0;
 	}
 
-	/**
-	 * @param args
-	 * @throws Exception 
-	 */
-	public static void main(String[] args) throws Exception
+	
+    static void moveOrMergeOneCluster(Path srcDir,String destDir) throws Exception
 	{
-		ChukwaConfiguration conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		FileSystem fs = FileSystem.get(new URI(fsName), conf);
-		Path srcDir = new Path(args[0]);
-		String destDir = args[1];
-		
+		System.out.println("moveOrMergeOneCluster (" + srcDir.getName() + "," + destDir +")");
 		FileStatus fstat = fs.getFileStatus(srcDir);
 		
 		if (!fstat.isDir())
 		{
-			throw new IOException(args[0] + " is not a directory!");
+			throw new IOException(srcDir + " is not a directory!");
 		}
 		else
 		{
@@ -96,11 +95,6 @@
 				
 				String dirName = datasourceDirectory.getPath().getName();
 				
-				if (dirName.equals("_logs"))
-				{
-					continue;
-				}
-				
 				Path destPath = new Path(destDir + "/" + dirName);
 				System.out.println("dest directory path: " + destPath);
 				
@@ -121,7 +115,8 @@
 					if (!fs.exists(destFilePath))
 					{
 						System.out.println("Moving File: [" + destFilePath +"]");
-						fs.rename(eventFilePath, destFilePath);
+						// Copy to final Location 
+						FileUtil.copy(fs,eventFilePath,fs,destFilePath,false,false,conf);
 					}
 					else
 					{
@@ -132,33 +127,110 @@
 						// Create MR input Dir
 						fs.mkdirs(mrPath);
 						// Move Input files 
-						fs.rename(eventFilePath, new Path(strMrPath+"/1.done"));
-						fs.rename(destFilePath, new Path(strMrPath+"/2.done"));
+						FileUtil.copy(fs,eventFilePath,fs,new Path(strMrPath+"/1.evt"),false,false,conf);
+						fs.rename(destFilePath, new Path(strMrPath+"/2.evt"));
 						
 						// Merge
 						String[] mergeArgs = new String[2];
 						mergeArgs[0] = strMrPath;
 						mergeArgs[1] = strMrPath + "/mrOutput";
-						System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
-						int res = ToolRunner.run(new ChukwaConfiguration(),new MoveOrMergeRecordFile(), mergeArgs);
-						System.out.println("MR exit status: " + res);
-						if (res == 0)
-						{
-							System.out.println("\t Moving output file : to [" + destFilePath +"]");
-							fs.rename(new Path(mergeArgs[1]+"/part-00000"), destFilePath);
-						}
-						else
-						{
-							throw new RuntimeException("Error in M/R merge operation!");
-						}
+						DoMerge merge = new DoMerge(conf,fs,eventFilePath,destFilePath,mergeArgs);
+						merge.start();
 					}
 				}
 			}
 		}
-		System.out.println("Done with mapred main()");
+
+	}
+    
+	/**
+	 * @param args
+	 * @throws Exception 
+	 */
+	public static void main(String[] args) throws Exception
+	{
+		conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		fs = FileSystem.get(new URI(fsName), conf);
+		
+		Path srcDir = new Path(args[0]);
+		String destDir = args[1];
+		
+		
+		FileStatus fstat = fs.getFileStatus(srcDir);
+		
+		if (!fstat.isDir())
+		{
+			throw new IOException(srcDir + " is not a directory!");
+		}
+		else
+		{
+			FileStatus[] clusters = fs.listStatus(srcDir);
+			// Run a moveOrMerge on all clusters
+			String name = null;
+			for(FileStatus cluster : clusters)
+			{
+				name = cluster.getPath().getName();
+				// Skip hadoop outDir
+				if ( (name.intern() == HadoopLogDir.intern() ) || (name.intern() == hadoopTempDir.intern()) )
+				{
+					continue;
+				}
+				moveOrMergeOneCluster(cluster.getPath(),destDir + "/" + cluster.getPath().getName());
+			}
+		}
+		System.out.println("Done with moveOrMerge main()");
 	}
 }
 
+
+class DoMerge extends Thread
+{
+	ChukwaConfiguration conf = null;
+	FileSystem fs = null;
+	String[] mergeArgs = new String[2];
+	Path destFilePath = null;
+	Path eventFilePath = null;
+	public DoMerge(ChukwaConfiguration conf,FileSystem fs,
+			Path eventFilePath,Path destFilePath,String[] mergeArgs)
+	{
+		this.conf = conf;
+		this.fs = fs;
+		this.eventFilePath = eventFilePath;
+		this.destFilePath = destFilePath;
+		this.mergeArgs = mergeArgs;
+	}
+	@Override
+	public void run()
+	{
+		System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
+		int res;
+		try
+		{
+			res = ToolRunner.run(new ChukwaConfiguration(),new MoveOrMergeRecordFile(), mergeArgs);
+			System.out.println("MR exit status: " + res);
+			if (res == 0)
+			{
+				System.out.println("\t Moving output file : to [" + destFilePath +"]");
+				FileUtil.copy(fs,new Path(mergeArgs[1]+"/part-00000"),fs,destFilePath,false,false,conf);
+				fs.rename(new Path(mergeArgs[1]+"/part-00000"), eventFilePath);
+			}
+			else
+			{
+				throw new RuntimeException("Error in M/R merge operation!");
+			}
+
+		} 
+		catch (Exception e)
+		{
+			e.printStackTrace();
+			throw new RuntimeException("Error in M/R merge operation!",e);
+		}
+	}
+	
+}
+
+
 class EventFileFilter implements PathFilter {
 	  public boolean accept(Path path) {
 	    return path.toString().endsWith(".evt");
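
Note: EventFileFilter above selects only .evt record files. A typical use, shown below as an assumption since the actual call site falls outside the visible hunk, is to pass such a filter to FileSystem.listStatus so non-record files in a datasource directory are skipped; the directory path is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class EvtListingSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Same predicate as EventFileFilter: keep only .evt record files.
    PathFilter evtOnly = new PathFilter() {
      public boolean accept(Path path) {
        return path.toString().endsWith(".evt");
      }
    };
    Path datasourceDir = new Path("/chukwa/demuxOutput/demoCluster/Hadoop_dfs_datanode"); // hypothetical
    for (FileStatus stat : fs.listStatus(datasourceDir, evtOnly)) {
      System.out.println(stat.getPath());
    }
  }
}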

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+// TODO
+// First version of the Spill
+// need some polishing
+
+public class MoveToRepository
+{
+	static Logger log = Logger.getLogger(MoveToRepository.class);
+	
+	static ChukwaConfiguration conf = null;
+	static FileSystem fs = null;
+	static final String HadoopLogDir = "_logs";
+	static final String hadoopTempDir = "_temporary";
+	static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+	static Calendar calendar = Calendar.getInstance();
+	
+	static void processClutserDirectory(Path srcDir,String destDir) throws Exception
+	{
+		log.info("processClutserDirectory (" + srcDir.getName() + "," + destDir +")");
+		FileStatus fstat = fs.getFileStatus(srcDir);
+		
+		if (!fstat.isDir())
+		{
+			throw new IOException(srcDir + " is not a directory!");
+		}
+		else
+		{
+			FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
+			
+			for(FileStatus datasourceDirectory : datasourceDirectories)
+			{
+				log.info(datasourceDirectory.getPath() + " isDir?" +datasourceDirectory.isDir());
+				if (!datasourceDirectory.isDir())
+				{
+					throw new IOException("Top level datasource directory should be a directory :" + datasourceDirectory.getPath());
+				}
+				
+				String dirName = datasourceDirectory.getPath().getName();
+				Path destPath = new Path(destDir + "/" + dirName);
+				log.info("dest directory path: " + destPath);
+				log.info("processClutserDirectory processing Datasource: (" + dirName +")");
+				processDatasourceDirectory(srcDir.getName(),datasourceDirectory.getPath(),destDir + "/" + dirName);
+			}
+		}
+	}
+	
+	static void processDatasourceDirectory(String cluster,Path srcDir,String destDir) throws Exception
+	{
+		String fileName = null;
+		int fileDay  = 0;
+		int fileHour = 0;
+		int fileMin  = 0;
+		
+		FileStatus[] recordFiles = fs.listStatus(srcDir);
+		for(FileStatus recordFile : recordFiles)
+		{
+			//   dataSource_20080915_18_15.1.evt
+			// <datasource>_<yyyyMMdd_HH_mm>.1.evt
+			
+			fileName = recordFile.getPath().getName();
+			log.info("processDatasourceDirectory processing RecordFile: (" + fileName +")");
+			log.info("fileName: " + fileName);
+			
+			
+			int l = fileName.length();
+			String dataSource = srcDir.getName();
+			log.info("Datasource: " + dataSource);
+			
+			if (fileName.endsWith(".D.evt"))
+			{
+				// Hadoop_dfs_datanode_20080919.D.evt
+				
+				fileDay = Integer.parseInt(fileName.substring(l-14,l-6));
+				writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(),dataSource + "_" +fileDay);
+				// mark this directory for Daily rotate (re-process)
+				addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource);
+			}
+			else if (fileName.endsWith(".H.evt"))
+			{ 
+				// Hadoop_dfs_datanode_20080925_1.H.evt
+				// Hadoop_dfs_datanode_20080925_12.H.evt
+				
+				String day = null;
+			    String hour = null;
+			    if (fileName.charAt(l-8) == '_')
+			    {
+			    	day = fileName.substring(l-16,l-8);
+			    	log.info("day->" + day);
+			    	hour = "" +fileName.charAt(l-7);
+			    	log.info("hour->" +hour);
+			    }
+			    else
+			    {
+			    	day = fileName.substring(l-17,l-9);
+			    	log.info("day->" +day);
+			    	hour = fileName.substring(l-8,l-6);
+			    	log.info("hour->" +hour);
+			    }
+			    fileDay = Integer.parseInt(day);
+			    fileHour = Integer.parseInt(hour);
+			    // rotate there so spill
+				writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/", recordFile.getPath(),dataSource + "_"  +fileDay+ "_" + fileHour );
+				// mark this directory for daily rotate
+				addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource);
+			}
+			else if (fileName.endsWith(".R.evt"))
+			{
+				if (fileName.charAt(l-11) == '_')
+				{
+					fileDay = Integer.parseInt(fileName.substring(l-19,l-11));
+					fileHour = Integer.parseInt(""+fileName.charAt(l-10));
+					fileMin = Integer.parseInt(fileName.substring(l-8,l-6));
+				}
+				else
+				{
+					fileDay = Integer.parseInt(fileName.substring(l-20,l-12));
+					fileHour = Integer.parseInt(fileName.substring(l-11,l-9));
+					fileMin = Integer.parseInt(fileName.substring(l-8,l-6));
+				}
+
+				log.info("fileDay: " + fileDay);
+				log.info("fileHour: " + fileHour);
+				log.info("fileMin: " + fileMin);
+				writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin, recordFile.getPath(),dataSource + "_" +fileDay+ "_" + fileHour +"_" +fileMin);
+				// mark this directory for hourly rotate
+				addDirectory4Rolling( false,fileDay , fileHour, cluster , dataSource);
+			}
+			else
+			{
+				throw new RuntimeException("Wrong fileName format! [" + fileName+"]");
+			}
+		}
+	}
+			
+	static void addDirectory4Rolling(boolean isDailyOnly, int day,int hour,String cluster, String dataSource) throws IOException
+	{
+		// TODO get root directory from config
+		String rollingDirectory = "/chukwa/rolling/";
+		
+		Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster +"/" + dataSource);
+		if (!fs.exists(path))
+			{ fs.mkdirs(path);}
+		
+		if (!isDailyOnly)
+		{
+			path = new Path(rollingDirectory + "/hourly/" + day + "/"  + hour + "/" + cluster +"/" + dataSource);
+			if (!fs.exists(path))
+				{ fs.mkdirs(path);}
+		}
+	}
+	
+	static void writeRecordFile(String destDir,Path recordFile,String fileName) throws IOException
+	{
+		boolean done = false;
+		int count = 1;
+		do
+		{
+			Path destDirPath = new Path(destDir );
+			Path destFilePath = new Path(destDir + "/" + fileName + "." + count + ".evt" );
+			
+			if (!fs.exists(destDirPath))
+			{
+				fs.mkdirs(destDirPath);
+				log.info(">>>>>>>>>>>> create Dir" + destDirPath);
+			}
+			
+			if (!fs.exists(destFilePath))
+			{
+				log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "+ destFilePath);
+				//fs.rename(recordFile,destFilePath);
+				FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf);
+				//FileUtil.replaceFile(new File(recordFile.toUri()), new File(destFilePath.toUri()));
+				done = true;
+				log.info(">>>>>>>>>>>> after Rename" + destFilePath);
+			}
+			else
+			{
+				log.info("Start MoveToRepository main()");
+			}
+			count ++;
+			// Just put a limit here
+			// TODO read from config
+			if (count > 1000)
+			{
+				throw new IOException("too many files in this directory: " + destDir);
+			}
+		} while (!done);
+	}
+	
+	static boolean checkRotate(String directoryAsString, boolean createDirectoryIfNotExist) throws IOException
+	{
+		Path directory = new Path(directoryAsString);
+		boolean exist = fs.exists(directory);
+	
+		if (! exist )
+		{
+			if (createDirectoryIfNotExist== true)
+				{ fs.mkdirs(directory); }
+			return false;
+		}
+		else
+		{
+			return fs.exists(new Path(directoryAsString + "/rotateDone"));
+		}
+	}
+				
+	/**
+	 * @param args
+	 * @throws Exception 
+	 */
+	public static void main(String[] args) throws Exception
+	{
+		conf = new ChukwaConfiguration();
+		String fsName = conf.get("writer.hdfs.filesystem");
+		fs = FileSystem.get(new URI(fsName), conf);
+		
+		Path srcDir = new Path(args[0]);
+		String destDir = args[1];
+		
+		log.info("Start MoveToRepository main()");
+		
+		FileStatus fstat = fs.getFileStatus(srcDir);
+		
+		if (!fstat.isDir())
+		{
+			throw new IOException(srcDir + " is not a directory!");
+		}
+		else
+		{
+			FileStatus[] clusters = fs.listStatus(srcDir);
+			// Run a moveOrMerge on all clusters
+			String name = null;
+			for(FileStatus cluster : clusters)
+			{
+				name = cluster.getPath().getName();
+				// Skip hadoop M/R outputDir
+				if ( (name.intern() == HadoopLogDir.intern() ) || (name.intern() == hadoopTempDir.intern()) )
+				{
+					continue;
+				}
+				log.info("main procesing Cluster (" + cluster.getPath().getName() +")");
+				processClutserDirectory(cluster.getPath(),destDir + "/" + cluster.getPath().getName());
+				
+				// Delete the demux's cluster dir 
+				FileUtil.fullyDelete(fs,cluster.getPath());
+			}
+		}
+		
+		log.info("Done with MoveToRepository main()");
+
+	}
+
+}
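
Note on the file above: processDatasourceDirectory() decodes three spill flavours from fixed offsets at the end of the file name: &lt;dataSource&gt;_&lt;yyyyMMdd&gt;.D.evt for daily spills, &lt;dataSource&gt;_&lt;yyyyMMdd&gt;_&lt;H|HH&gt;.H.evt for hourly spills, and &lt;dataSource&gt;_&lt;yyyyMMdd&gt;_&lt;H|HH&gt;_&lt;mm&gt;.R.evt for five-minute spills. The sketch below expresses the same convention with regular expressions purely as an illustration (it is not the committed parser); the sample names are taken from or modelled on the comments above.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EvtNameSketch {
  static final Pattern DAILY  = Pattern.compile("(.+)_(\\d{8})\\.D\\.evt");
  static final Pattern HOURLY = Pattern.compile("(.+)_(\\d{8})_(\\d{1,2})\\.H\\.evt");
  static final Pattern RAW    = Pattern.compile("(.+)_(\\d{8})_(\\d{1,2})_(\\d{2})\\.R\\.evt");

  public static void main(String[] args) {
    describe("Hadoop_dfs_datanode_20080919.D.evt");
    describe("Hadoop_dfs_datanode_20080925_12.H.evt");
    describe("Hadoop_dfs_datanode_20080915_18_15.R.evt");
  }

  static void describe(String name) {
    Matcher m;
    if ((m = DAILY.matcher(name)).matches()) {
      System.out.println(name + " -> daily spill, day " + m.group(2));
    } else if ((m = HOURLY.matcher(name)).matches()) {
      System.out.println(name + " -> hourly spill, day " + m.group(2) + " hour " + m.group(3));
    } else if ((m = RAW.matcher(name)).matches()) {
      System.out.println(name + " -> 5-minute spill, day " + m.group(2)
          + " hour " + m.group(3) + " minute " + m.group(4));
    } else {
      System.out.println(name + " -> unknown format");
    }
  }
}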

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+public class RecordMerger extends Thread
+{
+	static Logger log = Logger.getLogger(RecordMerger.class);
+	ChukwaConfiguration conf = null;
+	FileSystem fs = null;
+	String[] mergeArgs = null;
+	Tool tool = null;
+	boolean deleteRawData = false;
+	
+	public RecordMerger(ChukwaConfiguration conf,FileSystem fs,Tool tool,String[] mergeArgs,boolean deleteRawData)
+	{
+		this.conf = conf;
+		this.fs = fs;
+		this.tool = tool;
+		this.mergeArgs = mergeArgs;
+		this.deleteRawData = deleteRawData;
+	}
+	@Override
+	public void run()
+	{
+		System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
+		int res;
+		try
+		{
+			res = ToolRunner.run(conf,tool, mergeArgs);
+			System.out.println("MR exit status: " + res);
+			if (res == 0)
+			{
+				writeRecordFile(mergeArgs[1]+"/part-00000",mergeArgs[2],mergeArgs[3]);
+				
+				// delete input
+				if (deleteRawData)
+				 { 
+					FileUtil.fullyDelete(fs,new Path(mergeArgs[0])); 
+					
+					Path hours = new Path(mergeArgs[2]) ;
+					FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
+					for(FileStatus hourOrMinuteFS : hoursOrMinutesFS)
+					{
+						String dirName = hourOrMinuteFS.getPath().getName();
+						
+						try
+						{ 
+							Integer.parseInt(dirName);
+							FileUtil.fullyDelete(fs,new Path(mergeArgs[2] + "/" + dirName)); 
+							if (log.isDebugEnabled() )
+								{ log.debug("Deleting Hour directory: " + mergeArgs[2] + "/" + dirName); }
+						}
+						catch(NumberFormatException e) { /* Not an Hour or Minutes directory- Do nothing */ }
+					}
+				 }
+				
+				// delete rolling tag
+				FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
+				// delete M/R temp directory
+				FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
+			}
+			else
+			{
+				throw new RuntimeException("Error in M/R merge operation!");
+			}
+
+		} 
+		catch (Exception e)
+		{
+			e.printStackTrace();
+			throw new RuntimeException("Error in M/R merge operation!",e);
+		}
+	}
+	
+	
+	void writeRecordFile(String input,String outputDir,String fileName) throws IOException
+	{
+		boolean done = false;
+		int count = 1;
+		Path recordFile = new Path(input);
+		do
+		{
+			Path destDirPath = new Path(outputDir );
+			Path destFilePath = new Path(outputDir + "/" + fileName + "." + count + ".evt" );
+			
+			if (!fs.exists(destDirPath))
+			{
+				fs.mkdirs(destDirPath);
+				log.info(">>>>>>>>>>>> create Dir" + destDirPath);
+			}
+			
+			if (!fs.exists(destFilePath))
+			{
+				boolean res = fs.rename(recordFile,destFilePath);
+				
+				if (res == false)
+				{
+					log.info(">>>>>>>>>>>> Use standard copy rename failded");
+					FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf);
+				}
+				done = true;
+			}
+			else
+			{
+				log.info("Start MoveToRepository main()");
+			}
+			count ++;
+			// Just put a limit here
+			// TODO read from config
+			if (count > 1000)
+			{
+				throw new IOException("too many files in this directory: " + destDirPath);
+			}
+		} while (!done);
+	}
+}
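
Note on the file above: writeRecordFile() first tries a cheap FileSystem.rename and only falls back to FileUtil.copy when the rename fails, bumping a .&lt;count&gt;.evt suffix until an unused destination name is found. A minimal sketch of just the rename-or-copy idiom, with hypothetical paths, follows.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class RenameOrCopySketch {
  // Rename is cheap when src and dest live on the same FileSystem; copy is the fallback.
  public static void moveRecord(FileSystem fs, Configuration conf,
                                Path src, Path dest) throws Exception {
    if (!fs.rename(src, dest)) {
      // rename can fail (e.g. missing parent directory); fall back to a real copy
      FileUtil.copy(fs, src, fs, dest, /*deleteSource*/ false, /*overwrite*/ false, conf);
    }
  }
}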

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,33 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ChukwaOutputCollector implements OutputCollector<ChukwaRecordKey, ChukwaRecord>
+{
+  private OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector = null;
+  private Reporter reporter = null;
+  private String groupName = null;
+  
+  public ChukwaOutputCollector(String groupName,OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector,Reporter reporter)
+  {
+    this.reporter = reporter;
+    this.outputCollector = outputCollector;
+    this.groupName = groupName;
+  }
+
+  @Override
+  public void collect(ChukwaRecordKey key, ChukwaRecord value)
+      throws IOException
+  {
+    this.outputCollector.collect(key, value);
+    reporter.incrCounter(groupName, "total records", 1);
+    reporter.incrCounter(groupName,  key.getReduceType() +" records" , 1);
+  }
+
+
+}
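
Note: ChukwaOutputCollector is a thin decorator. It forwards every collect() to the real OutputCollector and increments a per-group "total records" counter plus one counter per reduce type. The usage sketch below shows how a mapper or reducer would emit through it; the key values and the "SysLog" reduce type are placeholders.

import java.io.IOException;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CountingCollectorSketch {
  // Called from a map() or reduce() with the framework-supplied output and reporter.
  static void emit(OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
                   Reporter reporter) throws IOException {
    ChukwaOutputCollector wrapped =
        new ChukwaOutputCollector("DemuxMapOutput", output, reporter);

    ChukwaRecordKey key = new ChukwaRecordKey();
    key.setKey("1228500000000/host1.example.com/1228500012345"); // time/source/ts, hypothetical
    key.setReduceType("SysLog");                                 // hypothetical reduce type

    ChukwaRecord record = new ChukwaRecord();
    record.setTime(System.currentTimeMillis());

    // Bumps "total records" and "SysLog records" in counter group DemuxMapOutput.
    wrapped.collect(key, record);
  }
}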

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+
+public class Util
+{
+	static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
+	
+	static Calendar calendar = Calendar.getInstance();
+	static int currentDay = 0;
+	static int currentHour = 0;
+	
+	static
+	{
+		synchronized(calendar)
+		{
+			calendar.setTimeInMillis( System.currentTimeMillis());
+			currentDay = Integer.parseInt(day.format(calendar.getTime()));
+			currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+		}
+	}
+	
+	public static String generateTimeOutput(long timestamp)
+	{
+		int workingDay = 0;
+		int workingHour = 0;
+
+		String output = null;
+
+		int minutes = 0;
+		synchronized(calendar)
+		{
+			calendar.setTimeInMillis( timestamp);
+			workingDay = Integer.parseInt(day.format(calendar.getTime()));
+			workingHour = calendar.get(Calendar.HOUR_OF_DAY);
+			minutes = calendar.get(Calendar.MINUTE);
+		}
+		
+		if (workingDay != currentDay)
+		{
+			output = "_" + workingDay + ".D.evt";
+		}
+		else
+		{
+			if (workingHour != currentHour)
+			{
+				output = "_" +workingDay + "_" +  workingHour + ".H.evt";
+			}
+			else
+			{
+				output = "_" + workingDay + "_" +  workingHour + "_";
+				int dec = minutes/10;
+				output +=  dec ;
+
+				int m = minutes - (dec*10);
+				if (m < 5) 
+				{ 
+					output += "0.R.evt";
+				} 
+				else 
+				{
+					output += "5.R.evt";
+				}
+			}
+		}
+
+
+		return output;
+	}
+}
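
Note: generateTimeOutput() picks a file-name suffix by comparing the record timestamp with the day and hour captured when the class was loaded. A different day yields a daily _&lt;yyyyMMdd&gt;.D.evt suffix, the same day but a different hour yields an hourly _&lt;yyyyMMdd&gt;_&lt;HH&gt;.H.evt suffix, and the current hour yields a five-minute _&lt;yyyyMMdd&gt;_&lt;HH&gt;_&lt;m0|m5&gt;.R.evt suffix. The sketch below is a usage illustration only; the suffixes in the comments assume the program runs mid-day so the offsets do not cross an hour or day boundary.

import org.apache.hadoop.chukwa.extraction.demux.processor.Util;

public class TimeOutputSketch {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long oneHour = 60L * 60L * 1000L;
    long oneDay = 24L * oneHour;

    // Same day and hour -> 5-minute spill, e.g. "_20081205_12_30.R.evt"
    System.out.println(Util.generateTimeOutput(now));
    // Same day, different hour -> hourly spill, e.g. "_20081205_10.H.evt"
    System.out.println(Util.generateTimeOutput(now - 2 * oneHour));
    // Previous day -> daily spill, e.g. "_20081204.D.evt"
    System.out.println(Util.generateTimeOutput(now - oneDay));
  }
}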

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java Fri Dec  5 12:30:14 2008
@@ -20,78 +20,125 @@
 
 import java.util.Calendar;
 
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
 import org.apache.hadoop.chukwa.util.RecordConstants;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
 
-public abstract class  AbstractProcessor implements ChunkProcessor
+public abstract class AbstractProcessor implements MapProcessor
 {
-	Calendar calendar = Calendar.getInstance();
-	Chunk chunk = null;
-	byte[] bytes;
-	int[] recordOffsets ;
-	int currentPos = 0;
-	int startOffset = 0;
-	Text key = new Text();
-	
-	public AbstractProcessor()
-	{}
-	
-	protected abstract void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output, Reporter reporter);
-	
-	
-	public void process(Chunk chunk,OutputCollector<Text, ChukwaRecord> output, Reporter reporter)	{
-		reset(chunk);
-		while (hasNext()) {
-			parse(nextLine(), output, reporter);
-		}
-	}
-	
-	
-	protected void buildGenericRecord(ChukwaRecord record, String body,long timestamp,String dataSource)	{
-		calendar.setTimeInMillis( timestamp);
-		String fileName = dataSource + "/" + dataSource + new java.text.SimpleDateFormat("_yyyy_MM_dd_HH").format(calendar.getTime());
-		int minutes = calendar.get(Calendar.MINUTE);
-		int dec = minutes/10;
-		fileName += "_" + dec ;
-		
-		int m = minutes - (dec*10);
-		if (m < 5) { 
-		  fileName += "0.evt";
-		} else {
-		  fileName += "5.evt";
-		}
-
-		record.setTime(timestamp);
-		record.add(Record.rawField, body);
-		record.add(Record.dataSourceField, dataSource);
-		record.add(Record.destinationField, fileName);
-		record.add(Record.sourceField, chunk.getSource());
-		record.add(Record.streamNameField, chunk.getStreamName());
-		record.add(Record.typeField, chunk.getDataType());
-	}
-
-	
-	protected void reset(Chunk chunk)	{
-		this.chunk = chunk;
-		this.bytes = chunk.getData();
-		this.recordOffsets = chunk.getRecordOffsets();
-		currentPos = 0;
-		startOffset = 0;
-	}
-	
-	protected boolean hasNext() {
-		return (currentPos < recordOffsets.length);
-	}
-	
-	protected String nextLine()	{
-		String log = new String(bytes,startOffset,(recordOffsets[currentPos]-startOffset));
-		startOffset = recordOffsets[currentPos] + 1;
-		currentPos ++;
-		return RecordConstants.recoverRecordSeparators("\n", log);
-	}
+  static Logger log = Logger.getLogger(AbstractProcessor.class);
+  
+  Calendar calendar = Calendar.getInstance();
+  byte[] bytes;
+  int[] recordOffsets;
+  int currentPos = 0;
+  int startOffset = 0;
+
+  ChukwaArchiveKey archiveKey = null;
+  ChukwaRecordKey key = new ChukwaRecordKey();
+  Chunk chunk = null;
+
+  boolean chunkInErrorSaved = false;
+  OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
+  Reporter reporter = null;
+
+  public AbstractProcessor()
+  {
+  }
+
+  protected abstract void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable;
+
+  protected void saveChunkInError(Throwable throwable)
+  {
+    if (chunkInErrorSaved == false)
+    {
+      try
+      {
+        ChunkSaver.saveChunk(chunk, throwable, output, reporter);
+        chunkInErrorSaved = true;
+      } catch (Exception e)
+      {
+        e.printStackTrace();
+      }
+    }
+
+  }
+
+  public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+  {
+    chunkInErrorSaved = false;
+    
+    this.archiveKey = archiveKey;
+    this.output = output;
+    this.reporter = reporter;
+    
+    reset(chunk);
+    
+    while (hasNext())
+    {
+      try
+      {
+        parse(nextLine(), output, reporter);
+      } catch (Throwable e)
+      {
+        saveChunkInError(e);
+      }
+    }
+  }
+
+  protected void buildGenericRecord(ChukwaRecord record, String body,
+      long timestamp, String dataSource)
+  {
+    calendar.setTimeInMillis(timestamp);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+
+    key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + "/"
+        + timestamp);
+    key.setReduceType(dataSource);
+
+    if (body != null)
+    {
+      record.add(Record.bodyField, body);
+    }
+    record.setTime(timestamp);
+
+    record.add(Record.tagsField, chunk.getTags());
+    record.add(Record.sourceField, chunk.getSource());
+    record.add(Record.applicationField, chunk.getApplication());
+
+  }
+
+  protected void reset(Chunk chunk)
+  {
+    this.chunk = chunk;
+    this.bytes = chunk.getData();
+    this.recordOffsets = chunk.getRecordOffsets();
+    currentPos = 0;
+    startOffset = 0;
+  }
+
+  protected boolean hasNext()
+  {
+    return (currentPos < recordOffsets.length);
+  }
+
+  protected String nextLine()
+  {
+    String log = new String(bytes, startOffset, (recordOffsets[currentPos]
+        - startOffset + 1));
+    startOffset = recordOffsets[currentPos] + 1;
+    currentPos++;
+    return RecordConstants.recoverRecordSeparators("\n", log);
+  }
 }
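
Note on the change above: AbstractProcessor is now a template. process() walks the chunk line by line, delegates each line to the abstract parse(), and routes any Throwable from parse() to ChunkSaver via saveChunkInError(); buildGenericRecord() fills the ChukwaRecordKey (hour-truncated time / source / timestamp) and the standard tags, source and application fields. Below is a minimal hypothetical subclass sketch, not part of this commit, with "LineCount" as an invented reduce type; to receive chunks it would have to be bound to a data type in chukwa-demux-conf.xml as in the Demux sketch earlier.

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LineCountProcessor extends AbstractProcessor {
  @Override
  protected void parse(String recordEntry,
      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
      throws Throwable {
    ChukwaRecord record = new ChukwaRecord();
    // Use the chunk's archive time partition as the record timestamp,
    // as DefaultProcessor does below; "LineCount" is a made-up reduce type.
    buildGenericRecord(record, recordEntry, archiveKey.getTimePartition(), "LineCount");
    record.add("length", Integer.toString(recordEntry.length()));
    // Any Throwable thrown from here is caught by process() and the whole
    // chunk is preserved by saveChunkInError().
    output.collect(key, record);
  }
}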

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.Calendar;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class ChunkSaver
+{
+  static Logger log = Logger.getLogger(ChunkSaver.class);
+  public static ChukwaRecord saveChunk(Chunk chunk, Throwable throwable,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+  {
+    try
+    {
+      reporter.incrCounter("DemuxError", "count", 1);
+      reporter.incrCounter("DemuxError", chunk.getDataType() + "Count", 1);
+ 
+      ChukwaRecord record = new ChukwaRecord();
+      long ts = System.currentTimeMillis();
+      Calendar calendar = Calendar.getInstance();
+      calendar.setTimeInMillis(ts);
+      calendar.set(Calendar.MINUTE, 0);
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+      ChukwaRecordKey key = new ChukwaRecordKey();
+      key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getDataType()
+          + "/" + chunk.getSource() + "/" + ts);
+      key.setReduceType(chunk.getDataType() + "InError");
+
+      record.setTime(ts);
+
+      record.add(Record.tagsField, chunk.getTags());
+      record.add(Record.sourceField, chunk.getSource());
+      record.add(Record.applicationField, chunk.getApplication());
+
+      DataOutputBuffer ob = new DataOutputBuffer(chunk
+          .getSerializedSizeEstimate());
+      chunk.write(ob);
+      record.add(Record.chunkDataField, new String(ob.getData()));
+      record.add(Record.chunkExceptionField, ExceptionUtil
+          .getStackTrace(throwable));
+      output.collect(key, record);
+
+      return record;
+    }
+    catch (Throwable e) 
+    {
+      e.printStackTrace();
+      try
+      {
+        log.warn("Unable to save a chunk: tags: " 
+            + chunk.getTags()   + " - source:"
+            + chunk.getSource() + " - dataType: "
+            + chunk.getDataType() + " - Stream: " 
+            + chunk.getStreamName() + " - SeqId: "
+            + chunk.getSeqID() + " - Data: " 
+            + new String(chunk.getData()) );
+      }
+      catch (Throwable e1) 
+      {
+        e.printStackTrace();
+      }
+    }
+    return null;
+  }
+
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java Fri Dec  5 12:30:14 2008
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 public class DFInvalidRecord extends Exception

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java Fri Dec  5 12:30:14 2008
@@ -21,7 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
@@ -32,14 +32,14 @@
 	public static final String recordType = "Debug";
 		
 	@Override
-	public void parse(String line, OutputCollector<Text, ChukwaRecord> output,
+	public void parse(String line, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
 			Reporter reporter)
 	{
 		log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
 		
 		ChukwaRecord record = new ChukwaRecord();
 		buildGenericRecord(record,line, System.currentTimeMillis(),recordType);
-		key.set("" + chunk.getSeqID());
+		key.setKey("" + chunk.getSeqID());
 		try
 		{
 			output.collect(key, record);
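
The change from Text to ChukwaRecordKey (and from key.set to key.setKey) seen here is the same migration applied to every processor in this commit. A minimal standalone sketch of the new key usage, assuming only the setKey and setReduceType accessors that appear elsewhere in this patch:

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;

public class ChukwaRecordKeySketch {
    public static void main(String[] args) {
        ChukwaRecordKey key = new ChukwaRecordKey();
        key.setKey("" + System.currentTimeMillis()); // replaces the old Text.set(String)
        key.setReduceType("Debug");                  // reduce-side routing, cf. setReduceType above
    }
}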

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,32 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class DefaultProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(DefaultProcessor.class);
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		try
+		{
+			ChukwaRecord record = new ChukwaRecord();
+			this.buildGenericRecord(record, recordEntry, archiveKey.getTimePartition(), chunk.getDataType());
+			output.collect(key, record);
+		} 
+		catch (IOException e)
+		{
+			log.warn("Unable to collect output in DefaultProcessor ["
+					+ recordEntry + "]", e);
+			e.printStackTrace();
+		}
+	}
+}
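
DefaultProcessor is the catch-all mapper for data types without a dedicated parser. A processor for a new data type would follow the same shape; the sketch below is a hypothetical ExampleProcessor (not part of this commit), written against the AbstractProcessor contract visible in this patch: protected key, chunk and archiveKey fields, plus buildGenericRecord(record, body, timestamp, dataType).

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import java.io.IOException;

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;

// Hypothetical example processor, not part of this commit.
public class ExampleProcessor extends AbstractProcessor
{
	static Logger log = Logger.getLogger(ExampleProcessor.class);

	@Override
	protected void parse(String recordEntry,
			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
			Reporter reporter) throws Throwable
	{
		try
		{
			ChukwaRecord record = new ChukwaRecord();
			// Same defaults as DefaultProcessor: the chunk's time partition and data type.
			buildGenericRecord(record, recordEntry, archiveKey.getTimePartition(), chunk.getDataType());
			output.collect(key, record);
		}
		catch (IOException e)
		{
			log.warn("Unable to collect output in ExampleProcessor [" + recordEntry + "]", e);
			throw e;
		}
	}
}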

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Df extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(Df.class);
+	private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
+			"Used", "Available", "Use%", "Mounted", "on" };
+	private static final String[] headerCols = { "Filesystem", "1K-blocks",
+    "Used", "Available", "Use%", "Mounted on" };
+	private SimpleDateFormat sdf = null;
+
+	public Df()
+	{
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+	}
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	throws Throwable
+	{
+	  
+		try
+		{
+			String dStr = recordEntry.substring(0, 23);
+			int start = 24;
+			int idx = recordEntry.indexOf(' ', start);
+			// String level = recordEntry.substring(start, idx);
+			start = idx + 1;
+			idx = recordEntry.indexOf(' ', start);
+			// String className = recordEntry.substring(start, idx-1);
+			String body = recordEntry.substring(idx + 1);
+
+			Date d = sdf.parse(dStr);
+			String[] lines = body.split("\n");
+
+			String[] outputCols = lines[0].split("[\\s]++");
+			
+			if (outputCols.length != headerSplitCols.length
+					|| outputCols[0].intern() != headerSplitCols[0].intern()
+					|| outputCols[1].intern() != headerSplitCols[1].intern()
+					|| outputCols[2].intern() != headerSplitCols[2].intern()
+					|| outputCols[3].intern() != headerSplitCols[3].intern()
+					|| outputCols[4].intern() != headerSplitCols[4].intern()
+					|| outputCols[5].intern() != headerSplitCols[5].intern()
+			    || outputCols[6].intern() != headerSplitCols[6].intern()
+			    )
+			{
+			  throw new DFInvalidRecord("Wrong output format (header) ["
+						+ recordEntry + "]");
+			}
+
+			String[] values = null;
+
+			// Data
+			ChukwaRecord record = null;
+
+			for (int i = 1; i < lines.length; i++)
+			{
+				values = lines[i].split("[\\s]++");
+				key = new ChukwaRecordKey();
+				record = new ChukwaRecord();
+				this.buildGenericRecord(record, null, d.getTime(), "Df");
+
+				record.add(headerCols[0], values[0]);
+				record.add(headerCols[1], values[1]);
+				record.add(headerCols[2], values[2]);
+				record.add(headerCols[3], values[3]);
+				record.add(headerCols[4], values[4].substring(0, values[4].length()-1)); // Remove %
+				record.add(headerCols[5], values[5]);
+
+				output.collect(key, record);
+			}
+
+			//log.info("DFProcessor output 1 DF record");
+		} catch (ParseException e)
+		{
+			e.printStackTrace();
+			log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+			throw e;
+		} catch (IOException e)
+		{
+			e.printStackTrace();
+			log.warn("Unable to collect output in DFProcessor [" + recordEntry
+					+ "]", e);
+			throw e;
+		} catch (DFInvalidRecord e)
+		{
+			e.printStackTrace();
+			log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+			throw e;
+		}
+	}
+}
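
The Df parser above expects a log-style record whose first 23 characters are the timestamp, followed by a level and a class token, with the captured df listing as the body; the header row must split into the seven tokens of headerSplitCols, and each following row becomes one record. A sketch of a hypothetical input (class token and values invented for illustration):

public class DfInputSketch {
    public static void main(String[] args) {
        // Layout assumed by Df.parse(): 23-char timestamp, level, class, then the df output.
        String recordEntry =
            "2008-12-05 12:30:00,000 INFO org.example.Exec: "
            + "Filesystem 1K-blocks Used Available Use% Mounted on\n"
            + "/dev/sda1 10321208 5242880 4553900 54% /\n"
            + "/dev/sdb1 103212080 1024000 96945280 2% /data";
        // substring(0, 23) -> "2008-12-05 12:30:00,000", parsed with "yyyy-MM-dd HH:mm".
        System.out.println(recordEntry.substring(0, 23));
    }
}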

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java Fri Dec  5 12:30:14 2008
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 public class DuplicateProcessorException extends RuntimeException

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java Fri Dec  5 12:30:14 2008
@@ -22,12 +22,9 @@
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.Record;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
@@ -41,11 +38,7 @@
 	private static final String recordType = "HadoopLog";
 	private static final String nameNodeType = "NameNode";
 	private static final String dataNodeType = "DataNode";
-	
-	private static String regex= null;
-	private static Pattern p = null;
-	
-	private Matcher matcher = null;
+	private static final String auditType = "Audit";
 	private SimpleDateFormat sdf = null;
 	
 	
@@ -53,54 +46,46 @@
 	{
 		//TODO move that to config
 		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): ((.*)\n*)*\\z";
-		p = Pattern.compile(regex);
-		matcher = p.matcher("-");
 	}
 	
 	@Override
-	public void parse(String line, OutputCollector<Text, ChukwaRecord> output,
+	public void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
 			Reporter reporter)
+	 throws Throwable
 	{
-		log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
-		
-		ChukwaRecord record = new ChukwaRecord();
-		
-		matcher.reset(line);
-		if (matcher.matches())
-		{
 			try
 			{
-				Date d = sdf.parse( matcher.group(0).trim());
-				if (this.chunk.getStreamName().indexOf("datanode") > 0)
-				{
-					buildGenericRecord(record,line,d.getTime(),dataNodeType);
-				}
-				else if (this.chunk.getStreamName().indexOf("namenode") > 0)
-				{
-					buildGenericRecord(record,line,d.getTime(),nameNodeType);
-				}
-				else
-				{
-					buildGenericRecord(record,line,d.getTime(),recordType);
+				String dStr = recordEntry.substring(0, 23);
+				Date d = sdf.parse(dStr);
+				ChukwaRecord record = new ChukwaRecord();
+
+				if (this.chunk.getStreamName().indexOf("datanode") > 0) {
+					buildGenericRecord(record,recordEntry,d.getTime(),dataNodeType);
+				} else if (this.chunk.getStreamName().indexOf("namenode") > 0) {
+					buildGenericRecord(record,recordEntry,d.getTime(),nameNodeType);
+				} else if (this.chunk.getStreamName().indexOf("audit") > 0) {
+					buildGenericRecord(record,recordEntry,d.getTime(),auditType);
+				} else {
+					buildGenericRecord(record,recordEntry,d.getTime(),recordType);
 				}
 				
-				key.set("" + d.getTime());
-				record.add(Record.logLevelField, "" +matcher.group(2));
-				record.add(Record.classField, "" +matcher.group(3));
-				record.add(Record.bodyField, "" +matcher.group(4));
+				
 				output.collect(key, record);
 			}
 			catch (ParseException e)
 			{
+				log.warn("Unable to parse the date in HadoopLogProcessor ["
+						+ recordEntry + "]", e);
 				e.printStackTrace();
+				throw e;
 			}
 			catch (IOException e)
 			{
+				log.warn("Unable to collect output in HadoopLogProcessor ["
+						+ recordEntry + "]", e);
 				e.printStackTrace();
-			}
-		}
-		
+				throw e;
+			}		
 	}
 
 	public String getDataType()
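
Instead of the removed regular expression, the parser now slices the first 23 characters of the record as the timestamp and routes on the stream name (datanode, namenode, audit). The fixed width works because the log4j "yyyy-MM-dd HH:mm:ss,SSS" prefix renders to exactly 23 characters; a small standalone check (hypothetical log line):

import java.text.SimpleDateFormat;

public class TimestampSliceSketch {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
        // Hypothetical Hadoop log line; only the first 23 characters are parsed.
        String line = "2008-12-05 12:30:14,000 INFO org.example.DataNode: example message";
        System.out.println(sdf.parse(line.substring(0, 23)).getTime());
    }
}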

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java Fri Dec  5 12:30:14 2008
@@ -1,95 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Date;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.Iterator;
 
-import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
 
-public class HadoopMetricsProcessor extends AbstractProcessor 
-{
+public class HadoopMetricsProcessor extends AbstractProcessor {
 
 	static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
-	
-	private static String regex= null;
-	private static Pattern p = null;
-	
-	private Matcher matcher = null;
+	static final String chukwaTimestampField = "chukwa_timestamp";
+	static final String contextNameField = "contextName";
+	static final String recordNameField = "recordName";
+
 	private SimpleDateFormat sdf = null;
 
-	public HadoopMetricsProcessor()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
-		p = Pattern.compile(regex);
-		matcher = p.matcher("-");
+	public HadoopMetricsProcessor() {
+		// TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
 	}
-	
-	
-  @Override
-  protected void parse(String recordEntry,
-      OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
-  {
-	  
-	  matcher.reset(recordEntry);
-		if (matcher.matches())
-		{
-			log.info("HadoopMetricsProcessor Matches");
-			
-			try
-			{
-				Date d = sdf.parse( matcher.group(0).trim());
-				key.set("" + d.getTime());
-				String body = matcher.group(4);
-				
-				 String[] kvpairs = body.split(" ");
-				    
-				// database
-				DatabaseHelper databaseRecord = new DatabaseHelper("HadoopMetrics");
-				
-			    for(int i = 1 ; i < kvpairs.length; ++i) 
-			    {
-			      String kvpair =  kvpairs[i];
-			      String[] halves = kvpair.split("=");
-			      if(halves[0].equals("chukwa_timestamp"))
-			      {
-			        key.set(halves[1]);
-			      }
-			      else
-			      {
-			    	  databaseRecord.add(d.getTime(), halves[0], halves[1]);
-			      }
-			    }
-			    
-			    //Output NodeActivity info to database
-				output.collect(key, databaseRecord.buildChukwaRecord());
-				log.info("HadoopMetricsProcessor output 1 Hadoop's Metric to database");
-			}
-			catch (ParseException e)
-			{
-				e.printStackTrace();
-				log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]", e);
-			}
-			catch (IOException e)
-			{
-				e.printStackTrace();
-				log.warn("Unable to collect output in HadoopMetricsProcessor [" + recordEntry + "]", e);
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable {
+		try {
+			String dStr = recordEntry.substring(0, 23);
+			int start = 24;
+			int idx = recordEntry.indexOf(' ', start);
+			// String level = recordEntry.substring(start, idx);
+			start = idx + 1;
+			idx = recordEntry.indexOf(' ', start);
+			// String className = recordEntry.substring(start, idx-1);
+			String body = recordEntry.substring(idx + 1);
+			body = body.replaceAll("\n", "");
+			// log.info("record [" + recordEntry + "] body [" + body +"]");
+			Date d = sdf.parse(dStr);
+
+			JSONObject json = new JSONObject(body);
+
+			ChukwaRecord record = new ChukwaRecord();
+			String datasource = null;
+			String recordName = null;
+
+			Iterator<String> ki = json.keys();
+			while (ki.hasNext()) {
+				String keyName = ki.next();
+				if (chukwaTimestampField.intern() == keyName.intern()) {
+					d = new Date(json.getLong(keyName));
+					Calendar cal = Calendar.getInstance();
+					cal.setTimeInMillis(System.currentTimeMillis());
+					cal.set(Calendar.SECOND, 0);
+					cal.set(Calendar.MILLISECOND, 0);
+					d.setTime(cal.getTimeInMillis());
+				} else if (contextNameField.intern() == keyName.intern()) {
+					datasource = "Hadoop_" + json.getString(keyName);
+				} else if (recordNameField.intern() == keyName.intern()) {
+					recordName = json.getString(keyName);
+					record.add(keyName, json.getString(keyName));
+				} else {
+					record.add(keyName, json.getString(keyName));
+				}
 			}
+
+			datasource = datasource + "_" + recordName;
+			buildGenericRecord(record, null, d.getTime(), datasource);
+			output.collect(key, record);
+		} catch (ParseException e) {
+			e.printStackTrace();
+			log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry
+					+ "]", e);
+			throw e;
+		} catch (IOException e) {
+			e.printStackTrace();
+			log.warn("Unable to collect output in HadoopMetricsProcessor ["
+					+ recordEntry + "]", e);
+			throw e;
+		} catch (JSONException e) {
+			e.printStackTrace();
+			log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry
+					+ "]", e);
+			throw e;
 		}
-  }
 
+	}
 
-	public String getDataType()
-	{
+	public String getDataType() {
 		return HadoopMetricsProcessor.class.getName();
 	}
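
The rewritten HadoopMetricsProcessor treats the log body as a JSON object: chukwa_timestamp sets the record time, contextName and recordName are combined into the data source ("Hadoop_<context>_<record>"), and every other key becomes a record field. A hypothetical metrics body of that shape (field names other than the three constants above are invented):

import java.util.Iterator;

import org.json.JSONObject;

public class MetricsBodySketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        String body = "{\"chukwa_timestamp\":1228480214000,"
            + "\"contextName\":\"dfs\",\"recordName\":\"ExampleRecord\","
            + "\"exampleMetric\":\"42\"}";
        JSONObject json = new JSONObject(body);
        Iterator<String> ki = json.keys();
        while (ki.hasNext()) {
            String keyName = ki.next();
            if ("chukwa_timestamp".equals(keyName)) {
                System.out.println(keyName + " = " + json.getLong(keyName));
            } else {
                System.out.println(keyName + " = " + json.getString(keyName));
            }
        }
    }
}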
 

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java Fri Dec  5 12:30:14 2008
@@ -23,10 +23,8 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.Record;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
@@ -45,23 +43,19 @@
 	public Iostat()
 	{
 		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
 		p = Pattern.compile(regex);
 	}
 
 	@Override
-	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
 			Reporter reporter)
+  throws Throwable
 	{
 		
 		log.debug("Iostat record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		StringBuilder sb = new StringBuilder(); 	 
 		int i = 0;
-		
-		String logLevel = null;
-		String className = null;
-		String body = null;
-		
+
 		matcher=p.matcher(recordEntry);
 		while (matcher.find())
 		{
@@ -71,55 +65,54 @@
 			{
 				Date d = sdf.parse( matcher.group(1).trim());
 				
-				logLevel = matcher.group(2);
-				className = matcher.group(3);
-				String hostname = matcher.group(5);
-				
-				//TODO create a more specific key structure
-				// part of ChukwaArchiveKey + record index if needed
-				key.set("" + d.getTime());
+
 				
 				String[] lines = recordEntry.split("\n");
-				int skip=0;
-				i++;
 				String[] headers = null;
-				while (skip<2 && i < lines.length) {
-					// Skip the first output because the numbers are averaged from system boot up
-					if(lines[i].indexOf("avg-cpu:")>0) {
-						skip++;
-					}
-					i++;					
+				for(int skip=0;skip<2;skip++) {
+				    i++;
+					while ( i < lines.length && lines[i].indexOf("avg-cpu")<0) {
+					    // Skip the first output because the numbers are averaged from system boot up
+					    log.debug("skip line:"+lines[i]);
+					    i++;					
+				    }
 				}
 				while (i < lines.length)
 				{
-					DatabaseHelper databaseRecord = null;
-					if(lines[i].equals("")) {
-						i++;
+					ChukwaRecord record = null;
+					
+					if(lines[i].indexOf("avg-cpu")>=0 || lines[i].indexOf("Device")>=0) {
 						headers = parseHeader(lines[i]);
 						i++;
 					}
 					String data[] = parseData(lines[i]);
 					if(headers[0].equals("avg-cpu:")) {
 						log.debug("Matched CPU-Utilization");
-						databaseRecord = new DatabaseHelper("system");
+						record = new ChukwaRecord();
+					  key = new ChukwaRecordKey();
+					  buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
 					} else if(headers[0].equals("Device:")) {
 						log.debug("Matched Iostat");
-						databaseRecord = new DatabaseHelper("system");	
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+					  buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
 					} else {
 						log.debug("No match:"+headers[0]);
 					}
-					if(databaseRecord!=null) {
+					if(record!=null) {
 						int j=0;
 						log.debug("Data Length: " + data.length);
-	                    while(j<data.length) {
-	                    	log.debug("header:"+headers[j]+" data:"+data[j]);
-	                    	if(!headers[j].equals("avg-cpu:")) {
-						        databaseRecord.add(d.getTime(),headers[j],data[j]);
-	                    	}
-						    j++;
-	                    }						
-						//Output Sar info to database
-						output.collect(key, databaseRecord.buildChukwaRecord());
+			            while(j<data.length) {
+			            	log.debug("header:"+headers[j]+" data:"+data[j]);
+			            	if(!headers[j].equals("avg-cpu:")) {
+			            		record.add(headers[j],data[j]);
+			            	}
+						  j++;
+			            }
+			            record.setTime(d.getTime());
+			            if(data.length>3) {
+						    output.collect(key, record);
+			            }
 					}
 					i++;
 				}
@@ -127,6 +120,7 @@
 			} catch (Exception e)
 			{
 				e.printStackTrace();
+				throw e;
 			}
 		}
 	}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,120 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Calendar;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+
+public class JobConfProcessor extends AbstractProcessor {
+	static Logger log = Logger.getLogger(JobConfProcessor.class);
+	static  Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
+    static  Pattern hodPattern = Pattern.compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)");
+    static  Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) 
+	 throws Throwable
+	{
+		Long time = 0L;
+		Random randomNumber = new Random();
+		String tags = this.chunk.getTags();
+
+		Matcher matcher = timePattern.matcher(tags);
+		if (matcher.matches()) {
+			time = Long.parseLong(matcher.group(2));
+		}
+		String capp = this.chunk.getApplication();
+    	String hodID = "";
+    	String jobID = "";
+        matcher = hodPattern.matcher(capp);
+        if(matcher.matches()) {
+        	hodID=matcher.group(3);
+        }
+        matcher = jobPattern.matcher(capp);
+        if(matcher.matches()) {
+        	jobID=matcher.group(2);
+        }
+		ChukwaRecord record = new ChukwaRecord();
+	    DocumentBuilderFactory docBuilderFactory 
+		    = DocumentBuilderFactory.newInstance();
+	    //ignore all comments inside the xml file
+	    docBuilderFactory.setIgnoringComments(true);
+	    try {
+	        DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+ 	        Document doc = null;
+ 	        String fileName = "test_"+randomNumber.nextInt();
+ 	        File tmp = new File(fileName);
+ 	        FileOutputStream out = new FileOutputStream(tmp);
+ 	        out.write(recordEntry.getBytes());
+ 	        out.close();
+		    doc = builder.parse(fileName);
+		    Element root = doc.getDocumentElement();
+		    if (!"configuration".equals(root.getTagName()))
+		        log.fatal("bad conf file: top-level element not <configuration>");
+		    NodeList props = root.getChildNodes();
+		
+		    for (int i = 0; i < props.getLength(); i++) {
+		        Node propNode = props.item(i);
+		        if (!(propNode instanceof Element))
+		            continue;
+		        Element prop = (Element)propNode;
+		        if (!"property".equals(prop.getTagName()))
+		            log.warn("bad conf file: element not <property>");
+		        NodeList fields = prop.getChildNodes();
+		        String attr = null;
+		        String value = null;
+		        boolean finalParameter = false;
+		        for (int j = 0; j < fields.getLength(); j++) {
+		            Node fieldNode = fields.item(j);
+		            if (!(fieldNode instanceof Element))
+		                continue;
+		            Element field = (Element)fieldNode;
+		            if ("name".equals(field.getTagName()) && field.hasChildNodes())
+		                attr = ((Text)field.getFirstChild()).getData().trim();
+		            if ("value".equals(field.getTagName()) && field.hasChildNodes())
+		                value = ((Text)field.getFirstChild()).getData();
+		            if ("final".equals(field.getTagName()) && field.hasChildNodes())
+		                finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
+		        }
+		        
+		        // Record the property when both name and value are present
+		        if (attr != null && value != null) {
+		            record.add(attr, value);
+		        }
+		    }
+		    buildGenericRecord(record,null, time,"JobConf");
+			calendar.setTimeInMillis(time);
+			calendar.set(Calendar.MINUTE, 0);
+			calendar.set(Calendar.SECOND, 0);
+			calendar.set(Calendar.MILLISECOND, 0);			
+			key.setKey(""+ calendar.getTimeInMillis() + "/" + hodID +"." + jobID + "/" + time);
+				        
+            output.collect(key,record);
+            tmp.delete();
+	    } catch(Exception e) {
+	        e.printStackTrace();	
+	        throw e;
+	    }
+	}
+	
+	public String getDataType() {
+		return JobConfProcessor.class.getName();
+	}
+}
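
JobConfProcessor writes the record body to a temporary file and walks it as a Hadoop configuration document: <property> children of <configuration>, each holding <name>, <value> and optionally <final>. A standalone sketch of that layout (hypothetical property name, parsed from memory rather than a temp file):

import java.io.ByteArrayInputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class JobConfSketch {
    public static void main(String[] args) throws Exception {
        String xml = "<?xml version=\"1.0\"?><configuration>"
            + "<property><name>example.property</name><value>8</value></property>"
            + "</configuration>";
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setIgnoringComments(true);
        DocumentBuilder builder = factory.newDocumentBuilder();
        Document doc = builder.parse(new ByteArrayInputStream(xml.getBytes()));
        Element root = doc.getDocumentElement();
        NodeList props = root.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element prop = (Element) props.item(i);
            String name = prop.getElementsByTagName("name").item(0).getTextContent();
            String value = prop.getElementsByTagName("value").item(0).getTextContent();
            System.out.println(name + " = " + value); // becomes record.add(name, value) above
        }
    }
}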


