hadoop-common-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r723855 [8/23] - in /hadoop/core/trunk: ./ src/contrib/ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/hadoop-packaging/ src/contrib/chukwa/lib...
Date Fri, 05 Dec 2008 20:30:21 GMT
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java Fri Dec  5 12:30:14 2008
@@ -19,15 +19,14 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
@@ -37,166 +36,381 @@
 	static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
 
 	private static final String recordType = "JobLogHistory";
-	private static final String jobEntryTypeField = "entryType";
-	
-	private static String regex = null;
 	private static String internalRegex = null;
-	private static Pattern p = null;
 	private static Pattern ip = null;
-	
-	private Matcher matcher = null;
+
 	private Matcher internalMatcher = null;
-	
-	private SimpleDateFormat sdf = null;
 
 	public JobLogHistoryProcessor()
 	{
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-		
-		internalRegex = "(.*?)=\"(.*?)\"(.*)";
-		regex = "([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
-		
+		internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
 		ip = Pattern.compile(internalRegex);
-		p = Pattern.compile(regex);
-		
 		internalMatcher = ip.matcher("-");
-		matcher = p.matcher("-");
 	}
 
 	@Override
 	protected void parse(String recordEntry,
-			OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	 throws Throwable
 	{
 
-		String logLevel = null;
-		String className = null;
-		String body = null;
+//		log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
+//				+ chunk.getDataType() + "]");
 
-		log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
-				+ chunk.getDataType() + "]");
-		
-		
-		matcher.reset(recordEntry);
-		if (matcher.matches())
+		try
 		{
-			log.info("JobLogHistoryProcessor Matches");
 
-			try
-			{
-				Date d = sdf.parse(matcher.group(1).trim());
-				ChukwaRecord record = new ChukwaRecord();
+			HashMap<String, String> keys = new HashMap<String, String>();
+			ChukwaRecord record = null;
+		
+			int firstSep = recordEntry.indexOf(" ");
+			keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
+//			log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
+//					+ keys.get("RECORD_TYPE") + "]");
 
-				logLevel = matcher.group(2);
-				className = matcher.group(3);
-				body = matcher.group(4);
+			String body = recordEntry.substring(firstSep);
 
-				key.set("" + d.getTime());
-				
-				record.add(Record.logLevelField, logLevel);
-				record.add(Record.classField, className);
-				
-				int firstSep = body.indexOf(" ");
-				String logEntryType = body.substring(0,firstSep);
-				record.add(jobEntryTypeField, logEntryType);
-				
-				internalMatcher.reset(body.substring(firstSep));
-				String fieldName = null;
-				String fieldValue = null;
-				while (internalMatcher.matches())
-				{
-					fieldName = internalMatcher.group(1).trim();
-					fieldValue = internalMatcher.group(2).trim();
-					record.add(fieldName, fieldValue);
-					log.info("JobLogHistoryProcessor Add field: [" + fieldName + "][" + fieldValue +"]" );
-					internalMatcher.reset(internalMatcher.group(3));
-				}
+			internalMatcher.reset(body);
 
-				buildGenericRecord(record, body, d.getTime(), JobLogHistoryProcessor.recordType);
-				output.collect(key, record);
-				log.info("JobLogHistoryProcessor outputing a record ============>>>>>");
-			} 
-			catch (IOException e)
+//			 String fieldName = null;
+//			 String fieldValue = null;
+
+			while (internalMatcher.matches())
 			{
-				e.printStackTrace();
+
+				keys.put(internalMatcher.group(1).trim(), internalMatcher
+						.group(2).trim());
+
+				// TODO Remove debug info before production
+//				 fieldName = internalMatcher.group(1).trim();
+//				 fieldValue = internalMatcher.group(2).trim();
+//				 log.info("JobLogHistoryProcessor Add field: [" + fieldName +
+//				 "][" + fieldValue +"]" );
+//				 log.info("EOL : [" + internalMatcher.group(3) + "]" );
+				internalMatcher.reset(internalMatcher.group(3));
 			} 
-			catch (ParseException e)
+
+			if (!keys.containsKey("JOBID"))
 			{
-				e.printStackTrace();
+				// Extract JobID from taskID
+				// JOBID  = "job_200804210403_0005"
+				// TASKID = "tip_200804210403_0005_m_000018"
+				String jobId = keys.get("TASKID");
+				int idx1 = jobId.indexOf('_',0);
+				int idx2 = jobId.indexOf('_', idx1+1);
+				idx2 = jobId.indexOf('_', idx2+1);
+				keys.put("JOBID",jobId.substring(idx1+1,idx2));
+//				log.info("JobLogHistoryProcessor Add field: [JOBID]["
+//						+ keys.get("JOBID") + "]");
 			}
+			else
+			{
+				String jobId = keys.get("JOBID").replace("_", "").substring(3);
+				keys.put("JOBID",jobId);
+			}
+			// if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+			// keys.containsKey("SUBMIT_TIME"))
+			// {
+			// // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
+			// USER="userxxx"
+			// // SUBMIT_TIME="1208760436751"
+			// JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
+			//					
+			//					
+			// }
+			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+			// keys.containsKey("LAUNCH_TIME"))
+			// {
+			// // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
+			// TOTAL_MAPS="5912" TOTAL_REDUCES="739"
+			//					
+			// }
+			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+			// keys.containsKey("FINISH_TIME"))
+			// {
+			// // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+			// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
+			// FAILED_MAPS="0" FAILED_REDUCES="0"
+			// // COUNTERS="File Systems.Local bytes read:1735053407244,File
+			// Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
+			// read:801605644910,File Systems.HDFS bytes written:44135800,
+			// // Job Counters .Launched map tasks:5912,Job Counters .Launched
+			// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+			// Counters .Rack-local map tasks:316,Map-Reduce Framework.
+			// // Map input records:9410696067,Map-Reduce Framework.Map output
+			// records:9410696067,Map-Reduce Framework.Map input
+			// bytes:801599188816,Map-Reduce Framework.Map output
+			// bytes:784427968116,
+			// // Map-Reduce Framework.Combine input records:0,Map-Reduce
+			// Framework.Combine output records:0,Map-Reduce Framework.Reduce
+			// input groups:477265,Map-Reduce Framework.Reduce input
+			// records:739000,
+			// // Map-Reduce Framework.Reduce output records:739000"
+			//					
+			// }
+			// else
+			if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+					&& keys.containsKey("START_TIME"))
+			{
+				// MapAttempt TASK_TYPE="MAP"
+				// TASKID="tip_200804210403_0005_m_000018"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
+				// START_TIME="1208760437531"
+				// HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("START_TIME")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("START_TIME"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/Map/S");
+				output.collect(key, record);
 
-		}
-	}
+			} else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+					&& keys.containsKey("FINISH_TIME"))
+			{
+				// MapAttempt TASK_TYPE="MAP"
+				// TASKID="tip_200804210403_0005_m_005494"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
+				// TASK_STATUS="SUCCESS"
+				// FINISH_TIME="1208760624124"
+				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("FINISH_TIME", keys.get("FINISH_TIME"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/Map/E");
+				output.collect(key, record);
+			}
 
-	public String getDataType()
-	{
-		return JobLogHistoryProcessor.recordType;
-	}
+			else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+					&& keys.containsKey("START_TIME"))
+			{
+				// ReduceAttempt TASK_TYPE="REDUCE"
+				// TASKID="tip_200804210403_0005_r_000138"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+				// START_TIME="1208760454885"
+				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("START_TIME")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("START_TIME"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SHUFFLE/S");
+				output.collect(key, record);
 
-	public static void main(String[] args)
-	{
-		
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-		
-		internalRegex = "(.*?)=\"(.*?)\"(.*)";
-		regex = "([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
-		
-		ip = Pattern.compile(internalRegex);
-		p = Pattern.compile(regex);
-		
-		Matcher internalMatcher = ip.matcher("-");
-		Matcher matcher = p.matcher("-");
-	
-		String log = "2008-07-28 23:30:38,865 INFO org.apache.hadoop.chukwa.ChukwaJobHistory: Task TASKID=\"task_200807282329_0001_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1217287838862\" SPLITS=\"/default-rack/somehost3.example.com,/default-rack/somehost2.example.com,/default-rack/somehost.example.com\"";
-		 matcher.reset(log);
-		
-		if (matcher.matches())
-		{
-			System.out.println("JobLogHistoryProcessor Matches");
+			} else if (keys.get("RECORD_TYPE")
+					.equalsIgnoreCase("ReduceAttempt")
+					&& keys.containsKey("FINISH_TIME"))
+			{
+				// ReduceAttempt TASK_TYPE="REDUCE"
+				// TASKID="tip_200804210403_0005_r_000138"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+				// TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
+				// SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
+				// HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SHUFFLE/E");
+				output.collect(key, record);
+
+				// SORT
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SORT/S");
+				output.collect(key, record);
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SORT/E");
+				output.collect(key, record);
 
-			try
+				// Reduce
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("SORT_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/REDUCE/S");
+				output.collect(key, record);
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/REDUCE/E");
+				output.collect(key, record);
+
+			} else if ( keys.get("RECORD_TYPE").equalsIgnoreCase("Job") )
 			{
-				Date d = sdf.parse(matcher.group(1).trim());
-				System.out.println(d);
-				ChukwaRecord record = new ChukwaRecord();
-
-				String logLevel = matcher.group(2);
-				String className = matcher.group(3);
-				String body = matcher.group(4);
-
-				System.out.println(matcher.group(1));
-				System.out.println(matcher.group(2));
-				System.out.println(matcher.group(3));
-				System.out.println(matcher.group(4));
-				
-				System.out.println(body); 
+				// 1
+				// Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx" 
+				// SUBMIT_TIME="1208760906812" 
+				// JOBCONF="/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml"
 				
-				record.add(Record.logLevelField, logLevel);
-				record.add(Record.classField, className);
+				// 2
+				// Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816" TOTAL_MAPS="3" TOTAL_REDUCES="7"
 				
-				int firstSep = body.indexOf(" ");
-				System.out.println(firstSep);
-				String logEntryType = body.substring(0,firstSep);
-				record.add(jobEntryTypeField, logEntryType);
-				
-				internalMatcher.reset(body.substring(firstSep));
-				String fieldName = null;
-				String fieldValue = null;
-				while (internalMatcher.matches())
+				// 3
+				// Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826"
+				// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
+				// FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
+				// COUNTERS="File Systems.Local bytes read:1735053407244,File
+				// Systems.Local bytes written:2610106384012,File Systems.HDFS
+				// bytes read:801605644910,File Systems.HDFS bytes
+				// written:44135800,
+				// Job Counters .Launched map tasks:5912,Job Counters .Launched
+				// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+				// Counters .Rack-local map tasks:316,Map-Reduce Framework.
+				// Map input records:9410696067,Map-Reduce Framework.Map output
+				// records:9410696067,Map-Reduce Framework.Map input
+				// bytes:801599188816,Map-Reduce Framework.Map output
+				// bytes:784427968116,
+				// Map-Reduce Framework.Combine input records:0,Map-Reduce
+				// Framework.Combine output records:0,Map-Reduce
+				// Framework.Reduce input groups:477265,Map-Reduce
+				// Framework.Reduce input records:739000,
+				// Map-Reduce Framework.Reduce output records:739000"
+
+				record = new ChukwaRecord();
+				key = new ChukwaRecordKey();
+				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJob");
+				if (keys.containsKey("COUNTERS"))
 				{
-					fieldName = internalMatcher.group(1).trim();
-					fieldValue = internalMatcher.group(2).trim();
-					record.add(fieldName, fieldValue);
-					System.out.println("JobLogHistoryProcessor Add field: [" + fieldName + "][" + fieldValue +"]" );
-					internalMatcher.reset(internalMatcher.group(3));
+					extractCounters(record, keys.get("COUNTERS"));
 				}
 
+				key = new ChukwaRecordKey();
+				key.setKey("MRJob/" + keys.get("JOBID") );
+				key.setReduceType("MRJobReduceProcessor");
+				
+				record = new ChukwaRecord();
+				record.add(Record.tagsField, chunk.getTags());
+				if (keys.containsKey("SUBMIT_TIME"))
+				{ record.setTime(Long.parseLong(keys.get("SUBMIT_TIME"))); }
+				else if (keys.containsKey("LAUNCH_TIME"))
+				{ record.setTime(Long.parseLong(keys.get("LAUNCH_TIME"))); }
+				else if (keys.containsKey("FINISH_TIME"))
+				{ record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); }
 				
-				System.out.println("JobLogHistoryProcessor outputing a record ============>>>>>");
+				Iterator<String> it = keys.keySet().iterator();
+				while(it.hasNext())
+				{
+					String field = it.next();
+			        record.add(field, keys.get(field));
+				}
+				 
+				 
+				output.collect(key, record);
 			}
-			catch(Exception e)
+
+			if (keys.containsKey("TASK_TYPE")
+					&& keys.containsKey("COUNTERS")
+					&& (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys
+							.get("TASK_TYPE").equalsIgnoreCase("MAP")))
 			{
-				e.printStackTrace();
+				// MAP
+				// Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
+				// TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
+				// COUNTERS="File Systems.Local bytes read:159265655,File
+				// Systems.Local bytes written:318531310,
+				// File Systems.HDFS bytes read:145882417,Map-Reduce
+				// Framework.Map input records:1706604,
+				// Map-Reduce Framework.Map output records:1706604,Map-Reduce
+				// Framework.Map input bytes:145882057,
+				// Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
+				// Framework.Combine input records:0,Map-Reduce
+				// Framework.Combine output records:0"
+
+				// REDUCE
+				// Task TASKID="tip_200804210403_0005_r_000524"
+				// TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
+				// FINISH_TIME="1208760877072"
+				// COUNTERS="File Systems.Local bytes read:1179319677,File
+				// Systems.Local bytes written:1184474889,File Systems.HDFS
+				// bytes written:59021,
+				// Map-Reduce Framework.Reduce input groups:684,Map-Reduce
+				// Framework.Reduce input records:1000,Map-Reduce
+				// Framework.Reduce output records:1000"
+
+				record = new ChukwaRecord();
+				key = new ChukwaRecordKey();
+				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
+				extractCounters(record, keys.get("COUNTERS"));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("TASKID", keys.get("TASKID"));
+				record.add("TASK_TYPE", keys.get("TASK_TYPE"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("MR_Graph +1");
+				output.collect(key, record);
+
 			}
+		} 
+		catch (IOException e)
+		{
+			log.warn("Unable to collect output in JobLogHistoryProcessor ["
+					+ recordEntry + "]", e);
+			e.printStackTrace();
+			throw e;
+		}
+
+	}
+
+	protected void extractCounters(ChukwaRecord record, String input)
+	{
+
+		String[] data = null;
+		String[] counters = input.split(",");
+
+		for (String counter : counters)
+		{
+			data = counter.split(":");
+			record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
+					.toUpperCase(), data[1]);
 		}
 	}
+
+	public String getDataType()
+	{
+		return JobLogHistoryProcessor.recordType;
+	}
 }
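
For reference, the extraction loop in parse() above can be exercised
standalone: each match of internalRegex consumes one field="value" pair, and
group(3) feeds the unconsumed tail back into the matcher until no pairs
remain. A minimal sketch using the same regex (the sample MapAttempt values
are hypothetical):

import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeyValueLoopSketch
{
	public static void main(String[] args)
	{
		Pattern ip = Pattern.compile("(.*?)=\"(.*?)\"(.*)([\\n])?");
		Matcher m = ip.matcher("-");

		String body = " TASK_TYPE=\"MAP\" TASKID=\"tip_200804210403_0005_m_000018\""
				+ " START_TIME=\"1208760437531\"";
		HashMap<String, String> keys = new HashMap<String, String>();

		m.reset(body);
		while (m.matches())
		{
			// group(1)/group(2) hold one name/value pair; group(3) is the tail
			keys.put(m.group(1).trim(), m.group(2).trim());
			m.reset(m.group(3));
		}

		// keys now maps TASK_TYPE -> MAP, TASKID -> tip_..., START_TIME -> ...
		System.out.println(keys);
	}
}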

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.util.Hashtable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Log4jJobHistoryProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
+
+	private static final String recordType = "JobLogHistory";
+	private static String internalRegex = null;
+	private static Pattern ip = null;
+
+	private Matcher internalMatcher = null;
+
+	public Log4jJobHistoryProcessor()
+	{
+		internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+		ip = Pattern.compile(internalRegex);
+		internalMatcher = ip.matcher("-");
+	}
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	 throws Throwable
+	{
+
+//		log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
+//				+ chunk.getDataType() + "]");
+
+		try
+		{
+
+			//String dStr = recordEntry.substring(0, 23);
+			int start = 24;
+			int idx = recordEntry.indexOf(' ', start);
+			// String level = recordEntry.substring(start, idx);
+			start = idx + 1;
+			idx = recordEntry.indexOf(' ', start);
+			// String className = recordEntry.substring(start, idx-1);
+			String body = recordEntry.substring(idx + 1);
+
+			Hashtable<String, String> keys = new Hashtable<String, String>();
+			ChukwaRecord record = null;
+		
+			int firstSep = body.indexOf(" ");
+			keys.put("RECORD_TYPE", body.substring(0, firstSep));
+//			log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
+//					+ keys.get("RECORD_TYPE") + "]");
+
+			body = body.substring(firstSep);
+
+			internalMatcher.reset(body);
+
+//			 String fieldName = null;
+//			 String fieldValue = null;
+
+			while (internalMatcher.matches())
+			{
+
+				keys.put(internalMatcher.group(1).trim(), internalMatcher
+						.group(2).trim());
+
+				// TODO Remove debug info before production
+//				 fieldName = internalMatcher.group(1).trim();
+//				 fieldValue = internalMatcher.group(2).trim();
+//				 log.info("JobLogHistoryProcessor Add field: [" + fieldName +
+//				 "][" + fieldValue +"]" );
+//				 log.info("EOL : [" + internalMatcher.group(3) + "]" );
+				internalMatcher.reset(internalMatcher.group(3));
+			} 
+
+			if (!keys.containsKey("JOBID"))
+			{
+				// Extract JobID from taskID
+				// JOBID  = "job_200804210403_0005"
+				// TASKID = "tip_200804210403_0005_m_000018"
+				String jobId = keys.get("TASKID");
+				int idx1 = jobId.indexOf('_',0);
+				int idx2 = jobId.indexOf('_', idx1+1);
+				idx2 = jobId.indexOf('_', idx2+1);
+				keys.put("JOBID","job" + jobId.substring(idx1,idx2));
+//				log.info("JobLogHistoryProcessor Add field: [JOBID]["
+//						+ keys.get("JOBID") + "]");
+			}
+			
+			// if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+			// keys.containsKey("SUBMIT_TIME"))
+			// {
+			// // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
+			// USER="userxxx"
+			// // SUBMIT_TIME="1208760436751"
+			// JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
+			//					
+			//					
+			// }
+			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+			// keys.containsKey("LAUNCH_TIME"))
+			// {
+			// // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
+			// TOTAL_MAPS="5912" TOTAL_REDUCES="739"
+			//					
+			// }
+			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+			// keys.containsKey("FINISH_TIME"))
+			// {
+			// // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+			// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
+			// FAILED_MAPS="0" FAILED_REDUCES="0"
+			// // COUNTERS="File Systems.Local bytes read:1735053407244,File
+			// Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
+			// read:801605644910,File Systems.HDFS bytes written:44135800,
+			// // Job Counters .Launched map tasks:5912,Job Counters .Launched
+			// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+			// Counters .Rack-local map tasks:316,Map-Reduce Framework.
+			// // Map input records:9410696067,Map-Reduce Framework.Map output
+			// records:9410696067,Map-Reduce Framework.Map input
+			// bytes:801599188816,Map-Reduce Framework.Map output
+			// bytes:784427968116,
+			// // Map-Reduce Framework.Combine input records:0,Map-Reduce
+			// Framework.Combine output records:0,Map-Reduce Framework.Reduce
+			// input groups:477265,Map-Reduce Framework.Reduce input
+			// records:739000,
+			// // Map-Reduce Framework.Reduce output records:739000"
+			//					
+			// }
+			// else
+			if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+					&& keys.containsKey("START_TIME"))
+			{
+				// MapAttempt TASK_TYPE="MAP"
+				// TASKID="tip_200804210403_0005_m_000018"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
+				// START_TIME="1208760437531"
+				// HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("START_TIME")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("START_TIME"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/Map/S");
+				output.collect(key, record);
+
+			} else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+					&& keys.containsKey("FINISH_TIME"))
+			{
+				// MapAttempt TASK_TYPE="MAP"
+				// TASKID="tip_200804210403_0005_m_005494"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
+				// TASK_STATUS="SUCCESS"
+				// FINISH_TIME="1208760624124"
+				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("FINISH_TIME", keys.get("FINISH_TIME"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/Map/E");
+				output.collect(key, record);
+			}
+
+			else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+					&& keys.containsKey("START_TIME"))
+			{
+				// ReduceAttempt TASK_TYPE="REDUCE"
+				// TASKID="tip_200804210403_0005_r_000138"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+				// START_TIME="1208760454885"
+				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("START_TIME")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("START_TIME"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SHUFFLE/S");
+				output.collect(key, record);
+
+			} else if (keys.get("RECORD_TYPE")
+					.equalsIgnoreCase("ReduceAttempt")
+					&& keys.containsKey("FINISH_TIME"))
+			{
+				// ReduceAttempt TASK_TYPE="REDUCE"
+				// TASKID="tip_200804210403_0005_r_000138"
+				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+				// TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
+				// SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
+				// HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SHUFFLE/E");
+				output.collect(key, record);
+
+				// SORT
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SORT/S");
+				output.collect(key, record);
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/SORT/E");
+				output.collect(key, record);
+
+				// Reduce
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("START_TIME", keys.get("SORT_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/REDUCE/S");
+				output.collect(key, record);
+
+				key = new ChukwaRecordKey();
+				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
+				key.setReduceType("JobLogHistoryReduceProcessor");
+				record = new ChukwaRecord();
+				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
+				record.add(Record.tagsField, chunk.getTags());
+//				log.info("JobLogHist/REDUCE/E");
+				output.collect(key, record);
+
+			} else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
+					&& keys.containsKey("COUNTERS"))
+			{
+				// Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+				// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
+				// FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
+				// COUNTERS="File Systems.Local bytes read:1735053407244,File
+				// Systems.Local bytes written:2610106384012,File Systems.HDFS
+				// bytes read:801605644910,File Systems.HDFS bytes
+				// written:44135800,
+				// Job Counters .Launched map tasks:5912,Job Counters .Launched
+				// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+				// Counters .Rack-local map tasks:316,Map-Reduce Framework.
+				// Map input records:9410696067,Map-Reduce Framework.Map output
+				// records:9410696067,Map-Reduce Framework.Map input
+				// bytes:801599188816,Map-Reduce Framework.Map output
+				// bytes:784427968116,
+				// Map-Reduce Framework.Combine input records:0,Map-Reduce
+				// Framework.Combine output records:0,Map-Reduce
+				// Framework.Reduce input groups:477265,Map-Reduce
+				// Framework.Reduce input records:739000,
+				// Map-Reduce Framework.Reduce output records:739000"
+
+				record = new ChukwaRecord();
+				key = new ChukwaRecordKey();
+				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
+				extractCounters(record, keys.get("COUNTERS"));
+
+				String jobId = keys.get("JOBID").replace("_", "").substring(3);
+				record.add("JobId", "" + jobId);
+
+				// FIXME validate this when HodId will be available
+				if (keys.containsKey("HODID"))
+					{ record.add("HodId", keys.get("HODID")); }
+
+//				log.info("MRJobCounters +1");
+				output.collect(key, record);
+			}
+
+			if (keys.containsKey("TASK_TYPE")
+					&& keys.containsKey("COUNTERS")
+					&& (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys
+							.get("TASK_TYPE").equalsIgnoreCase("MAP")))
+			{
+				// MAP
+				// Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
+				// TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
+				// COUNTERS="File Systems.Local bytes read:159265655,File
+				// Systems.Local bytes written:318531310,
+				// File Systems.HDFS bytes read:145882417,Map-Reduce
+				// Framework.Map input records:1706604,
+				// Map-Reduce Framework.Map output records:1706604,Map-Reduce
+				// Framework.Map input bytes:145882057,
+				// Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
+				// Framework.Combine input records:0,Map-Reduce
+				// Framework.Combine output records:0"
+
+				// REDUCE
+				// Task TASKID="tip_200804210403_0005_r_000524"
+				// TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
+				// FINISH_TIME="1208760877072"
+				// COUNTERS="File Systems.Local bytes read:1179319677,File
+				// Systems.Local bytes written:1184474889,File Systems.HDFS
+				// bytes written:59021,
+				// Map-Reduce Framework.Reduce input groups:684,Map-Reduce
+				// Framework.Reduce input records:1000,Map-Reduce
+				// Framework.Reduce output records:1000"
+
+				record = new ChukwaRecord();
+				key = new ChukwaRecordKey();
+				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
+				extractCounters(record, keys.get("COUNTERS"));
+				record.add("JOBID",keys.get("JOBID"));
+				record.add("TASKID", keys.get("TASKID"));
+				record.add("TASK_TYPE", keys.get("TASK_TYPE"));
+				
+//				log.info("MR_Graph +1");
+				output.collect(key, record);
+
+			}
+		} 
+		catch (IOException e)
+		{
+			log.warn("Unable to collect output in Log4jJobHistoryProcessor ["
+					+ recordEntry + "]", e);
+			e.printStackTrace();
+			throw e;
+		}
+
+	}
+
+	protected void extractCounters(ChukwaRecord record, String input)
+	{
+
+		String[] data = null;
+		String[] counters = input.split(",");
+
+		for (String counter : counters)
+		{
+			data = counter.split(":");
+			record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
+					.toUpperCase(), data[1]);
+		}
+	}
+
+	public String getDataType()
+	{
+		return Log4jJobHistoryProcessor.recordType;
+	}
+}
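
The COUNTERS attribute handled by extractCounters() above is a comma-separated
list of "Group.Counter name:value" entries; field names are normalized by
mapping spaces and dots to underscores and upper-casing. A standalone sketch
with a sample abbreviated from the comments above:

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;

public class ExtractCountersSketch
{
	public static void main(String[] args)
	{
		String counters = "File Systems.Local bytes read:159265655,"
				+ "Map-Reduce Framework.Map input records:1706604";

		ChukwaRecord record = new ChukwaRecord();
		for (String counter : counters.split(","))
		{
			String[] data = counter.split(":");
			// "File Systems.Local bytes read" -> FILE_SYSTEMS_LOCAL_BYTES_READ
			record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
					.toUpperCase(), data[1]);
		}
	}
}

A counter name or value containing ':' or ',' would break this split; the
samples in the comments above do not use either.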

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public interface MapProcessor
+{
+	public void process(ChukwaArchiveKey archiveKey,Chunk chunk,OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter);
+}
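
The processors above get this interface via AbstractProcessor (not shown in
this hunk); a direct implementation only needs to turn a Chunk into
ChukwaRecordKey/ChukwaRecord pairs. A hypothetical minimal implementation,
assuming Chunk exposes its payload via getData() (EchoProcessor and its key
layout are illustrative, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class EchoProcessor implements MapProcessor
{
	public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
			Reporter reporter)
	{
		try
		{
			ChukwaRecord record = new ChukwaRecord();
			record.setTime(System.currentTimeMillis());
			record.add("data", new String(chunk.getData()));

			ChukwaRecordKey key = new ChukwaRecordKey();
			key.setKey("Echo/" + chunk.getDataType());
			key.setReduceType("IdentityReducer"); // hypothetical reduce type

			output.collect(key, record);
		}
		catch (IOException e)
		{
			// process() declares no checked exceptions, so collect()
			// failures have to be handled locally in this sketch
			e.printStackTrace();
		}
	}
}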

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+
+
+public class MapProcessorFactory
+{
+	static Logger log = Logger.getLogger(MapProcessorFactory.class);	
+	
+	private static HashMap<String,MapProcessor > processors =
+	    new HashMap<String, MapProcessor>(); // registry
+		
+	private MapProcessorFactory()
+	{}
+	
+	public static MapProcessor getProcessor(String parserClass)
+	 throws UnknownRecordTypeException
+	{
+		if (processors.containsKey(parserClass)) 
+		{
+			return processors.get(parserClass);
+		} 
+		else 
+		{
+			MapProcessor processor = null;
+			try 
+			{
+				processor = (MapProcessor)Class.forName(parserClass).getConstructor().newInstance();
+			} 
+			catch(ClassNotFoundException e) 
+			{
+				throw new UnknownRecordTypeException("Unknown parserClass:" + parserClass, e);			
+			}
+			catch(Exception e) 
+			{
+			  throw new UnknownRecordTypeException("error constructing processor", e);
+			}
+			
+			//TODO using a ThreadSafe/reuse flag to actually decide if we want 
+			// to reuse the same processor again and again
+			processors.put(parserClass,processor);
+			return processor;
+		}
+	}
+}
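
A hypothetical call site for the factory (the demux mapper would do something
like this, with archiveKey, chunk, output and reporter in scope):

	MapProcessor processor = MapProcessorFactory.getProcessor(
			"org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SysLog");
	processor.process(archiveKey, chunk, output, reporter);

Because instances are cached in the registry and handed out repeatedly, a
processor must tolerate reuse across chunks until the thread-safe/reuse flag
from the TODO above exists.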

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java Fri Dec  5 12:30:14 2008
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 public class PbsInvalidEntry extends Exception

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class PbsNodes extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(PbsNodes.class);
+
+	private static final String rawPBSRecordType = "PbsNodes";
+	private static final String machinePBSRecordType = "MachinePbsNodes";
+	private SimpleDateFormat sdf = null;
+
+	public PbsNodes()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+	}
+
+	@Override
+	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	 throws Throwable
+	{
+		
+//		log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
+		
+		
+
+		StringBuilder sb = new StringBuilder(); 	 
+		int i = 0;
+		String nodeActivityStatus = null;
+		StringBuilder sbFreeMachines = new StringBuilder();
+		StringBuilder sbUsedMachines = new StringBuilder();
+		StringBuilder sbDownMachines = new StringBuilder();
+		
+		int totalFreeNode = 0;
+		int totalUsedNode = 0;
+		int totalDownNode = 0;
+	
+		String body = null;
+		ChukwaRecord record = null;
+		
+		
+			try
+			{
+				
+				String dStr = recordEntry.substring(0, 23);
+				int start = 24;
+				int idx = recordEntry.indexOf(' ', start);
+				//String level = recordEntry.substring(start, idx);
+				start = idx+1;
+				idx = recordEntry.indexOf(' ', start );
+				//String className = recordEntry.substring(start, idx-1);
+				body = recordEntry.substring(idx+1);
+				
+				
+				Date d = sdf.parse(dStr );
+				
+				String[] lines = body.split("\n");
+				while (i < lines.length)
+				{
+					while ((i < lines.length) && (lines[i].trim().length() > 0))
+					{
+						sb.append(lines[i].trim()).append("\n");
+						i++;
+					}
+
+					if ((i < lines.length) && (lines[i].trim().length() > 0))
+					{
+						throw new PbsInvalidEntry(recordEntry);
+					}
+
+					// Empty line
+					i++;
+
+					if (sb.length() > 0)
+					{
+						body =  sb.toString();
+						// Process all entries for a machine
+						//System.out.println("=========>>> Record [" + body+ "]");
+						
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+						
+						buildGenericRecord(record,null,d.getTime(),machinePBSRecordType);
+						parsePbsRecord(body, record);
+						
+						//Output PbsNode record for 1 machine
+						output.collect(key, record);
+						//log.info("PbsNodeProcessor output 1 sub-record");
+						
+						//compute Node Activity information
+						nodeActivityStatus = record.getValue("state");
+						if (nodeActivityStatus != null)
+						{
+							if (nodeActivityStatus.equals("free"))
+							{
+								totalFreeNode ++;
+								sbFreeMachines.append(record.getValue("Machine")).append(",");
+							}
+							else if (nodeActivityStatus.equals("job-exclusive"))
+							{
+								totalUsedNode ++;
+								sbUsedMachines.append(record.getValue("Machine")).append(",");
+							}
+							else
+							{
+								totalDownNode ++;
+								sbDownMachines.append(record.getValue("Machine")).append(",");
+							}					
+						}
+						sb = new StringBuilder();
+					}
+				}
+				
+				// End of parsing
+
+				record = new ChukwaRecord();
+				key = new ChukwaRecordKey();
+				buildGenericRecord(record,null,d.getTime(),"NodeActivity");
+				
+				record.setTime(d.getTime());
+				record.add("used", ""+totalUsedNode);
+				record.add("free", ""+totalFreeNode);
+				record.add("down", ""+totalDownNode);
+				record.add("usedMachines", sbUsedMachines.toString());
+				record.add("freeMachines", sbFreeMachines.toString());
+				record.add("downMachines", sbDownMachines.toString());
+				
+				output.collect(key,record);
+				//log.info("PbsNodeProcessor output 1 NodeActivity");					
+			}
+			catch (ParseException e)
+			{
+				e.printStackTrace();
+				log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+				throw e;
+			}
+			catch (IOException e)
+			{
+				log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry + "]", e);
+				e.printStackTrace();
+				throw e;
+			}
+			catch (PbsInvalidEntry e)
+			{
+				log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+				e.printStackTrace();
+				throw e;
+			}
+		
+		
+
+		
+	}
+
+	protected static void parsePbsRecord(String recordLine, ChukwaRecord record)
+	{
+		int i = 0;
+		String[] lines = recordLine.split("\n");
+		record.add("Machine", lines[0]);
+		
+		i++;
+		String[] data = null;
+		while (i < lines.length)
+		{
+			data = extractFields(lines[i]);	
+			record.add(data[0].trim(), data[1].trim());
+			if (data[0].trim().equalsIgnoreCase("status"))
+			{
+				parseStatusField(data[1].trim(), record);
+			}
+			i++;
+		}
+	}
+
+	protected static void parseStatusField(String statusField,
+			ChukwaRecord record)
+	{
+		String[] data = null;
+		String[] subFields = statusField.trim().split(",");
+		for (String subField : subFields)
+		{
+			data = extractFields(subField);
+			record.add("status-"+data[0].trim(), data[1].trim());
+		}
+	}
+
+
+	static String[] extractFields(String line)
+	{
+		String[] args = new String[2];
+		int index = line.indexOf("=");
+		args[0] = line.substring(0,index );
+		args[1] = line.substring(index + 1);
+
+		return args;
+	}
+
+	public String getDataType()
+	{
+		return PbsNodes.rawPBSRecordType;
+	}
+	
+}
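
A worked sketch of the two helpers above on hypothetical pbsnodes fields
(called from within the same package): extractFields splits on the first '='
and leaves trimming to callers, while parseStatusField expands the
comma-separated status attribute into "status-*" fields:

	String[] f = extractFields("state = free");
	// f[0] = "state ", f[1] = " free"

	ChukwaRecord record = new ChukwaRecord();
	parseStatusField("opsys=linux,loadave=0.10", record);
	// adds status-opsys = "linux" and status-loadave = "0.10"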

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java Fri Dec  5 12:30:14 2008
@@ -23,9 +23,8 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
@@ -44,22 +43,21 @@
 	public Sar()
 	{
 		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
 		p = Pattern.compile(regex);
 	}
 
 	@Override
-	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
 			Reporter reporter)
+	 throws Throwable
 	{
 		
 		log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		StringBuilder sb = new StringBuilder(); 	 
 		int i = 0;
 		
-		String logLevel = null;
-		String className = null;
-		String body = null;
+//		String logLevel = null;
+//		String className = null;
 		
 		matcher=p.matcher(recordEntry);
 		while (matcher.find())
@@ -70,13 +68,12 @@
 			{
 				Date d = sdf.parse( matcher.group(1).trim());
 				
-				logLevel = matcher.group(2);
-				className = matcher.group(3);
-				String hostname = matcher.group(5);
+//				logLevel = matcher.group(2);
+//				className = matcher.group(3);
 				
 				//TODO create a more specific key structure
 				// part of ChukwaArchiveKey + record index if needed
-				key.set("" + d.getTime());
+				key.setKey("" + d.getTime());
 				
 				String[] lines = recordEntry.split("\n");
 				
@@ -89,43 +86,61 @@
 				}
 				while (i < lines.length)
 				{
-					DatabaseHelper databaseRecord = null;
+					ChukwaRecord record = null;
 					if(lines[i].equals("")) {
 						i++;
 						headers = parseHeader(lines[i]);
 						i++;
 					}
 					String data[] = parseData(lines[i]);
+					
+					//FIXME please validate this
 					if(headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
 						log.debug("Matched Sar-Network");
-						databaseRecord = new DatabaseHelper("system");
+						
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
 					} else if(headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
 						log.debug("Matched Sar-Network");
-						databaseRecord = new DatabaseHelper("system");	
+						
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
 					} else if(headers[1].equals("kbmemfree")) {
 						log.debug("Matched Sar-Memory");
-						databaseRecord = new DatabaseHelper("system");	
+						
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
 					} else if(headers[1].equals("totsck")) {
 						log.debug("Matched Sar-NetworkSockets");
-						databaseRecord = new DatabaseHelper("system");	
+						
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
 					} else if(headers[1].equals("runq-sz")) {
 						log.debug("Matched Sar-LoadAverage");
-						databaseRecord = new DatabaseHelper("system");	
+						
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+						this.buildGenericRecord(record, null,d.getTime(), "SystemMetrics");
 					} else {
 						log.debug("No match:"+headers[1]+" "+headers[2]);
 					}
-					if(databaseRecord!=null) {
+					if(record!=null) {
 						int j=0;
+						
 						log.debug("Data Length: " + data.length);
 	                    while(j<data.length) {
 	                    	log.debug("header:"+headers[j]+" data:"+data[j]);
 	                    	if(!headers[j].equals("Average:")) {
-						        databaseRecord.add(d.getTime(),headers[j],data[j]);
+	                    		record.add(headers[j],data[j]);
 	                    	}
 						    j++;
-	                    }						
-						//Output Sar info to database
-						output.collect(key, databaseRecord.buildChukwaRecord());
+	                    }			
+	                   
+						output.collect(key, record);
 					}
 					i++;
 				}
@@ -133,6 +148,7 @@
 			} catch (Exception e)
 			{
 				e.printStackTrace();
+				throw e;
 			}
 		}
 	}
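
The inner loop above pairs each sar data row with the most recent header row
and drops the "Average:" label column. A minimal sketch of that pairing, with
illustrative header and data values:

	long timestamp = 1208760437531L; // stands in for d.getTime()
	String[] headers = { "Average:", "kbmemfree", "kbmemused" };
	String[] data    = { "Average:", "1060464", "2988176" };

	ChukwaRecord record = new ChukwaRecord();
	record.setTime(timestamp);
	for (int j = 0; j < data.length; j++)
	{
		if (!headers[j].equals("Average:"))
		{
			record.add(headers[j], data[j]); // kbmemfree=1060464, ...
		}
	}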

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class SysLog extends AbstractProcessor 
+{
+
+	static Logger log = Logger.getLogger(SysLog.class);
+	private SimpleDateFormat sdf = null;
+
+	public SysLog()
+	{
+		sdf = new SimpleDateFormat("MMM d HH:mm:ss");
+	}
+	
+	
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+  throws Throwable
+  {
+		try
+		{
+			String dStr = recordEntry.substring(0, 15);
+			int start = 15;
+			int idx = recordEntry.indexOf(' ', start);
+			start = idx + 1;
+			idx = recordEntry.indexOf(' ', start);
+			String body = recordEntry.substring(idx + 1);
+			body = body.replaceAll("\n", ""); // replaceAll returns a new string; assign it or the call is a no-op
+
+			Calendar convertDate = Calendar.getInstance();
+			Date d = sdf.parse(dStr);
+			int year = convertDate.get(Calendar.YEAR);
+			convertDate.setTime(d);
+			convertDate.set(Calendar.YEAR, year);
+			
+			ChukwaRecord record = new ChukwaRecord();
+			buildGenericRecord(record,recordEntry,convertDate.getTime().getTime(),"SysLog");
+			output.collect(key, record);
+		}
+		catch (ParseException e)
+		{
+			e.printStackTrace();
+			log.warn("Wrong format in SysLog [" + recordEntry + "]", e);
+			throw e;
+		}
+		catch (IOException e)
+		{
+			e.printStackTrace();
+			log.warn("Unable to collect output in SysLog [" + recordEntry + "]", e);
+			throw e;
+		}
+
+  }
+
+
+	public String getDataType()
+	{
+		return SysLog.class.getName();
+	}
+
+}

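syslog timestamps carry no year, so the processor parses the first 15 characters with "MMM d HH:mm:ss" (which leaves the year at 1970) and then grafts the current year on via Calendar. A minimal, JDK-only sketch of that conversion (the sample line is hypothetical):

    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;

    public class SysLogDateDemo {
        public static void main(String[] args) throws Exception {
            String recordEntry = "Dec 15 12:30:14 host1 sshd[1234]: session opened";
            String dStr = recordEntry.substring(0, 15); // "Dec 15 12:30:14"
            Date d = new SimpleDateFormat("MMM d HH:mm:ss").parse(dStr); // year defaults to 1970
            Calendar convertDate = Calendar.getInstance();
            int year = convertDate.get(Calendar.YEAR);
            convertDate.setTime(d);
            convertDate.set(Calendar.YEAR, year); // borrow the current year
            System.out.println(convertDate.getTime().getTime()); // epoch millis for the record
        }
    }

Note the borrowed year is wrong for entries written just before New Year and processed just after; a production parser would need to handle that edge.
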
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java Fri Dec  5 12:30:14 2008
@@ -20,19 +20,16 @@
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.Set;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
-import org.apache.hadoop.chukwa.extraction.engine.Record;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
-import java.util.HashMap;
 
 public class Top extends AbstractProcessor
 {
@@ -48,138 +45,118 @@
 	public Top()
 	{
 		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
 		p = Pattern.compile(regex);
 	}
 
 	@Override
-	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
 			Reporter reporter)
+	 throws Throwable
 	{
 		
-		log.info("Top record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		StringBuilder sb = new StringBuilder(); 	 
+		log.debug("Top record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
 		
-		String logLevel = null;
-		String className = null;
-		String body = null;
 		
 		matcher=p.matcher(recordEntry);
 		while (matcher.find())
 		{
-			log.info("Top Processor Matches");
+			log.debug("Top Processor Matches");
 			
 			try
 			{
 				Date d = sdf.parse( matcher.group(1).trim());
-				
-				logLevel = matcher.group(2);
-				className = matcher.group(3);
-				
-				//TODO create a more specific key structure
-				// part of ChukwaArchiveKey + record index if needed
-				key.set("" + d.getTime());
+
+				ChukwaRecord record = new ChukwaRecord();
 				String[] lines = recordEntry.split("\n");
 				int i = 0;
+				if(lines.length<2) {
+					return;
+				}
 				String summaryString = "";
 				while(!lines[i].equals("")) {
 					summaryString = summaryString + lines[i] + "\n";
 				    i++;
 				}
 				i++;
-				String[] headers = lines[i].split("\\s+");
-				HashMap<String, String>summary = parseSummary(summaryString);
-				DatabaseHelper databaseRecord = new DatabaseHelper("system");
-				Iterator<String> ki = summary.keySet().iterator();
-				while(ki.hasNext()) {
-					String key = ki.next();
-				    databaseRecord.add(d.getTime(),key, summary.get(key));
-				}
-				output.collect(key, databaseRecord.buildChukwaRecord());
-				while (i < lines.length)
-				{
-					databaseRecord = null;
-					String data[] = lines[i].split("\\s+",headers.length);
-					if(lines[i].indexOf("PID USER")<0) {
-						databaseRecord = new DatabaseHelper("system");	
-					}
-					if(databaseRecord!=null) {
-						int j=0;
-						log.debug("Data Length: " + data.length);
-	                    while(j<data.length-1) {
-	                    	if(headers[0].equals("")) {
-		                    	log.debug("header:"+headers[j+1]+" data:"+data[j+1]);
-	                    		databaseRecord.add(d.getTime(),headers[j+1],data[j+1]);
-	                    	} else {
-		                    	log.debug("header:"+headers[j+1]+" data:"+data[j]);
-							    databaseRecord.add(d.getTime(),headers[j+1],data[j]);
-	                    	}
-						    j++;
-	                    }						
-						//Output Sar info to database
-						output.collect(key, databaseRecord.buildChukwaRecord());
-					}
+				record = new ChukwaRecord();
+				key = new ChukwaRecordKey();
+				parseSummary(record,summaryString);
+				this.buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+				output.collect(key, record);
+				
+				StringBuffer buffer = new StringBuffer();
+				//FIXME please validate this
+				while (i < lines.length) {
+					record = null;
+					buffer.append(lines[i]+"\n");
 					i++;
+					
 				}
+				record = new ChukwaRecord();
+				key = new ChukwaRecordKey();
+				this.buildGenericRecord(record, buffer.toString(), d.getTime(), "Top");
+				//Output Top info to database
+				output.collect(key, record);
+
 				// End of parsing
 			} catch (Exception e)
 			{
 				e.printStackTrace();
+				throw e;
 			}
 		}
 	}
 	
-	public HashMap<String, String> parseSummary(String header) {
-		HashMap<String, String> keyValues = new HashMap<String, String>();
+	public void parseSummary(ChukwaRecord record,String header) {
 		String[] headers = header.split("\n");
-		int i=0;
 		Pattern p = Pattern.compile("top - (.*?) up (.*?),\\s+(\\d+) users");
 		Matcher matcher = p.matcher(headers[0]);
 		if(matcher.find()) {
-            keyValues.put("uptime",matcher.group(2));
-            keyValues.put("users",matcher.group(3));
+            record.add("uptime",matcher.group(2));
+            record.add("users",matcher.group(3));
 		}
 		p = Pattern.compile("Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
 		matcher = p.matcher(headers[1]);
 		if(matcher.find()) {
-            keyValues.put("tasks_total",matcher.group(1));
-            keyValues.put("tasks_running",matcher.group(2));
-            keyValues.put("tasks_sleeping",matcher.group(3));
-            keyValues.put("tasks_stopped",matcher.group(4));
-            keyValues.put("tasks_zombie",matcher.group(5));
+			record.add("tasks_total",matcher.group(1));
+			record.add("tasks_running",matcher.group(2));
+			record.add("tasks_sleeping",matcher.group(3));
+			record.add("tasks_stopped",matcher.group(4));
+			record.add("tasks_zombie",matcher.group(5));
 		}
 		p = Pattern.compile("Cpu\\(s\\):\\s+(.*?)% us,\\s+(.*?)% sy,\\s+(.*?)% ni,\\s+(.*?)% id,\\s+(.*?)% wa,\\s+(.*?)% hi,\\s+(.*?)% si");
 		matcher = p.matcher(headers[2]);
 		if(matcher.find()) {
-            keyValues.put("cpu_user%",matcher.group(1));
-            keyValues.put("cpu_sys%",matcher.group(2));
-            keyValues.put("cpu_nice%",matcher.group(3));
-            keyValues.put("cpu_wait%",matcher.group(4));
-            keyValues.put("cpu_hi%",matcher.group(5));
-            keyValues.put("cpu_si%",matcher.group(6));
+			record.add("cpu_user%",matcher.group(1));
+			record.add("cpu_sys%",matcher.group(2));
+			record.add("cpu_nice%",matcher.group(3));
+			record.add("cpu_wait%",matcher.group(4));
+			record.add("cpu_hi%",matcher.group(5));
+			record.add("cpu_si%",matcher.group(6));
 		}
 		p = Pattern.compile("Mem:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k buffers");
 		matcher = p.matcher(headers[3]);
 		if(matcher.find()) {
-			keyValues.put("mem_total",matcher.group(1));
-			keyValues.put("mem_used",matcher.group(2));
-			keyValues.put("mem_free",matcher.group(3));
-			keyValues.put("mem_buffers",matcher.group(4));
+			record.add("mem_total",matcher.group(1));
+			record.add("mem_used",matcher.group(2));
+			record.add("mem_free",matcher.group(3));
+			record.add("mem_buffers",matcher.group(4));
 		}
 		p = Pattern.compile("Swap:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k cached");
 		matcher = p.matcher(headers[4]);
 		if(matcher.find()) {
-			keyValues.put("swap_total",matcher.group(1));
-			keyValues.put("swap_used",matcher.group(2));
-			keyValues.put("swap_free",matcher.group(3));
-			keyValues.put("swap_cached",matcher.group(4));
+			record.add("swap_total",matcher.group(1));
+			record.add("swap_used",matcher.group(2));
+			record.add("swap_free",matcher.group(3));
+			record.add("swap_cached",matcher.group(4));
 		}
-		Iterator<String> ki = keyValues.keySet().iterator();
-		while(ki.hasNext()) {
-			String key = ki.next();
-			log.info(key+":"+keyValues.get(key));
-		}
-		return keyValues;
 	}
 
 	public String getDataType() {

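parseSummary() pulls apart the fixed five-line header block that top prints (uptime/users, task counts, CPU percentages, memory, swap) with one regex per line. A JDK-only sketch of the task-count pattern against a hypothetical header line:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TopSummaryDemo {
        public static void main(String[] args) {
            String line = "Tasks: 126 total,   1 running, 125 sleeping,   0 stopped,   0 zombie";
            Pattern p = Pattern.compile(
                    "Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,"
                            + "\\s+(\\d+) stopped,\\s+(\\d+) zombie");
            Matcher matcher = p.matcher(line);
            if (matcher.find()) {
                System.out.println("tasks_total=" + matcher.group(1)
                        + " tasks_running=" + matcher.group(2)
                        + " tasks_zombie=" + matcher.group(5));
            }
        }
    }
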
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Torque.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Torque extends AbstractProcessor 
+{
+
+	static Logger log = Logger.getLogger(Torque.class);
+	private SimpleDateFormat sdf = null;
+
+	public Torque()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+	}
+	
+	
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+  throws Throwable
+  {
+		try
+		{
+			String dStr = recordEntry.substring(0, 23);
+			int start = 24;
+			int idx = recordEntry.indexOf(' ', start);
+			start = idx + 1;
+			idx = recordEntry.indexOf(' ', start);
+			String body = recordEntry.substring(idx + 1);
+			body = body.replaceAll("\n", ""); // replaceAll returns a new string; assign it or the call is a no-op
+			Date d = sdf.parse(dStr);
+			String[] kvpairs = body.split(", ");
+
+			
+			ChukwaRecord record = new ChukwaRecord();
+			String kvpair =  null;
+			String[] halves = null;
+			boolean containRecord=false;
+		    for(int i = 0 ; i < kvpairs.length; ++i) 
+		    {
+		    	kvpair =  kvpairs[i];
+                if(kvpair.indexOf("=")>=0) {
+		    	  halves = kvpair.split("=");
+	    	      record.add(halves[0], halves[1]);
+	    	      containRecord=true;
+                }
+		    }
+		    if(record.containsField("Machine")) {
+		        buildGenericRecord(record,null, d.getTime(), "HodMachine");		    	
+		    } else {
+		        buildGenericRecord(record,null, d.getTime(), "HodJob");
+		    }
+		    if(containRecord) {
+			    output.collect(key, record);
+		    }
+		}
+		catch (ParseException e)
+		{
+			e.printStackTrace();
+			log.warn("Wrong format in Torque [" + recordEntry + "]", e);
+			throw e;
+		}
+		catch (IOException e)
+		{
+			e.printStackTrace();
+			log.warn("Unable to collect output in Torque [" + recordEntry + "]", e);
+			throw e;
+		}
+
+  }
+
+
+	public String getDataType()
+	{
+		return Torque.class.getName();
+	}
+
+}

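The Torque/HOD log body is treated as a comma-separated list of key=value pairs; a record is emitted only if at least one pair parses, and routing to "HodMachine" vs "HodJob" hinges on whether a "Machine" field showed up. A JDK-only sketch of the pair splitting (the sample body is hypothetical):

    public class TorqueKvDemo {
        public static void main(String[] args) {
            String body = "Machine=node17.example.com, Queue=batch, State=free";
            for (String kvpair : body.split(", ")) {
                if (kvpair.indexOf("=") >= 0) {
                    String[] halves = kvpair.split("=");
                    System.out.println(halves[0] + " -> " + halves[1]);
                }
            }
        }
    }

Note that split("=") on a value containing a second '=' leaves halves[1] holding only the text up to that second delimiter, so such a value would be truncated; the processor above shares this limitation.
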
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,56 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class TsProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(TsProcessor.class);
+	private SimpleDateFormat sdf = null;
+	
+	public TsProcessor()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+	}
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	 throws Throwable
+	{
+		try
+		{
+			String dStr = recordEntry.substring(0, 23);
+			Date d = sdf.parse(dStr);
+			ChukwaRecord record = new ChukwaRecord();
+			this.buildGenericRecord(record, recordEntry, d.getTime(), chunk.getDataType());
+			output.collect(key, record);
+		} 
+		catch (ParseException e)
+		{
+			log.warn("Unable to parse the date in TsProcessor ["
+					+ recordEntry + "]", e);
+			e.printStackTrace();
+			throw e;
+		}
+		catch (IOException e)
+		{
+			log.warn("Unable to collect output in TsProcessor ["
+					+ recordEntry + "]", e);
+			e.printStackTrace();
+			throw e;
+		}
+			
+	}
+
+}

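TsProcessor assumes every record starts with a 23-character log4j-style timestamp ("yyyy-MM-dd HH:mm:ss,SSS") and keys the record by its epoch millis. A JDK-only sketch (the sample line is hypothetical):

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TsDemo {
        public static void main(String[] args) throws Exception {
            String recordEntry = "2008-12-05 12:30:14,123 INFO org.example.Foo: something happened";
            String dStr = recordEntry.substring(0, 23); // "2008-12-05 12:30:14,123"
            Date d = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS").parse(dStr);
            System.out.println(d.getTime()); // epoch millis used for the record key
        }
    }
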
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java Fri Dec  5 12:30:14 2008
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 public class UnknownRecordTypeException extends Exception

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatch.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class YWatch extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(YWatch.class);
+
+	private static final String ywatchType = "YWatch";
+	
+	private static String regex= null;
+	
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+
+	public YWatch()
+	{
+		//TODO move that to config
+		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+		p = Pattern.compile(regex);
+		matcher = p.matcher("-");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	 throws Throwable
+	{
+		if (log.isDebugEnabled())
+		{
+			log.debug("YWatchProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
+		}
+		
+		matcher.reset(recordEntry);
+		if (matcher.matches())
+		{
+			log.info("YWatchProcessor Matches");
+			
+			try
+			{
+				String body = matcher.group(4);
+				
+				try
+				{
+					JSONObject json = new JSONObject(body);
+
+					String poller = json.getString("poller");
+					String host = json.getString("host");
+					String metricName = json.getString("metricName");
+					
+					// Data
+					JSONObject jsonData = json.getJSONObject("data").getJSONObject("data");
+		
+					String jsonTs = null;
+					long ts;
+					
+					String jsonValue = null;
+					Iterator<String> it = jsonData.keys();
+					
+					ChukwaRecord record = null;
+					
+					while(it.hasNext())
+					{
+						jsonTs = it.next();
+						// each key of the inner "data" object is an epoch-millis timestamp
+						ts = Long.parseLong(jsonTs);
+						jsonValue = jsonData.getString(jsonTs);
+						
+						record = new ChukwaRecord();
+						key = new ChukwaRecordKey();
+						this.buildGenericRecord(record, null, ts, "Ywatch");
+						record.add("poller", poller);
+						record.add("host", host);
+						record.add("metricName", metricName);
+						record.add("value", jsonValue);
+						output.collect(key, record);
+						log.info("YWatchProcessor output 1 metric");
+					}
+					
+				} 
+				catch (IOException e)
+				{
+					log.warn("Unable to collect output in YWatchProcessor [" + recordEntry + "]", e);
+					e.printStackTrace();
+				}
+				catch (JSONException e)
+				{
+					e.printStackTrace();
+					log.warn("Wrong format in YWatchProcessor [" + recordEntry + "]", e);
+				}
+				
+			}
+			catch(Exception e)
+			{
+				e.printStackTrace();
+				throw e;
+			}
+		}
+	}
+
+	public String getDataType()
+	{
+		return YWatch.ywatchType;
+	}
+}

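YWatch expects the log body (regex group 4) to be a JSON document whose inner data/data object maps epoch-millis timestamps to metric values, emitting one record per timestamp. A sketch of that shape using the same org.json API (the payload is hypothetical):

    import java.util.Iterator;
    import org.json.JSONObject;

    public class YWatchJsonDemo {
        public static void main(String[] args) throws Exception {
            String body = "{\"poller\":\"p1\",\"host\":\"node1\",\"metricName\":\"cpu_idle\","
                    + "\"data\":{\"data\":{\"1228508414000\":\"87.5\"}}}";
            JSONObject json = new JSONObject(body);
            JSONObject jsonData = json.getJSONObject("data").getJSONObject("data");
            Iterator<?> it = jsonData.keys();
            while (it.hasNext()) {
                String jsonTs = (String) it.next(); // epoch-millis key
                System.out.println(jsonTs + " -> " + jsonData.getString(jsonTs));
            }
        }
    }
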
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java?rev=723855&r1=723854&r2=723855&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java Fri Dec  5 12:30:14 2008
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 public class YwatchInvalidEntry extends Exception

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/DuplicateReduceProcessorException.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+public class DuplicateReduceProcessorException  extends RuntimeException
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 7396161798611603019L;
+
+	public DuplicateReduceProcessorException()
+	{
+	}
+
+	public DuplicateReduceProcessorException(String message)
+	{
+		super(message);
+	}
+
+	public DuplicateReduceProcessorException(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public DuplicateReduceProcessorException(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/IdentityReducer.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,39 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class IdentityReducer implements ReduceProcessor
+{
+
+	@Override
+	public String getDataType()
+	{
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		while(values.hasNext())
+		{
+			try
+			{
+				output.collect(key, values.next());
+			} 
+			catch (IOException e)
+			{
+				e.printStackTrace();
+			}
+		}
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java?rev=723855&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/JobLogHistoryReduceProcessor.java Fri Dec  5 12:30:14 2008
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class JobLogHistoryReduceProcessor implements ReduceProcessor
+{
+	static Logger log = Logger.getLogger(JobLogHistoryReduceProcessor.class);
+	@Override
+	public String getDataType()
+	{
+		return this.getClass().getName();
+	}
+
+	@Override
+	public void process(ChukwaRecordKey key, 
+						Iterator<ChukwaRecord> values,
+						OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+						Reporter reporter)
+	{
+		try
+		{
+			String action = key.getKey();
+			int count = 0;
+			
+			ChukwaRecord record = null;
+			while(values.hasNext())
+			{
+				record = values.next();
+				if (record.containsField("START_TIME"))
+				{
+					count++;
+				}
+				else
+				{
+					count --;
+				}
+			}
+			ChukwaRecordKey newKey = new ChukwaRecordKey();
+			newKey.setKey(""+record.getTime());
+			newKey.setReduceType("MSSRGraph");
+			ChukwaRecord newRecord = new ChukwaRecord();
+			newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
+			newRecord.setTime(record.getTime());
+			newRecord.add("count", "" + count);
+			newRecord.add("JOBID", record.getValue("JOBID"));
+			if (action.indexOf("JobLogHist/Map/") >= 0)
+			{
+				newRecord.add("type", "MAP");
+			}
+			else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0)
+			{
+				newRecord.add("type", "SHUFFLE");
+			}
+			else if (action.indexOf("JobLogHist/SORT/") >= 0)
+			{
+				newRecord.add("type", "SORT");
+			}
+			else if (action.indexOf("JobLogHist/REDUCE/") >= 0)
+			{
+				newRecord.add("type", "REDUCE");
+			}
+				
+			output.collect(newKey, newRecord);
+		} catch (IOException e)
+		{
+			log.warn("Unable to collect output in JobLogHistoryReduceProcessor [" + key + "]", e);
+			e.printStackTrace();
+		}
+
+	}
+
+}

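For each key, the reducer above nets records that carry START_TIME (+1) against those that do not (-1), so "count" approximates how many map/shuffle/sort/reduce tasks were still in flight in that time bucket. A toy, JDK-only illustration of the counting (inputs are hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class PhaseCountDemo {
        public static void main(String[] args) {
            // true = record has START_TIME, false = it marks a completion
            List<Boolean> bucket = Arrays.asList(true, true, false, true);
            int count = 0;
            for (boolean started : bucket) {
                count += started ? 1 : -1;
            }
            System.out.println("count=" + count); // 2 tasks net still running
        }
    }
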

