hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r685353 [10/13] - in /hadoop/core/trunk: ./ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/build/ src/contrib/chukwa/conf/ src/contrib/chukwa/dist/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/h...
Date Tue, 12 Aug 2008 22:35:23 GMT
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class DebugOutputProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(DebugOutputProcessor.class);
+	public static final String recordType = "Debug";
+		
+	@Override
+	public void parse(String line, OutputCollector<Text, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
+		
+		ChukwaRecord record = new ChukwaRecord();
+		buildGenericRecord(record,line, System.currentTimeMillis(),recordType);
+		key.set("" + chunk.getSeqID());
+		try
+		{
+			output.collect(key, record);
+		} catch (IOException e)
+		{
+			e.printStackTrace();
+		}
+	}
+
+	public String getDataType()
+	{
+		return DebugOutputProcessor.recordType;
+	}
+}
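
The processors in this commit all lean on the ChukwaRecord field API that appears at
the end of this message (ChukwaRecord.java); chunk, key and buildGenericRecord(...)
come from AbstractProcessor, which is not part of this excerpt. A minimal sketch of
the record API, with a made-up field name:

    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;

    public class RecordApiSketch {
        public static void main(String[] args) {
            ChukwaRecord record = new ChukwaRecord();
            // fields live in a lazily created TreeMap<String, Buffer>
            record.add("capp", "DebugOutputProcessor");
            System.out.println(record.getValue("capp"));      // DebugOutputProcessor
            System.out.println(record.containsField("capp")); // true
        }
    }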

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,29 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+public class DuplicateProcessorException extends RuntimeException
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 3890267797961057789L;
+
+	public DuplicateProcessorException()
+	{}
+
+	public DuplicateProcessorException(String message)
+	{
+		super(message);
+	}
+
+	public DuplicateProcessorException(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public DuplicateProcessorException(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+
+
+public class HadoopLogProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(HadoopLogProcessor.class);
+	
+	private static final String recordType = "HadoopLog";
+	private static final String nameNodeType = "NameNode";
+	private static final String dataNodeType = "DataNode";
+	
+	private static String regex= null;
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+	
+	
+	public HadoopLogProcessor()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): ((?s).*)\\z"; // (?s) lets group(4) span a multi-line body; the old ((.*)\n*)* form left group(4) empty after a match
+		p = Pattern.compile(regex);
+		matcher = p.matcher("-");
+	}
+	
+	@Override
+	public void parse(String line, OutputCollector<Text, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
+		
+		ChukwaRecord record = new ChukwaRecord();
+		
+		matcher.reset(line);
+		if (matcher.matches())
+		{
+			try
+			{
+				Date d = sdf.parse( matcher.group(0).trim());
+				if (this.chunk.getStreamName().indexOf("datanode") > 0)
+				{
+					buildGenericRecord(record,line,d.getTime(),dataNodeType);
+				}
+				else if (this.chunk.getStreamName().indexOf("namenode") > 0)
+				{
+					buildGenericRecord(record,line,d.getTime(),nameNodeType);
+				}
+				else
+				{
+					buildGenericRecord(record,line,d.getTime(),recordType);
+				}
+				
+				key.set("" + d.getTime());
+				record.add(Record.logLevelField, "" +matcher.group(2));
+				record.add(Record.classField, "" +matcher.group(3));
+				record.add(Record.bodyField, "" +matcher.group(4));
+				output.collect(key, record);
+			}
+			catch (ParseException e)
+			{
+				e.printStackTrace();
+			}
+			catch (IOException e)
+			{
+				e.printStackTrace();
+			}
+		}
+		
+	}
+
+	public String getDataType()
+	{
+		return HadoopLogProcessor.recordType;
+	}
+
+}
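
A note on the timestamp handling above: SimpleDateFormat.parse(String) stops at the
first character that does not fit the pattern, so parsing matcher.group(0) (the whole
matched line) still yields the leading timestamp, though group(1) would be the tidier
choice. A small sketch, with a made-up log line:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TimestampParseSketch {
        public static void main(String[] args) throws ParseException {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
            // everything after the ",SSS" milliseconds is ignored by parse()
            Date d = sdf.parse("2008-08-12 15:35:16,123 INFO org.apache.hadoop.dfs.DataNode: Starting DataNode");
            System.out.println(d.getTime()); // epoch millis of the leading timestamp
        }
    }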

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,96 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class HadoopMetricsProcessor extends AbstractProcessor 
+{
+
+	static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
+	
+	private static String regex= null;
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+
+	public HadoopMetricsProcessor()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+		p = Pattern.compile(regex);
+		matcher = p.matcher("-");
+	}
+	
+	
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
+	{
+
+		matcher.reset(recordEntry);
+		if (matcher.matches())
+		{
+			log.info("HadoopMetricsProcessor Matches");
+
+			try
+			{
+				Date d = sdf.parse(matcher.group(0).trim());
+				key.set("" + d.getTime());
+				String body = matcher.group(4);
+
+				String[] kvpairs = body.split(" ");
+
+				// database
+				DatabaseHelper databaseRecord = new DatabaseHelper("HadoopMetrics");
+
+				for (int i = 1; i < kvpairs.length; ++i)
+				{
+					String kvpair = kvpairs[i];
+					String[] halves = kvpair.split("=");
+					if (halves[0].equals("chukwa_timestamp"))
+					{
+						key.set(halves[1]);
+					}
+					else
+					{
+						databaseRecord.add(d.getTime(), halves[0], halves[1]);
+					}
+				}
+
+				// Output Hadoop metrics to database
+				output.collect(key, databaseRecord.buildChukwaRecord());
+				log.info("HadoopMetricsProcessor output 1 Hadoop metric to database");
+			}
+			catch (ParseException e)
+			{
+				e.printStackTrace();
+				log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]", e);
+			}
+			catch (IOException e)
+			{
+				e.printStackTrace();
+				log.warn("Unable to collect output in HadoopMetricsProcessor [" + recordEntry + "]", e);
+			}
+		}
+	}
+
+
+	public String getDataType()
+	{
+		return HadoopMetricsProcessor.class.getName();
+	}
+
+}
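
For illustration, the key=value handling above run on a made-up record body (the real
metric names depend on what the Hadoop metrics context emits; note the loop starts at
index 1, skipping the first space-separated token):

    public class MetricsBodySketch {
        public static void main(String[] args) {
            String body = "recordName chukwa_timestamp=1218576916123 memUsed=42 cpuIdle=87";
            String[] kvpairs = body.split(" ");
            for (int i = 1; i < kvpairs.length; ++i) {
                String[] halves = kvpairs[i].split("=");
                // chukwa_timestamp overrides the output key; everything else
                // becomes a column in the database record
                System.out.println(halves[0] + " -> " + halves[1]);
            }
        }
    }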

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Iostat extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(Iostat.class);
+	public final String recordType = this.getClass().getName();
+	
+	private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+
+	public Iostat()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		p = Pattern.compile(regex);
+	}
+
+	@Override
+	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		
+		log.debug("Iostat record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
+		StringBuilder sb = new StringBuilder(); 	 
+		int i = 0;
+		
+		String logLevel = null;
+		String className = null;
+		String body = null;
+		
+		matcher=p.matcher(recordEntry);
+		while (matcher.find())
+		{
+			log.debug("Iostat Processor Matches");
+			
+			try
+			{
+				Date d = sdf.parse( matcher.group(1).trim());
+				
+				logLevel = matcher.group(2);
+				className = matcher.group(3);
+				String hostname = matcher.group(5);
+				
+				//TODO create a more specific key structure
+				// part of ChukwaArchiveKey + record index if needed
+				key.set("" + d.getTime());
+				
+				String[] lines = recordEntry.split("\n");
+				int skip=0;
+				i++;
+				String[] headers = null;
+				while (skip<2 && i < lines.length) {
+					// Skip the first output because the numbers are averaged from system boot up
+					if(lines[i].indexOf("avg-cpu:")>=0) { // >= 0: "avg-cpu:" starts the line in iostat output
+						skip++;
+					}
+					i++;					
+				}
+				while (i < lines.length)
+				{
+					DatabaseHelper databaseRecord = null;
+					if(lines[i].equals("")) {
+						i++;
+						headers = parseHeader(lines[i]);
+						i++;
+					}
+					String data[] = parseData(lines[i]);
+					if(headers[0].equals("avg-cpu:")) {
+						log.debug("Matched CPU-Utilization");
+						databaseRecord = new DatabaseHelper("system");
+					} else if(headers[0].equals("Device:")) {
+						log.debug("Matched Iostat");
+						databaseRecord = new DatabaseHelper("system");	
+					} else {
+						log.debug("No match:"+headers[0]);
+					}
+					if(databaseRecord!=null) {
+						int j=0;
+						log.debug("Data Length: " + data.length);
+	                    while(j<data.length) {
+	                    	log.debug("header:"+headers[j]+" data:"+data[j]);
+	                    	if(!headers[j].equals("avg-cpu:")) {
+						        databaseRecord.add(d.getTime(),headers[j],data[j]);
+	                    	}
+						    j++;
+	                    }						
+						//Output Sar info to database
+						output.collect(key, databaseRecord.buildChukwaRecord());
+					}
+					i++;
+				}
+				// End of parsing
+			} catch (Exception e)
+			{
+				e.printStackTrace();
+			}
+		}
+	}
+	
+	public String[] parseHeader(String header) {
+		String[] headers = header.split("\\s+");
+		return headers;
+	}
+
+	public String[] parseData(String dataLine) {
+		String[] data = dataLine.split("\\s+");
+		return data;
+	}
+
+	public String getDataType() {
+		return recordType;
+	}
+}
\ No newline at end of file
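
The two helpers at the bottom of Iostat (reused in the same shape by Sar and Top
below) do all of the column work: header and data rows are split on runs of
whitespace and paired positionally. A sketch on made-up iostat output:

    public class IostatSplitSketch {
        public static void main(String[] args) {
            String header = "Device:            tps   Blk_read/s   Blk_wrtn/s";
            String data   = "sda               1.50        10.20        30.40";
            String[] h = header.split("\\s+");
            String[] d = data.split("\\s+");
            // pair each header with the value in the same column, as parse() does
            for (int j = 0; j < Math.min(h.length, d.length); j++) {
                System.out.println(h[j] + " = " + d[j]);
            }
        }
    }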

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class JobLogHistoryProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
+
+	private static final String recordType = "JobLogHistory";
+	private static final String jobEntryTypeField = "entryType";
+	
+	private static String regex = null;
+	private static String internalRegex = null;
+	private static Pattern p = null;
+	private static Pattern ip = null;
+	
+	private Matcher matcher = null;
+	private Matcher internalMatcher = null;
+	
+	private SimpleDateFormat sdf = null;
+
+	public JobLogHistoryProcessor()
+	{
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		
+		internalRegex = "(.*?)=\"(.*?)\"(.*)";
+		regex = "([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+		
+		ip = Pattern.compile(internalRegex);
+		p = Pattern.compile(regex);
+		
+		internalMatcher = ip.matcher("-");
+		matcher = p.matcher("-");
+	}
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<Text, ChukwaRecord> output, Reporter reporter)
+	{
+
+		String logLevel = null;
+		String className = null;
+		String body = null;
+
+		log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
+				+ chunk.getDataType() + "]");
+		
+		
+		matcher.reset(recordEntry);
+		if (matcher.matches())
+		{
+			log.info("JobLogHistoryProcessor Matches");
+
+			try
+			{
+				Date d = sdf.parse(matcher.group(1).trim());
+				ChukwaRecord record = new ChukwaRecord();
+
+				logLevel = matcher.group(2);
+				className = matcher.group(3);
+				body = matcher.group(4);
+
+				key.set("" + d.getTime());
+				
+				record.add(Record.logLevelField, logLevel);
+				record.add(Record.classField, className);
+				
+				int firstSep = body.indexOf(" ");
+				String logEntryType = body.substring(0,firstSep);
+				record.add(jobEntryTypeField, logEntryType);
+				
+				internalMatcher.reset(body.substring(firstSep));
+				String fieldName = null;
+				String fieldValue = null;
+				while (internalMatcher.matches())
+				{
+					fieldName = internalMatcher.group(1).trim();
+					fieldValue = internalMatcher.group(2).trim();
+					record.add(fieldName, fieldValue);
+					log.info("JobLogHistoryProcessor Add field: [" + fieldName + "][" + fieldValue +"]" );
+					internalMatcher.reset(internalMatcher.group(3));
+				}
+
+				buildGenericRecord(record, body, d.getTime(), JobLogHistoryProcessor.recordType);
+				output.collect(key, record);
+				log.info("JobLogHistoryProcessor outputting a record ============>>>>>");
+			} 
+			catch (IOException e)
+			{
+				e.printStackTrace();
+			} 
+			catch (ParseException e)
+			{
+				e.printStackTrace();
+			}
+
+		}
+	}
+
+	public String getDataType()
+	{
+		return JobLogHistoryProcessor.recordType;
+	}
+
+	public static void main(String[] args)
+	{
+		
+		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		
+		internalRegex = "(.*?)=\"(.*?)\"(.*)";
+		regex = "([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+		
+		ip = Pattern.compile(internalRegex);
+		p = Pattern.compile(regex);
+		
+		Matcher internalMatcher = ip.matcher("-");
+		Matcher matcher = p.matcher("-");
+	
+		String log = "2008-07-28 23:30:38,865 INFO org.apache.hadoop.chukwa.ChukwaJobHistory: Task TASKID=\"task_200807282329_0001_m_000000\" TASK_TYPE=\"MAP\" START_TIME=\"1217287838862\" SPLITS=\"/default-rack/somehost3.example.com,/default-rack/somehost2.example.com,/default-rack/somehost.example.com\"";
+		 matcher.reset(log);
+		
+		if (matcher.matches())
+		{
+			System.out.println("JobLogHistoryProcessor Matches");
+
+			try
+			{
+				Date d = sdf.parse(matcher.group(1).trim());
+				System.out.println(d);
+				ChukwaRecord record = new ChukwaRecord();
+
+				String logLevel = matcher.group(2);
+				String className = matcher.group(3);
+				String body = matcher.group(4);
+
+				System.out.println(matcher.group(1));
+				System.out.println(matcher.group(2));
+				System.out.println(matcher.group(3));
+				System.out.println(matcher.group(4));
+				
+				System.out.println(body); 
+				
+				record.add(Record.logLevelField, logLevel);
+				record.add(Record.classField, className);
+				
+				int firstSep = body.indexOf(" ");
+				System.out.println(firstSep);
+				String logEntryType = body.substring(0,firstSep);
+				record.add(jobEntryTypeField, logEntryType);
+				
+				internalMatcher.reset(body.substring(firstSep));
+				String fieldName = null;
+				String fieldValue = null;
+				while (internalMatcher.matches())
+				{
+					fieldName = internalMatcher.group(1).trim();
+					fieldValue = internalMatcher.group(2).trim();
+					record.add(fieldName, fieldValue);
+					System.out.println("JobLogHistoryProcessor Add field: [" + fieldName + "][" + fieldValue +"]" );
+					internalMatcher.reset(internalMatcher.group(3));
+				}
+
+				
+				System.out.println("JobLogHistoryProcessor outputting a record ============>>>>>");
+			}
+			catch(Exception e)
+			{
+				e.printStackTrace();
+			}
+		}
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,29 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+public class PbsInvalidEntry extends Exception
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 9154096600390233023L;
+
+	public PbsInvalidEntry()
+	{}
+
+	public PbsInvalidEntry(String message)
+	{
+		super(message);
+	}
+
+	public PbsInvalidEntry(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public PbsInvalidEntry(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodesProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodesProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodesProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodesProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,256 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class PbsNodesProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(PbsNodesProcessor.class);
+
+	private static final String rawPBSRecordType = "PbsNodes";
+	private static final String machinePBSRecordType = "MachinePbsNodes";
+	
+	private static String regex= null;
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+
+	public PbsNodesProcessor()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): ((?s).*)\\z"; // (?s): the body may span multiple lines
+		p = Pattern.compile(regex);
+		matcher = p.matcher("-");
+	}
+
+	@Override
+	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		
+		log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
+		
+		
+
+		StringBuilder sb = new StringBuilder(); 	 
+		int i = 0;
+		String nodeActivityStatus = null;
+		int totalFreeNode = 0;
+		int totalUsedNode = 0;
+		int totalDownNode = 0;
+		
+		String logLevel = null;
+		String className = null;
+		String body = null;
+		ChukwaRecord record = null;
+		
+		matcher.reset(recordEntry);
+		if (matcher.matches())
+		{
+			log.info("PbsNodeProcessor Matches");
+			
+			try
+			{
+				Date d = sdf.parse( matcher.group(0).trim());
+				
+				logLevel = matcher.group(2);
+				className = matcher.group(3);
+				body = matcher.group(4);
+				
+				// Raw PBS output
+				/*
+			    record = new ChukwaRecord();
+				buildGenericRecord(record,recordEntry,d.getTime(),rawPBSRecordType);
+				
+				//TODO create a more specific key structure
+				// part of ChukwaArchiveKey + record index if needed
+				key.set("" + d.getTime());
+				
+				record.add(Record.logLevelField, logLevel);
+				record.add(Record.classField, className);
+				record.add(Record.bodyField, body);
+				//Output PbsNode record for all machines (raw information)
+				output.collect(key, record);
+				log.info("PbsNodeProcessor output 1 record");
+				*/
+				
+				// TODO if we're not saving the PBS' output, we don't need to parse and save it ...
+				String[] lines = recordEntry.split("\n");
+				while (i < lines.length)
+				{
+					while ((i < lines.length) && (lines[i].trim().length() > 0))
+					{
+						sb.append(lines[i].trim()).append("\n");
+						i++;
+					}
+
+					if ((i < lines.length) && (lines[i].trim().length() > 0))
+					{
+						throw new PbsInvalidEntry(recordEntry);
+					}
+
+					// Empty line
+					i++;
+
+					if (sb.length() > 0)
+					{
+						body =  sb.toString();
+						// Process all entries for a machine
+						System.out.println("=========>>> Record [" + body+ "]");
+						
+						record = new ChukwaRecord();
+						buildGenericRecord(record,body,d.getTime(),machinePBSRecordType);
+						
+						// TODO Change the key information
+						key.set("" + d.getTime());
+						record.add(Record.logLevelField, logLevel);
+						record.add(Record.classField, className);
+						record.add(Record.bodyField, body);
+						
+						parsePbsRecord(body, record);
+						
+						//Output PbsNode record for 1 machine
+						output.collect(key, record);
+						log.info("PbsNodeProcessor output 1 sub-record");
+						//compute Node Activity information
+						nodeActivityStatus = record.getValue("state");
+						if (nodeActivityStatus != null)
+						{
+							if (nodeActivityStatus.equals("free"))
+							{
+								totalFreeNode ++;
+							}
+							else if (nodeActivityStatus.equals("job-exclusive"))
+							{
+								totalUsedNode ++;
+							}
+							else
+							{
+								totalDownNode ++;
+							}					
+						}
+						sb = new StringBuilder();
+					}
+				}
+				
+				// End of parsing
+
+				// database (long timestamp,String table,String tag,int tableType,int sqlType)
+				DatabaseHelper databaseRecord = new DatabaseHelper("NodeActivity");
+				
+				// Data
+				databaseRecord.add(d.getTime(),"Used",""+totalUsedNode);
+				databaseRecord.add(d.getTime(),"Free",""+totalFreeNode);
+				databaseRecord.add(d.getTime(),"Down",""+totalDownNode);
+				
+				//Output NodeActivity info to database
+				output.collect(key, databaseRecord.buildChukwaRecord());
+				log.info("PbsNodeProcessor output 1 NodeActivity to database");
+				
+				// INFO if you need to save NodeActivity info to HDFS
+				// use the following block
+
+//				// Save Node Activity information
+//				String nodeActivity = "NodeActivity:  totalFreeNode=" + totalFreeNode 
+//				+ ",totalUsedNode=" + totalUsedNode 
+//				+ ",totalDownNode=" + totalDownNode;
+//
+//				record = new ChukwaRecord();
+//				buildGenericRecord(record,nodeActivity,d.getTime(),nodeActivityRecordType);
+//				//Used,Free,Down
+//				record.add("Used",""+totalUsedNode);
+//				record.add("Free",""+totalFreeNode);
+//				record.add("Down",""+totalDownNode);
+//				//Output NodeActivity info to HDFS
+//				output.collect(key, record);
+//				log.info("PbsNodeProcessor output 1 NodeActivity to HDFS");
+					
+			}
+			catch (ParseException e)
+			{
+				e.printStackTrace();
+				log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+			}
+			catch (IOException e)
+			{
+				log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry + "]", e);
+				e.printStackTrace();
+			}
+			catch (PbsInvalidEntry e)
+			{
+				log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+				e.printStackTrace();
+			}
+		}
+
+		
+	}
+
+	protected static void parsePbsRecord(String recordLine, ChukwaRecord record)
+	{
+		int i = 0;
+		String[] lines = recordLine.split("\n");
+		System.out.println("Machine=" + lines[i]);
+		i++;
+		String[] data = null;
+		while (i < lines.length)
+		{
+			data = extractFields(lines[i]);
+			System.out.println("[" + data[0].trim() + "] ==> ["
+					+ data[1].trim() + "]");
+			
+			record.add(data[0].trim(), data[1].trim());
+			
+			if (data[0].trim().equalsIgnoreCase("status"))
+			{
+				parseStatusField(data[1].trim(), record);
+			}
+			i++;
+		}
+	}
+
+	protected static void parseStatusField(String statusField,
+			ChukwaRecord record)
+	{
+		String[] data = null;
+		String[] subFields = statusField.trim().split(",");
+		for (String subField : subFields)
+		{
+			data = extractFields(subField);
+			record.add(data[0].trim(), data[1].trim());
+			System.out.println("[status." + data[0].trim() + "] ==> ["
+					+ data[1].trim() + "]");
+		}
+	}
+
+
+	static String[] extractFields(String line)
+	{
+		String[] args = new String[2];
+		int index = line.indexOf("=");
+		args[0] = line.substring(0,index );
+		args[1] = line.substring(index + 1);
+
+		return args;
+	}
+
+	public String getDataType()
+	{
+		return PbsNodesProcessor.rawPBSRecordType;
+	}
+	
+}
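
extractFields splits on the first '=' only, so nested key=value pairs inside the
status attribute survive the outer split and are then re-split on commas by
parseStatusField. A sketch on made-up pbsnodes output:

    public class PbsFieldSketch {
        public static void main(String[] args) {
            String line = "     state = free";
            int index = line.indexOf("=");
            System.out.println("[" + line.substring(0, index).trim() + "] ==> ["
                    + line.substring(index + 1).trim() + "]"); // [state] ==> [free]

            // status sub-fields are comma-separated key=value pairs
            String status = "opsys=linux,loadave=0.25,state=free";
            for (String sub : status.split(",")) {
                int k = sub.indexOf("=");
                System.out.println("status." + sub.substring(0, k) + " = " + sub.substring(k + 1));
            }
        }
    }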

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+
+
+public class ProcessorFactory
+{
+	static Logger log = Logger.getLogger(ProcessorFactory.class);
+	
+	// TODO
+	//	add new mapper package at the end.
+	//	We should have a more generic way to do this.
+	//	Ex: read from config
+	//	list of alias
+	//	and
+	//	alias -> processor class
+	
+	
+	private static HashMap<String,ChunkProcessor > processors =
+	    new HashMap<String, ChunkProcessor>(); // registry
+		
+	private ProcessorFactory()
+	{}
+	
+	public static ChunkProcessor getProcessor(String recordType)
+	 throws UnknownRecordTypeException
+	{
+		String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper." + recordType;
+		if (processors.containsKey(recordType)) {
+			return processors.get(recordType);
+		} else {
+			ChunkProcessor processor = null;
+			try {
+				processor = (ChunkProcessor)Class.forName(path).getConstructor().newInstance();
+			} catch(ClassNotFoundException e) {
+				throw new UnknownRecordTypeException("Unknown recordType:" + recordType, e);			
+			} catch(Exception e) {
+			  throw new UnknownRecordTypeException("error constructing processor", e);
+			}
+			
+			//TODO using a ThreadSafe/reuse flag to actually decide if we want 
+			// to reuse the same processor again and again
+			register(recordType,processor);
+			return processor;
+		}
+	}
+	
+	  /** Register a specific parser for a {@link ChunkProcessor}
+	   * implementation. */
+	  public static synchronized void register(String recordType,
+	                                         ChunkProcessor processor) 
+	  {
+		  log.info("register " + processor.getClass().getName() + " for this recordType :" + recordType);
+		  if (processors.containsKey(recordType))
+			{
+			  throw new DuplicateProcessorException("Duplicate processor for recordType:" + recordType);
+			}
+		  ProcessorFactory.processors.put(recordType, processor);
+	  }
+
+}
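
Since getProcessor appends recordType to this package name, callers are expected to
pass a processor's simple class name. A hypothetical caller, assuming the processors
above implement ChunkProcessor through AbstractProcessor (neither type is shown in
this excerpt):

    import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChunkProcessor;
    import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ProcessorFactory;
    import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;

    public class FactorySketch {
        public static void main(String[] args) {
            try {
                // resolves ...demux.processor.mapper.HadoopLogProcessor
                ChunkProcessor p = ProcessorFactory.getProcessor("HadoopLogProcessor");
                System.out.println(p.getClass().getName());
            } catch (UnknownRecordTypeException e) {
                e.printStackTrace();
            }
        }
    }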

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+public class Sar extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(Sar.class);
+	public final String recordType = this.getClass().getName();
+
+	private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+
+	public Sar()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		p = Pattern.compile(regex);
+	}
+
+	@Override
+	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		
+		log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
+		StringBuilder sb = new StringBuilder(); 	 
+		int i = 0;
+		
+		String logLevel = null;
+		String className = null;
+		String body = null;
+		
+		matcher=p.matcher(recordEntry);
+		while (matcher.find())
+		{
+			log.debug("Sar Processor Matches");
+			
+			try
+			{
+				Date d = sdf.parse( matcher.group(1).trim());
+				
+				logLevel = matcher.group(2);
+				className = matcher.group(3);
+				String hostname = matcher.group(5);
+				
+				//TODO create a more specific key structure
+				// part of ChukwaArchiveKey + record index if needed
+				key.set("" + d.getTime());
+				
+				String[] lines = recordEntry.split("\n");
+				
+				
+				String[] headers = null;
+				while(i < (lines.length-1) && lines[i+1].indexOf("Average:")<0) {
+					// Skip to the average lines
+					log.debug("skip:"+lines[i]);
+				    i++;
+				}
+				while (i < lines.length)
+				{
+					DatabaseHelper databaseRecord = null;
+					if(lines[i].equals("")) {
+						i++;
+						headers = parseHeader(lines[i]);
+						i++;
+					}
+					String data[] = parseData(lines[i]);
+					if(headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) {
+						log.debug("Matched Sar-Network");
+						databaseRecord = new DatabaseHelper("system");
+					} else if(headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) {
+						log.debug("Matched Sar-Network");
+						databaseRecord = new DatabaseHelper("system");	
+					} else if(headers[1].equals("kbmemfree")) {
+						log.debug("Matched Sar-Memory");
+						databaseRecord = new DatabaseHelper("system");	
+					} else if(headers[1].equals("totsck")) {
+						log.debug("Matched Sar-NetworkSockets");
+						databaseRecord = new DatabaseHelper("system");	
+					} else if(headers[1].equals("runq-sz")) {
+						log.debug("Matched Sar-LoadAverage");
+						databaseRecord = new DatabaseHelper("system");	
+					} else {
+						log.debug("No match:"+headers[1]+" "+headers[2]);
+					}
+					if(databaseRecord!=null) {
+						int j=0;
+						log.debug("Data Length: " + data.length);
+	                    while(j<data.length) {
+	                    	log.debug("header:"+headers[j]+" data:"+data[j]);
+	                    	if(!headers[j].equals("Average:")) {
+						        databaseRecord.add(d.getTime(),headers[j],data[j]);
+	                    	}
+						    j++;
+	                    }						
+						//Output Sar info to database
+						output.collect(key, databaseRecord.buildChukwaRecord());
+					}
+					i++;
+				}
+				// End of parsing
+			} catch (Exception e)
+			{
+				e.printStackTrace();
+			}
+		}
+	}
+	
+	public String[] parseHeader(String header) {
+		String[] headers = header.split("\\s+");
+		return headers;
+	}
+
+	public String[] parseData(String dataLine) {
+		String[] data = dataLine.split("\\s+");
+		return data;
+	}
+
+	public String getDataType() {
+		return recordType;
+	}
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Top.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import java.util.HashMap;
+
+public class Top extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(Top.class);
+	public final String recordType = this.getClass().getName();
+
+	private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): ";
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+
+	public Top()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		p = Pattern.compile(regex);
+	}
+
+	@Override
+	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		
+		log.info("Top record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
+		StringBuilder sb = new StringBuilder(); 	 
+		
+		String logLevel = null;
+		String className = null;
+		String body = null;
+		
+		matcher=p.matcher(recordEntry);
+		while (matcher.find())
+		{
+			log.info("Top Processor Matches");
+			
+			try
+			{
+				Date d = sdf.parse( matcher.group(1).trim());
+				
+				logLevel = matcher.group(2);
+				className = matcher.group(3);
+				
+				//TODO create a more specific key structure
+				// part of ChukwaArchiveKey + record index if needed
+				key.set("" + d.getTime());
+				String[] lines = recordEntry.split("\n");
+				int i = 0;
+				String summaryString = "";
+				while(i < lines.length && !lines[i].equals("")) {
+					summaryString = summaryString + lines[i] + "\n";
+				    i++;
+				}
+				i++;
+				String[] headers = lines[i].split("\\s+");
+				HashMap<String, String>summary = parseSummary(summaryString);
+				DatabaseHelper databaseRecord = new DatabaseHelper("system");
+				Iterator<String> ki = summary.keySet().iterator();
+				while(ki.hasNext()) {
+					String field = ki.next(); // don't shadow the Text key used by collect() below
+					databaseRecord.add(d.getTime(), field, summary.get(field));
+				}
+				output.collect(key, databaseRecord.buildChukwaRecord());
+				while (i < lines.length)
+				{
+					databaseRecord = null;
+					String data[] = lines[i].split("\\s+",headers.length);
+					if(lines[i].indexOf("PID USER")<0) {
+						databaseRecord = new DatabaseHelper("system");	
+					}
+					if(databaseRecord!=null) {
+						int j=0;
+						log.debug("Data Length: " + data.length);
+	                    while(j<data.length-1) {
+	                    	if(headers[0].equals("")) {
+		                    	log.debug("header:"+headers[j+1]+" data:"+data[j+1]);
+	                    		databaseRecord.add(d.getTime(),headers[j+1],data[j+1]);
+	                    	} else {
+		                    	log.debug("header:"+headers[j+1]+" data:"+data[j]);
+							    databaseRecord.add(d.getTime(),headers[j+1],data[j]);
+	                    	}
+						    j++;
+	                    }						
+						//Output Sar info to database
+						output.collect(key, databaseRecord.buildChukwaRecord());
+					}
+					i++;
+				}
+				// End of parsing
+			} catch (Exception e)
+			{
+				e.printStackTrace();
+			}
+		}
+	}
+	
+	public HashMap<String, String> parseSummary(String header) {
+		HashMap<String, String> keyValues = new HashMap<String, String>();
+		String[] headers = header.split("\n");
+		int i=0;
+		Pattern p = Pattern.compile("top - (.*?) up (.*?),\\s+(\\d+) users");
+		Matcher matcher = p.matcher(headers[0]);
+		if(matcher.find()) {
+            keyValues.put("uptime",matcher.group(2));
+            keyValues.put("users",matcher.group(3));
+		}
+		p = Pattern.compile("Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
+		matcher = p.matcher(headers[1]);
+		if(matcher.find()) {
+            keyValues.put("tasks_total",matcher.group(1));
+            keyValues.put("tasks_running",matcher.group(2));
+            keyValues.put("tasks_sleeping",matcher.group(3));
+            keyValues.put("tasks_stopped",matcher.group(4));
+            keyValues.put("tasks_zombie",matcher.group(5));
+		}
+		p = Pattern.compile("Cpu\\(s\\):\\s+(.*?)% us,\\s+(.*?)% sy,\\s+(.*?)% ni,\\s+(.*?)% id,\\s+(.*?)% wa,\\s+(.*?)% hi,\\s+(.*?)% si");
+		matcher = p.matcher(headers[2]);
+		if(matcher.find()) {
+            keyValues.put("cpu_user%",matcher.group(1));
+            keyValues.put("cpu_sys%",matcher.group(2));
+            keyValues.put("cpu_nice%",matcher.group(3));
+            keyValues.put("cpu_wait%",matcher.group(4));
+            keyValues.put("cpu_hi%",matcher.group(5));
+            keyValues.put("cpu_si%",matcher.group(6));
+		}
+		p = Pattern.compile("Mem:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k buffers");
+		matcher = p.matcher(headers[3]);
+		if(matcher.find()) {
+			keyValues.put("mem_total",matcher.group(1));
+			keyValues.put("mem_used",matcher.group(2));
+			keyValues.put("mem_free",matcher.group(3));
+			keyValues.put("mem_buffers",matcher.group(4));
+		}
+		p = Pattern.compile("Swap:\\s+(.*?)k total,\\s+(.*?)k used,\\s+(.*?)k free,\\s+(.*?)k cached");
+		matcher = p.matcher(headers[4]);
+		if(matcher.find()) {
+			keyValues.put("swap_total",matcher.group(1));
+			keyValues.put("swap_used",matcher.group(2));
+			keyValues.put("swap_free",matcher.group(3));
+			keyValues.put("swap_cached",matcher.group(4));
+		}
+		Iterator<String> ki = keyValues.keySet().iterator();
+		while(ki.hasNext()) {
+			String key = ki.next();
+			log.info(key+":"+keyValues.get(key));
+		}
+		return keyValues;
+	}
+
+	public String getDataType() {
+		return recordType;
+	}
+}
\ No newline at end of file
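
parseSummary relies on the fixed layout of the first five lines of top's output; each
pattern pulls named fields out of one line. One of those patterns, run on a made-up
Tasks line:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TopSummarySketch {
        public static void main(String[] args) {
            String line = "Tasks: 120 total,   2 running, 115 sleeping,   1 stopped,   2 zombie";
            Pattern p = Pattern.compile(
                "Tasks:\\s+(\\d+) total,\\s+(\\d+) running,\\s+(\\d+) sleeping,\\s+(\\d+) stopped,\\s+(\\d+) zombie");
            Matcher m = p.matcher(line);
            if (m.find()) {
                System.out.println("tasks_total  = " + m.group(1)); // 120
                System.out.println("tasks_zombie = " + m.group(5)); // 2
            }
        }
    }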

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/UnknownRecordTypeException.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,29 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+public class UnknownRecordTypeException extends Exception
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 8925135975093252279L;
+
+	public UnknownRecordTypeException()
+	{}
+
+	public UnknownRecordTypeException(String message)
+	{
+		super(message);
+	}
+
+	public UnknownRecordTypeException(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public UnknownRecordTypeException(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatchProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatchProcessor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatchProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YWatchProcessor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,125 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.chukwa.extraction.database.DatabaseHelper;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class YWatchProcessor extends AbstractProcessor
+{
+	static Logger log = Logger.getLogger(YWatchProcessor.class);
+
+	private static final String ywatchType = "YWatch";
+	
+	private static String regex= null;
+	
+	private static Pattern p = null;
+	
+	private Matcher matcher = null;
+	private SimpleDateFormat sdf = null;
+
+	public YWatchProcessor()
+	{
+		//TODO move that to config
+		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+		regex="([0-9]{4}\\-[0-9]{2}\\-[0-9]{2} [0-9]{2}\\:[0-9]{2}:[0-9]{2},[0-9]{3}) (INFO|DEBUG|ERROR|WARN) (.*?): (.*)";
+		p = Pattern.compile(regex);
+		matcher = p.matcher("-");
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void parse(String recordEntry, OutputCollector<Text, ChukwaRecord> output,
+			Reporter reporter)
+	{
+		
+		log.info("YWatchProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
+		
+		
+		matcher.reset(recordEntry);
+		if (matcher.matches())
+		{
+			log.info("YWatchProcessor Matches");
+			
+			try
+			{
+				Date d = sdf.parse( matcher.group(0).trim());
+				key.set("" + d.getTime());
+				String body = matcher.group(4);
+				
+				try
+				{
+					JSONObject json = new JSONObject(body);
+					
+					// database
+					DatabaseHelper databaseRecord = new DatabaseHelper("switches");
+					String tag = json.getString("poller") + "-" + json.getString("host");
+					String metricName = json.getString("metricName");
+					
+					// Data
+					JSONObject jsonData = json.getJSONObject("data").getJSONObject("data");
+					
+				
+					long ts = 0;
+					double value = 0;
+					String jsonTs = null;
+					String jsonValue = null;
+					Iterator<String> it = jsonData.keys();
+					while(it.hasNext())
+					{
+						jsonTs = it.next();
+						jsonValue = jsonData.getString(jsonTs);
+						try
+						{
+							value = Double.parseDouble(jsonValue);
+							ts = Long.parseLong(jsonTs);
+							databaseRecord.add(ts,metricName,value);
+						}
+						catch(NumberFormatException e)
+						{
+							if (!jsonValue.equalsIgnoreCase("N/A"))
+							{
+								throw new YwatchInvalidEntry("YWatchProcessor invalid entry [" + body + "]");
+							}
+						}
+					}
+					
+					output.collect(key, databaseRecord.buildChukwaRecord());
+					log.info("YWatchProcessor output 1 metric to database");
+					
+				} 
+				catch (IOException e)
+				{
+					log.warn("Unable to collect output in YWatchProcessor [" + recordEntry + "]", e);
+					e.printStackTrace();
+				}
+				catch (JSONException e)
+				{
+					e.printStackTrace();
+					log.warn("Wrong format in YWatchProcessor [" + recordEntry + "]", e);
+				}
+				
+			}
+			catch(Exception e)
+			{
+				e.printStackTrace();
+			}
+		}
+	}
+
+	public String getDataType()
+	{
+		return YWatchProcessor.ywatchType;
+	}
+}
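
The processor above expects a JSON payload with poller, host and metricName at the
top level and a nested data.data map of timestamp -> value; "N/A" values are skipped,
anything else must parse as a number. A made-up payload in that shape:

    import org.json.JSONException;
    import org.json.JSONObject;

    public class YWatchPayloadSketch {
        public static void main(String[] args) throws JSONException {
            String body = "{\"poller\":\"p1\",\"host\":\"node1\",\"metricName\":\"bytesIn\","
                    + "\"data\":{\"data\":{\"1218576916\":\"123.4\",\"1218576976\":\"N/A\"}}}";
            JSONObject json = new JSONObject(body);
            JSONObject jsonData = json.getJSONObject("data").getJSONObject("data");
            java.util.Iterator<?> it = jsonData.keys();
            while (it.hasNext()) {
                String ts = (String) it.next();
                System.out.println(ts + " -> " + jsonData.getString(ts));
            }
        }
    }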

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/YwatchInvalidEntry.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,30 @@
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+public class YwatchInvalidEntry extends Exception
+{
+
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 7074989443687516732L;
+
+	public YwatchInvalidEntry()
+	{
+	}
+
+	public YwatchInvalidEntry(String message)
+	{
+		super(message);
+	}
+
+	public YwatchInvalidEntry(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public YwatchInvalidEntry(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.engine;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.record.Buffer;
+
+public class ChukwaRecord extends ChukwaRecordJT implements Record
+{
+	public ChukwaRecord()
+	{}
+
+	public void add(String key, String value)
+	{
+		synchronized(this)
+		{
+			// lazy-init and put under the same lock, so concurrent first
+			// writers cannot race on mapFields
+			if (this.mapFields == null)
+			{
+				this.mapFields = new TreeMap<String,org.apache.hadoop.record.Buffer>();
+			}
+			this.mapFields.put(key, new Buffer(value.getBytes()));
+		}
+	}
+	
+	public String[] getFields()
+	{
+		return this.mapFields.keySet().toArray(new String[0]);
+	}
+
+	public String getValue(String field)
+	{
+		if (this.mapFields.containsKey(field))
+		{
+			return new String(this.mapFields.get(field).get());		
+		}
+		else
+		{
+			return null;
+		}
+	}
+
+	public boolean containsField(String field)
+	{
+		return this.mapFields.containsKey(field);
+	}
+
+	@Override
+	public String toString()
+	{
+		Set <Map.Entry<String,Buffer>> f = this.mapFields.entrySet();
+		Iterator <Map.Entry<String,Buffer>> it = f.iterator();
+		
+		Map.Entry<String,Buffer> entry = null;
+		StringBuilder sb = new StringBuilder();
+		sb.append("<event  ");
+		String body = null;
+		String key = null;
+		String val = null;
+		
+		while (it.hasNext())
+		{
+			entry = it.next();
+			key = entry.getKey();
+			val = new String(entry.getValue().get());
+			if (key.equals(Record.rawField))
+			{
+				continue;
+			}
+			
+			if (key.equals(Record.bodyField))
+			{
+				body = val;
+			}
+			else
+			{
+				sb.append(key).append("=\"").append(val).append("\" ");
+			}
+		}
+		// e.g. <event  src="host01" >body text</event>
+		sb.append(">").append(body);
+		sb.append("</event>");
+		
+		return sb.toString();
+	}
+
+	
+}
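
A minimal usage sketch for ChukwaRecord (illustrative; the host and job id are invented):

    ChukwaRecord record = new ChukwaRecord();
    record.setTime(System.currentTimeMillis());
    record.add(Record.sourceField, "host01");
    record.add(Record.bodyField, "job started");
    record.add("jobId", "job_200808121535_0001");
    // fields print in TreeMap key order; bodyField becomes the element text:
    // <event  jobId="job_200808121535_0001" src="host01" >job started</event>
    System.out.println(record);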

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// File generated by hadoop record compiler. Do not edit.
+package org.apache.hadoop.chukwa.extraction.engine;
+
+public class ChukwaRecordJT extends org.apache.hadoop.record.Record {
+  private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo;
+  private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter;
+  private static int[] _rio_rtiFilterFields;
+  static {
+    _rio_recTypeInfo = new org.apache.hadoop.record.meta.RecordTypeInfo("ChukwaRecordJT");
+    _rio_recTypeInfo.addField("time", org.apache.hadoop.record.meta.TypeID.LongTypeID);
+    _rio_recTypeInfo.addField("mapFields", new org.apache.hadoop.record.meta.MapTypeID(org.apache.hadoop.record.meta.TypeID.StringTypeID, org.apache.hadoop.record.meta.TypeID.BufferTypeID));
+  }
+  
+  protected long time;
+  protected java.util.TreeMap<String,org.apache.hadoop.record.Buffer> mapFields;
+  public ChukwaRecordJT() { }
+  public ChukwaRecordJT(
+    final long time,
+    final java.util.TreeMap<String,org.apache.hadoop.record.Buffer> mapFields) {
+    this.time = time;
+    this.mapFields = mapFields;
+  }
+  public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
+    return _rio_recTypeInfo;
+  }
+  public static void setTypeFilter(org.apache.hadoop.record.meta.RecordTypeInfo rti) {
+    if (null == rti) return;
+    _rio_rtiFilter = rti;
+    _rio_rtiFilterFields = null;
+  }
+  private static void setupRtiFields()
+  {
+    if (null == _rio_rtiFilter) return;
+    // we may already have done this
+    if (null != _rio_rtiFilterFields) return;
+    int _rio_i, _rio_j;
+    _rio_rtiFilterFields = new int [_rio_rtiFilter.getFieldTypeInfos().size()];
+    for (_rio_i=0; _rio_i<_rio_rtiFilterFields.length; _rio_i++) {
+      _rio_rtiFilterFields[_rio_i] = 0;
+    }
+    java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_itFilter = _rio_rtiFilter.getFieldTypeInfos().iterator();
+    _rio_i=0;
+    while (_rio_itFilter.hasNext()) {
+      org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfoFilter = _rio_itFilter.next();
+      java.util.Iterator<org.apache.hadoop.record.meta.FieldTypeInfo> _rio_it = _rio_recTypeInfo.getFieldTypeInfos().iterator();
+      _rio_j=1;
+      while (_rio_it.hasNext()) {
+        org.apache.hadoop.record.meta.FieldTypeInfo _rio_tInfo = _rio_it.next();
+        if (_rio_tInfo.equals(_rio_tInfoFilter)) {
+          _rio_rtiFilterFields[_rio_i] = _rio_j;
+          break;
+        }
+        _rio_j++;
+      }
+      _rio_i++;
+    }
+  }
+  public long getTime() {
+    return time;
+  }
+  public void setTime(final long time) {
+    this.time=time;
+  }
+  public java.util.TreeMap<String,org.apache.hadoop.record.Buffer> getMapFields() {
+    return mapFields;
+  }
+  public void setMapFields(final java.util.TreeMap<String,org.apache.hadoop.record.Buffer> mapFields) {
+    this.mapFields=mapFields;
+  }
+  public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(this,_rio_tag);
+    _rio_a.writeLong(time,"time");
+    {
+      _rio_a.startMap(mapFields,"mapFields");
+      java.util.Set<java.util.Map.Entry<String,org.apache.hadoop.record.Buffer>> _rio_es1 = mapFields.entrySet();
+      for(java.util.Iterator<java.util.Map.Entry<String,org.apache.hadoop.record.Buffer>> _rio_midx1 = _rio_es1.iterator(); _rio_midx1.hasNext();) {
+        java.util.Map.Entry<String,org.apache.hadoop.record.Buffer> _rio_me1 = _rio_midx1.next();
+        String _rio_k1 = _rio_me1.getKey();
+        org.apache.hadoop.record.Buffer _rio_v1 = _rio_me1.getValue();
+        _rio_a.writeString(_rio_k1,"_rio_k1");
+        _rio_a.writeBuffer(_rio_v1,"_rio_v1");
+      }
+      _rio_a.endMap(mapFields,"mapFields");
+    }
+    _rio_a.endRecord(this,_rio_tag);
+  }
+  private void deserializeWithoutFilter(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    _rio_a.startRecord(_rio_tag);
+    time=_rio_a.readLong("time");
+    {
+      org.apache.hadoop.record.Index _rio_midx1 = _rio_a.startMap("mapFields");
+      mapFields=new java.util.TreeMap<String,org.apache.hadoop.record.Buffer>();
+      for (; !_rio_midx1.done(); _rio_midx1.incr()) {
+        String _rio_k1;
+        _rio_k1=_rio_a.readString("_rio_k1");
+        org.apache.hadoop.record.Buffer _rio_v1;
+        _rio_v1=_rio_a.readBuffer("_rio_v1");
+        mapFields.put(_rio_k1,_rio_v1);
+      }
+      _rio_a.endMap("mapFields");
+    }
+    _rio_a.endRecord(_rio_tag);
+  }
+  public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
+  throws java.io.IOException {
+    if (null == _rio_rtiFilter) {
+      deserializeWithoutFilter(_rio_a, _rio_tag);
+      return;
+    }
+    // if we're here, we need to read based on version info
+    _rio_a.startRecord(_rio_tag);
+    setupRtiFields();
+    for (int _rio_i=0; _rio_i<_rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
+      if (1 == _rio_rtiFilterFields[_rio_i]) {
+        time=_rio_a.readLong("time");
+      }
+      else if (2 == _rio_rtiFilterFields[_rio_i]) {
+        {
+          org.apache.hadoop.record.Index _rio_midx1 = _rio_a.startMap("mapFields");
+          mapFields=new java.util.TreeMap<String,org.apache.hadoop.record.Buffer>();
+          for (; !_rio_midx1.done(); _rio_midx1.incr()) {
+            String _rio_k1;
+            _rio_k1=_rio_a.readString("_rio_k1");
+            org.apache.hadoop.record.Buffer _rio_v1;
+            _rio_v1=_rio_a.readBuffer("_rio_v1");
+            mapFields.put(_rio_k1,_rio_v1);
+          }
+          _rio_a.endMap("mapFields");
+        }
+      }
+      else {
+        java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>)(_rio_rtiFilter.getFieldTypeInfos());
+        org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i).getFieldID(), typeInfos.get(_rio_i).getTypeID());
+      }
+    }
+    _rio_a.endRecord(_rio_tag);
+  }
+  public int compareTo (final Object _rio_peer_) throws ClassCastException {
+    if (!(_rio_peer_ instanceof ChukwaRecordJT)) {
+      throw new ClassCastException("Comparing different types of records.");
+    }
+    ChukwaRecordJT _rio_peer = (ChukwaRecordJT) _rio_peer_;
+    int _rio_ret = 0;
+    _rio_ret = (time == _rio_peer.time)? 0 :((time<_rio_peer.time)?-1:1);
+    if (_rio_ret != 0) return _rio_ret;
+    {
+      java.util.Set<String> _rio_set10 = mapFields.keySet();
+      java.util.Set<String> _rio_set20 = _rio_peer.mapFields.keySet();
+      java.util.Iterator<String> _rio_miter10 = _rio_set10.iterator();
+      java.util.Iterator<String> _rio_miter20 = _rio_set20.iterator();
+      for(; _rio_miter10.hasNext() && _rio_miter20.hasNext();) {
+        String _rio_k10 = _rio_miter10.next();
+        String _rio_k20 = _rio_miter20.next();
+        _rio_ret = _rio_k10.compareTo(_rio_k20);
+        if (_rio_ret != 0) { return _rio_ret; }
+      }
+      _rio_ret = (_rio_set10.size() - _rio_set20.size());
+    }
+    if (_rio_ret != 0) return _rio_ret;
+    return _rio_ret;
+  }
+  public boolean equals(final Object _rio_peer_) {
+    if (!(_rio_peer_ instanceof ChukwaRecordJT)) {
+      return false;
+    }
+    if (_rio_peer_ == this) {
+      return true;
+    }
+    ChukwaRecordJT _rio_peer = (ChukwaRecordJT) _rio_peer_;
+    boolean _rio_ret = false;
+    _rio_ret = (time==_rio_peer.time);
+    if (!_rio_ret) return _rio_ret;
+    _rio_ret = mapFields.equals(_rio_peer.mapFields);
+    if (!_rio_ret) return _rio_ret;
+    return _rio_ret;
+  }
+  public Object clone() throws CloneNotSupportedException {
+    ChukwaRecordJT _rio_other = new ChukwaRecordJT();
+    _rio_other.time = this.time;
+    _rio_other.mapFields = (java.util.TreeMap<String,org.apache.hadoop.record.Buffer>) this.mapFields.clone();
+    return _rio_other;
+  }
+  public int hashCode() {
+    int _rio_result = 17;
+    int _rio_ret;
+    _rio_ret = (int) (time^(time>>>32));
+    _rio_result = 37*_rio_result + _rio_ret;
+    _rio_ret = mapFields.hashCode();
+    _rio_result = 37*_rio_result + _rio_ret;
+    return _rio_result;
+  }
+  public static String signature() {
+    return "LChukwaRecordJT(l{sB})";
+  }
+  public static class Comparator extends org.apache.hadoop.record.RecordComparator {
+    public Comparator() {
+      super(ChukwaRecordJT.class);
+    }
+    static public int slurpRaw(byte[] b, int s, int l) {
+      try {
+        int os = s;
+        {
+          long i = org.apache.hadoop.record.Utils.readVLong(b, s);
+          int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+          s+=z; l-=z;
+        }
+        {
+          int mi1 = org.apache.hadoop.record.Utils.readVInt(b, s);
+          int mz1 = org.apache.hadoop.record.Utils.getVIntSize(mi1);
+          s+=mz1; l-=mz1;
+          for (int midx1 = 0; midx1 < mi1; midx1++) {{
+              int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+              int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+              s+=(z+i); l-= (z+i);
+            }
+            {
+              int i = org.apache.hadoop.record.Utils.readVInt(b, s);
+              int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+              s += z+i; l -= (z+i);
+            }
+          }
+        }
+        return (os - s);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    static public int compareRaw(byte[] b1, int s1, int l1,
+                                   byte[] b2, int s2, int l2) {
+      try {
+        int os1 = s1;
+        {
+          long i1 = org.apache.hadoop.record.Utils.readVLong(b1, s1);
+          long i2 = org.apache.hadoop.record.Utils.readVLong(b2, s2);
+          if (i1 != i2) {
+            return ((i1-i2) < 0) ? -1 : 0;
+          }
+          int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+          int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+          s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+        }
+        {
+          int mi11 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+          int mi21 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+          int mz11 = org.apache.hadoop.record.Utils.getVIntSize(mi11);
+          int mz21 = org.apache.hadoop.record.Utils.getVIntSize(mi21);
+          s1+=mz11; s2+=mz21; l1-=mz11; l2-=mz21;
+          for (int midx1 = 0; midx1 < mi11 && midx1 < mi21; midx1++) {{
+              int i1 = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+              int i2 = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+              int z1 = org.apache.hadoop.record.Utils.getVIntSize(i1);
+              int z2 = org.apache.hadoop.record.Utils.getVIntSize(i2);
+              s1+=z1; s2+=z2; l1-=z1; l2-=z2;
+              int r1 = org.apache.hadoop.record.Utils.compareBytes(b1,s1,i1,b2,s2,i2);
+              if (r1 != 0) { return (r1<0)?-1:0; }
+              s1+=i1; s2+=i2; l1-=i1; l2-=i2;
+            }
+            {
+              int i = org.apache.hadoop.record.Utils.readVInt(b1, s1);
+              int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+              s1 += z+i; l1 -= (z+i);
+            }
+            {
+              int i = org.apache.hadoop.record.Utils.readVInt(b2, s2);
+              int z = org.apache.hadoop.record.Utils.getVIntSize(i);
+              s2 += z+i; l2 -= (z+i);
+            }
+          }
+          if (mi11 != mi21) { return (mi11<mi21)?-1:0; }
+        }
+        return (os1 - s1);
+      } catch(java.io.IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    public int compare(byte[] b1, int s1, int l1,
+                         byte[] b2, int s2, int l2) {
+      int ret = compareRaw(b1,s1,l1,b2,s2,l2);
+      return (ret == -1)? -1 : ((ret==0)? 1 : 0);}
+  }
+  
+  static {
+    org.apache.hadoop.record.RecordComparator.define(ChukwaRecordJT.class, new Comparator());
+  }
+}
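
Because ChukwaRecordJT extends org.apache.hadoop.record.Record, it can be round-tripped through the record runtime's binary archives; a minimal sketch, assuming the usual java.io/java.util imports plus org.apache.hadoop.record.* and with IOException handling elided:

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ChukwaRecordJT rec = new ChukwaRecordJT(System.currentTimeMillis(),
        new TreeMap<String, Buffer>());
    rec.serialize(new BinaryRecordOutput(bos), "rec");

    BinaryRecordInput in = new BinaryRecordInput(
        new ByteArrayInputStream(bos.toByteArray()));
    ChukwaRecordJT copy = new ChukwaRecordJT();
    copy.deserialize(in, "rec");
    // copy.equals(rec) should now hold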

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchResult.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.engine;
+
+import java.util.List;
+import java.util.TreeMap;
+
+
+public class ChukwaSearchResult implements SearchResult
+{
+	private TreeMap<Long, List<Record>> records;
+
+	public TreeMap<Long, List<Record>> getRecords()
+	{
+		return records;
+	}
+
+	public void setRecords(TreeMap<Long, List<Record>> records)
+	{
+		this.records = records;
+	}
+	
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaSearchService.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.engine;
+
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSource;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceFactory;
+
+public class ChukwaSearchService implements SearchService
+{
+	private DataSourceFactory dataSourceFactory = DataSourceFactory.getInstance();
+	
+	public SearchResult search(String cluster, String[] dataSources, long t0, long t1, String filter)
+	throws DataSourceException
+	{
+		SearchResult result = new ChukwaSearchResult();
+		
+		TreeMap<Long, List<Record>> records = new TreeMap<Long,List<Record>> ();
+		result.setRecords(records);
+		
+		for(String dataSourceName : dataSources)
+		{
+			DataSource ds = dataSourceFactory.getDataSource(dataSourceName);
+			ds.search(result, cluster, dataSourceName, t0, t1, filter);
+		}
+		return result;
+	}
+}
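
A sketch of a caller; the cluster and data-source names are placeholders, filter semantics are left to each DataSource, and DataSourceException handling is elided:

    SearchService service = new ChukwaSearchService();
    long now = System.currentTimeMillis();
    // last hour of records from a hypothetical "SysLog" source
    SearchResult result = service.search("myCluster",
        new String[] { "SysLog" }, now - 3600L * 1000L, now, null);
    for (List<Record> hits : result.getRecords().values()) {
        for (Record r : hits) {
            System.out.println(r);
        }
    }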

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/DatabaseRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/DatabaseRecord.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/DatabaseRecord.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/DatabaseRecord.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,102 @@
+package org.apache.hadoop.chukwa.extraction.engine;
+
+public class DatabaseRecord 
+{
+	public static final String intType = "i";
+	public static final String longType = "l";
+	public static final String floatType = "f";
+	public static final String doubleType = "d";
+	
+	public static final String tableField = "tbl";
+	public static final String columnsNameField = "cols";
+	public static final String columnsTypesField = "types";
+	
+	public static final String insertSQLCmde = "insert";
+	public static final String updateSQLCmde = "update";
+	public static final String sqlCmdeField = "cmde";
+	
+	ChukwaRecord record = new ChukwaRecord();
+	
+	public DatabaseRecord()
+	{
+		this.record.add(Record.destinationField, "database");
+	}
+
+	public DatabaseRecord(ChukwaRecord record)
+	{
+		this.record = record;
+	}
+
+	
+	public ChukwaRecord getRecord() {
+		return this.record;
+	}
+
+	public void setRecord(ChukwaRecord record) {
+		this.record = record;
+	}
+
+	public String getTable()
+	{
+		return this.record.getValue(tableField);
+	}
+	public void setTable(String tableName)
+	{
+		this.record.add(tableField, tableName);
+	}
+	
+	public void setSqlCommand(String cmde)
+	{
+		this.record.add(sqlCmdeField, cmde);
+	}
+	
+	public String getSqlCommand()
+	{
+		return this.record.getValue(sqlCmdeField);
+	}
+	
+	public String[] getColumns()
+	{
+		return this.record.getValue(columnsNameField).split(",");
+	}
+	public void setColumns(String[] columns)
+	{
+		StringBuilder sb = new StringBuilder();
+		for(String name: columns)
+		{
+			sb.append(name).append(",");
+		}
+		// drop the trailing comma
+		this.record.add(columnsNameField, sb.substring(0, Math.max(0, sb.length()-1)));
+	}
+	
+	public String[] getColumnTypes()
+	{
+		return this.record.getValue(columnsTypesField).split(",");
+	}
+	
+	public void setColumnTypes(String[] columnsTypes)
+	{
+		StringBuilder sb = new StringBuilder();
+		for(String type: columnsTypes)
+		{
+			sb.append(type).append(",");
+		}
+		// drop the trailing comma
+		this.record.add(columnsTypesField, sb.substring(0, Math.max(0, sb.length()-1)));
+	}
+
+	public void setTime(long time) {
+		this.record.setTime(time);
+	}
+
+	public void add(String key, String value) {
+		this.record.add(key, value);
+	}
+
+	public long getTime() {
+		return this.record.getTime();
+	}
+
+	public String getValue(String field) {
+		return this.record.getValue(field);
+	}
+}
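
A usage sketch for DatabaseRecord; the table and column names are invented, and only the numeric type tags defined above exist:

    DatabaseRecord row = new DatabaseRecord();
    row.setTime(System.currentTimeMillis());
    row.setSqlCommand(DatabaseRecord.insertSQLCmde);
    row.setTable("node_load");
    row.setColumns(new String[] { "load1", "load5" });
    row.setColumnTypes(new String[] { DatabaseRecord.doubleType,
                                      DatabaseRecord.doubleType });
    row.add("load1", "0.42");
    row.add("load5", "0.37");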

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/Record.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.engine;
+
+public interface Record
+{
+	public static final String bodyField = "body";
+	
+	public static final String logLevelField = "logLevel";
+	public static final String destinationField = "dest";
+	public static final String dataSourceField = "ds";
+	public static final String sourceField = "src";
+	public static final String streamNameField = "sname";
+	public static final String typeField = "type";
+	public static final String classField = "pkg";
+	public static final String rawField = "raw";
+	
+	public static final String fieldSeparator = ":";
+	
+	public long getTime();
+	public void add(String key, String value);
+	public String[] getFields();
+	public String getValue(String field);
+	public String toString();
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchResult.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.engine;
+
+import java.util.List;
+import java.util.TreeMap;
+
+
+
+public interface SearchResult
+{
+	public TreeMap<Long, List<Record>> getRecords();
+	public void setRecords(TreeMap<Long, List<Record>> records);
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/SearchService.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.engine;
+
+import org.apache.hadoop.chukwa.extraction.engine.datasource.DataSourceException;
+
+
+public interface SearchService
+{
+	public SearchResult search(String cluster, String[] dataSources, long t0, long t1, String filter)
+	throws DataSourceException;
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DataSource.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.engine.datasource;
+
+import org.apache.hadoop.chukwa.extraction.engine.SearchResult;
+
+
+
+public interface DataSource
+{
+	public SearchResult search(SearchResult result, String cluster, String dataSource,
+			long t0, long t1, String filter)
+		throws DataSourceException;
+	public boolean isThreadSafe();
+}
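
A skeletal, hypothetical implementation showing the contract; a real source would append Records whose timestamps fall in [t0, t1] to result.getRecords():

    public class NullDataSource implements DataSource
    {
        public SearchResult search(SearchResult result, String cluster,
                String dataSource, long t0, long t1, String filter)
                throws DataSourceException
        {
            // contributes nothing to the shared result
            return result;
        }

        public boolean isThreadSafe()
        {
            return true; // stateless, safe to share across queries
        }
    }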


