chukwa-commits mailing list archives

From: asrab...@apache.org
Subject: svn commit: r816739 [1/3] - in /hadoop/chukwa/trunk: ./ contrib/chukwa-pig/ src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ src/test/org/apache/hadoop/chukwa/analysis/ src/test/org/apache/hadoop/chukwa/analysis/salsa/ src/test/org/apache/hadoop/c...
Date: Fri, 18 Sep 2009 18:43:02 GMT
Author: asrabkin
Date: Fri Sep 18 18:43:01 2009
New Revision: 816739

URL: http://svn.apache.org/viewvc?rev=816739&view=rev
Log:
CHUKWA-344. State-Machine Generation for input to SALSA visualizations. Contributed by Jiaqi Tan

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntryPartitioner.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/analysis/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/analysis/salsa/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/analysis/salsa/fsm/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/analysis/salsa/fsm/TestFSMBuilder.java
    hadoop/chukwa/trunk/test/samples/ClientTrace.log
    hadoop/chukwa/trunk/test/samples/JobHistory.log
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/build.xml
    hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=816739&r1=816738&r2=816739&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Sep 18 18:43:01 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-344. State-Machine Generation for input to SALSA visualizations. (Jiaqi Tan via asrabkin)
+
     CHUKWA-384. Added REST API charting capability. (Eric Yang)
 
     CHUKWA-369. Tolerance of collector failures. (asrabkin)

Modified: hadoop/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/build.xml?rev=816739&r1=816738&r2=816739&view=diff
==============================================================================
--- hadoop/chukwa/trunk/build.xml (original)
+++ hadoop/chukwa/trunk/build.xml Fri Sep 18 18:43:01 2009
@@ -478,6 +478,12 @@
                                 <include name="chukwa-core*.jar" />
                         </fileset>
                 </copy>
+                <copy todir="${test.build.classes}/webapps">
+                  <fileset dir="${build.dir}">
+                    <include name="chukwa.war" />
+                    <include name="hicc.war" />
+                  </fileset>
+                </copy>
                 <copy file="${basedir}/conf/chukwa-demux-conf.xml.template" tofile="${test.build.dir}/conf/chukwa-demux-conf.xml"></copy>
                 <copy file="${basedir}/conf/log4j.properties" tofile="${test.build.dir}/conf/log4j.properties"></copy>
                 <copy file="${build.dir}/hicc.war" tofile="${test.build.classes}/webapps/hicc.war"></copy>

Modified: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar?rev=816739&r1=816738&r2=816739&view=diff
==============================================================================
Binary files - no diff available.

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/DataNodeClientTraceMapper.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.TreeSet;
+import java.util.regex.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.chukwa.extraction.demux.*;
+import org.apache.hadoop.chukwa.extraction.engine.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.filecache.DistributedCache;
+
+/**
+ * Pluggable mapper for FSMBuilder.
+ *
+ * K2 = state name + state ID
+ * (we reuse ChukwaRecordKey since it already implements useful machinery
+ *  such as Comparators)
+ * V2 = FSMIntermedEntry (which carries a TreeMap of additional info)
+ */
+public class DataNodeClientTraceMapper 
+  extends MapReduceBase 
+  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry> 
+{
+  private static Log log = LogFactory.getLog(FSMBuilder.class);
+	protected static final String SEP = "/";
+	protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.FILESYSTEM_FSM];
+	private final Pattern ipPattern =
+    Pattern.compile(".*[a-zA-Z\\-_:\\/]([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z\\-_:\\/].*");
+  private final Pattern logMsgPattern = Pattern.compile("^(.{23}) ([A-Z]+) ([a-zA-Z0-9\\.]+): (.*)");
+
+  public void map
+    (ChukwaRecordKey key, ChukwaRecord val,
+     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output, 
+		 Reporter reporter)
+    throws IOException 
+  {
+		String newkey = new String("");
+		String key_trimmed = key.toString().trim();
+		String task_type;
+		FSMIntermedEntry this_rec = new FSMIntermedEntry(); 
+
+		/* Extract field names for checking */
+		String [] fieldNames = val.getFields();
+		ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
+		for (int i = 0; i < fieldNames.length; i++) {
+		  fieldNamesList.add(fieldNames[i]);
+		}
+		
+		// Handle ClientTraceDetailed and DataNodeLog entries separately
+		// because we need to combine both types of entries for a complete picture
+		
+		if (key.getReduceType().equals("ClientTraceDetailed")) {
+		  assert(fieldNamesList.contains("op"));
+		  if (val.getValue("op").startsWith("HDFS")) { 
+	      parseClientTraceDetailed(key, val, output, reporter, fieldNamesList);
+	    } // drop non-HDFS operations
+		} 
+		// ignore "DataNode" type log messages; unsupported
+				
+		return;
+  } // end of map()
+
+  protected final int DEFAULT_READ_DURATION_MS = 10;
+
+  // works with <= 0.20 ClientTrace with no durations
+  // includes hack to create start+end entries immediately
+  protected void parseClientTraceDetailed
+    (ChukwaRecordKey key, ChukwaRecord val,
+     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output, 
+		 Reporter reporter, ArrayList<String> fieldNamesList)
+    throws IOException
+  {
+    FSMIntermedEntry start_rec, end_rec;
+    String current_op = null, src_add = null, dest_add = null;
+    String datanodeserver_add = null, blkid = null, cli_id = null;
+    
+    /* initialize state records */
+    start_rec = new FSMIntermedEntry();
+    end_rec = new FSMIntermedEntry();
+    start_rec.fsm_type = new FSMType(FSMType.FILESYSTEM_FSM);
+    start_rec.state_type = new StateType(StateType.STATE_START);
+    end_rec.fsm_type = new FSMType(FSMType.FILESYSTEM_FSM);
+    end_rec.state_type = new StateType(StateType.STATE_END);    
+        
+    /* extract addresses */
+    Matcher src_regex = ipPattern.matcher(val.getValue("src"));
+    if (src_regex.matches()) {
+      src_add = src_regex.group(1);
+    } else {
+      log.warn("Failed to match src IP:"+val.getValue("src")+"");
+      src_add = new String("");
+    }
+    Matcher dest_regex = ipPattern.matcher(val.getValue("dest"));
+    if (dest_regex.matches()) {
+      dest_add = dest_regex.group(1);
+    } else {
+      log.warn("Failed to match dest IP:"+val.getValue("dest")+"");
+      dest_add = new String("");
+    }
+    Matcher datanodeserver_regex = ipPattern.matcher(val.getValue("srvID"));
+    if (datanodeserver_regex.matches()) {
+      datanodeserver_add = datanodeserver_regex.group(1);
+    } else {
+      log.warn("Failed to match DataNode server address:"+val.getValue("srvID")+"");
+      datanodeserver_add = new String("");
+    }
+    
+    start_rec.host_exec = new String(datanodeserver_add);
+    end_rec.host_exec = new String(datanodeserver_add);
+        
+    blkid = val.getValue("blockid").trim();
+    if (fieldNamesList.contains("cliID")) {
+      cli_id = val.getValue("cliID").trim();
+      if (cli_id.startsWith("DFSClient_")) {
+        cli_id = cli_id.substring(10);
+      }
+    } else {
+      cli_id = new String("");
+    }
+    current_op = val.getValue("op");
+    String [] k = key.getKey().split("/");    
+    
+    long actual_time_ms = Long.parseLong(val.getValue("actual_time"));
+    if (fieldNamesList.contains("duration")) {
+      try {
+        actual_time_ms -= (Long.parseLong(val.getValue("duration").trim()) / 1000);
+      } catch (NumberFormatException nef) {
+        log.warn("Failed to parse duration: >>" + val.getValue("duration"));
+      }
+    } else {
+      actual_time_ms -= DEFAULT_READ_DURATION_MS;
+    }
+    
+    start_rec.time_orig_epoch = k[0];
+    start_rec.time_orig = (new Long(actual_time_ms)).toString(); // not actually used
+    start_rec.timestamp = (new Long(actual_time_ms)).toString();
+    start_rec.time_end = new String("");
+    start_rec.time_start = new String(start_rec.timestamp);
+    
+    end_rec.time_orig_epoch = k[0];
+    end_rec.time_orig = val.getValue("actual_time");    
+    end_rec.timestamp = new String(val.getValue("actual_time"));
+    end_rec.time_end = new String(val.getValue("actual_time"));
+    end_rec.time_start = new String("");
+    
+    log.debug("Duration: " + (Long.parseLong(end_rec.time_end) - Long.parseLong(start_rec.time_start)));
+
+    end_rec.job_id = new String(cli_id); // use the client ID as the job id
+    start_rec.job_id = new String(cli_id); 
+      
+    if (current_op.equals("HDFS_READ")) {
+      if (src_add != null && src_add.equals(dest_add)) {
+        start_rec.state_hdfs = new HDFSState(HDFSState.READ_LOCAL);
+      } else {
+        start_rec.state_hdfs = new HDFSState(HDFSState.READ_REMOTE);
+      }
+      // should these ALWAYS be dest?
+      start_rec.host_other = new String(dest_add);
+      end_rec.host_other = new String(dest_add);
+    } else if (current_op.equals("HDFS_WRITE")) {
+      if (src_add != null && dest_add.equals(datanodeserver_add)) {
+        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_LOCAL);
+      } else if (dest_add != null && !dest_add.equals(datanodeserver_add)) {
+        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REMOTE);
+      } else {
+        start_rec.state_hdfs = new HDFSState(HDFSState.WRITE_REPLICATED);        
+      }
+      start_rec.host_other = new String(dest_add);
+      end_rec.host_other = new String(dest_add);
+    } else {
+      log.warn("Invalid state: " + current_op);
+    }
+    end_rec.state_hdfs = start_rec.state_hdfs;
+    start_rec.state_name = start_rec.state_hdfs.toString();
+    end_rec.state_name = end_rec.state_hdfs.toString();
+    start_rec.identifier = new String(blkid);
+    end_rec.identifier = new String(blkid);
+    
+    start_rec.unique_id = new String(start_rec.state_name + "@" + 
+      start_rec.identifier + "@" + start_rec.job_id);
+    end_rec.unique_id = new String(end_rec.state_name + "@" + 
+      end_rec.identifier + "@" + end_rec.job_id);
+      
+    start_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
+		start_rec.add_info.put("csource",val.getValue("csource"));
+    end_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
+		end_rec.add_info.put("csource",val.getValue("csource"));
+		end_rec.add_info.put("STATE_STRING",new String("SUCCESS")); // by default
+		
+		// add counter value
+		end_rec.add_info.put("BYTES",val.getValue("bytes"));
+		    
+    String crk_mid_string_start = new String(start_rec.getUniqueID() + "_" + start_rec.timestamp);
+    String crk_mid_string_end = new String(end_rec.getUniqueID() + "_" + start_rec.timestamp);
+    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_start), start_rec);
+    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_end), end_rec);
+    
+    return;
+  }
+
+} // end of mapper class
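
For reference, a minimal sketch, assuming hypothetical field values, of the map-output
key/value shape this mapper emits for one HDFS_READ ClientTrace line:

    import org.apache.hadoop.chukwa.analysis.salsa.fsm.*;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;

    public class ClientTraceKeySketch {
      public static void main(String[] args) {
        FSMIntermedEntry start_rec = new FSMIntermedEntry();
        start_rec.fsm_type = new FSMType(FSMType.FILESYSTEM_FSM);
        start_rec.state_type = new StateType(StateType.STATE_START);
        start_rec.state_hdfs = new HDFSState(HDFSState.READ_LOCAL);
        start_rec.state_name = start_rec.state_hdfs.toString();  // "READ_LOCAL"
        start_rec.identifier = "blk_12345";                      // block id (hypothetical)
        start_rec.job_id = "client_xyz";                         // cliID with "DFSClient_" stripped (hypothetical)
        start_rec.timestamp = "1253299381000";                   // adjusted start time in ms (hypothetical)
        start_rec.unique_id = start_rec.state_name + "@" +
            start_rec.identifier + "@" + start_rec.job_id;

        // Same key construction as the end of parseClientTraceDetailed():
        // reduce type "FILESYSTEM_FSM", key = <unique_id>_<timestamp>
        ChukwaRecordKey k = new ChukwaRecordKey(FSMType.NAMES[FSMType.FILESYSTEM_FSM],
            start_rec.getUniqueID() + "_" + start_rec.timestamp);
        System.out.println(k.getReduceType() + " / " + k.getKey());
      }
    }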

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMBuilder.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.TreeSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.chukwa.extraction.demux.*;
+import org.apache.hadoop.chukwa.extraction.engine.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * FSM Builder
+ * 
+ * Input: start/end state entries, e.g. JobHistory or ClientTrace data
+ * 
+ * Input handling is controlled by choosing a custom mapper that can
+ * parse the desired input format (e.g. JobHistory lines).
+ * One map class is provided for each supported type of input data;
+ * each map class normalizes its input log type to the common
+ * internal FSMIntermedEntry representation.
+ *
+ * Currently available mapper classes:
+ * DataNodeClientTraceMapper
+ * TaskTrackerClientTraceMapper
+ * JobHistoryTaskDataMapper
+ *
+ * The mapper class to use is read in as the config parameter
+ * chukwa.salsa.fsm.mapclass.
+ *
+ * Output is standardized, regardless of input, and is generated by
+ * the common reducer.
+ */
+
+public class FSMBuilder extends Configured implements Tool {
+
+  private static Log log = LogFactory.getLog(FSMBuilder.class);
+
+	public enum AddInfoTypes {HOST_OTHER, INPUT_BYTES, INPUT_RECORDS, INPUT_GROUPS, 
+		OUTPUT_BYTES, OUTPUT_RECORDS, SHUFFLE_BYTES, RECORDS_SPILLED, 
+		COMBINE_INPUT_RECORDS, COMBINE_OUTPUT_RECORDS}
+	
+	protected static final String SEP = "/";
+
+  public static class FSMReducer
+    extends MapReduceBase
+    implements Reducer<ChukwaRecordKey, FSMIntermedEntry, ChukwaRecordKey, ChukwaRecord> {
+  
+		/**
+		 * These are used for the add_info TreeMap; keys not listed here are automatically
+		 * prepended with "COUNTER_"
+		 */
+    protected static String NON_COUNTER_KEYS [] = {"csource","ctags","STATE_STRING"};
+
+    protected static String JCDF_ID1 = "JCDF_ID1";
+    protected static String JCDF_ID2 = "JCDF_ID2";
+    protected static String JCDF_EDGE_TIME = "JCDF_E_TIME";
+    protected static String JCDF_EDGE_VOL = "JCDF_E_VOL";
+    protected static String JCDF_SEP = "@";
+
+
+    /**
+     * Populates fields used by Pig script for stitching together causal flows
+     */
+    protected void addStitchingFields_blockread
+      (ChukwaRecord cr, ArrayList<String> fnl)
+    {
+      assert(fnl.contains("JOB_ID"));
+      assert(fnl.contains("TASK_ID"));
+      assert(fnl.contains("TIME_END"));
+      assert(fnl.contains("TIME_START"));
+      assert(fnl.contains("COUNTER_BYTES"));
+
+      String id1 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
+      String id2 = new String("map"+JCDF_SEP+cr.getValue("JOB_ID"));
+      String et = new String(
+        (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
+        Long.parseLong(cr.getValue("TIME_START")))).toString()
+      );
+      String ev = new String(cr.getValue("COUNTER_BYTES"));      
+      cr.add(JCDF_ID1, id1);
+      cr.add(JCDF_ID2, id2);
+      cr.add(JCDF_EDGE_TIME, et);
+      cr.add(JCDF_EDGE_VOL, ev);
+      return;
+    }
+
+    /**
+     * Populates fields used by Pig script for stitching together causal flows
+     */
+    protected void addStitchingFields_map
+      (ChukwaRecord cr, ArrayList<String> fnl)
+    {
+      assert(fnl.contains("TASK_ID"));
+      assert(fnl.contains("TIME_END"));
+      assert(fnl.contains("TIME_START"));
+      assert(fnl.contains("COUNTER_INPUT_BYTES"));
+
+      String id1 = new String("map"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String id2 = new String("shuf"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String et = new String(
+        (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
+        Long.parseLong(cr.getValue("TIME_START")))).toString()
+      );
+      String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
+      cr.add(JCDF_ID1, id1);
+      cr.add(JCDF_ID2, id2);
+      cr.add(JCDF_EDGE_TIME, et);
+      cr.add(JCDF_EDGE_VOL, ev);
+      return;
+    }
+
+    /**
+     * Populates fields used by Pig script for stitching together causal flows
+     */
+    protected void addStitchingFields_shuffle
+      (ChukwaRecord cr, ArrayList<String> fnl)
+    {
+      assert(fnl.contains("TASK_ID"));
+      assert(fnl.contains("TIME_END"));
+      assert(fnl.contains("TIME_START"));
+      assert(fnl.contains("COUNTER_BYTES"));
+
+      String mapid, redid, statename;
+      String id_parts[];
+      
+      id_parts = (cr.getValue("TASK_ID")).split("@");
+      if (id_parts.length != 2) {
+        log.warn("Could not split [" + cr.getValue("TASK_ID") + "]; had length " + id_parts.length);
+      }
+      redid = id_parts[0];
+      mapid = id_parts[1];
+
+      String id1 = new String("shuf"+JCDF_SEP+mapid);
+      String id2 = new String("shufred"+JCDF_SEP+redid);
+      String et = new String(
+        (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
+        Long.parseLong(cr.getValue("TIME_START")))).toString()
+      );
+      String ev = new String(cr.getValue("COUNTER_BYTES"));      
+      cr.add(JCDF_ID1, id1);
+      cr.add(JCDF_ID2, id2);
+      cr.add(JCDF_EDGE_TIME, et);
+      cr.add(JCDF_EDGE_VOL, ev);
+      return;
+    }    
+
+    /**
+     * Populates fields used by Pig script for stitching together causal flows
+     */
+    protected void addStitchingFields_redshufwait
+      (ChukwaRecord cr, ArrayList<String> fnl)
+    {
+      assert(fnl.contains("TASK_ID"));
+      assert(fnl.contains("TIME_END"));
+      assert(fnl.contains("TIME_START"));
+      assert(fnl.contains("COUNTER_INPUT_BYTES"));
+
+      String id1 = new String("shufred"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String id2 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String et = new String(
+        (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
+        Long.parseLong(cr.getValue("TIME_START")))).toString()
+      );
+      String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
+      cr.add(JCDF_ID1, id1);
+      cr.add(JCDF_ID2, id2);
+      cr.add(JCDF_EDGE_TIME, et);
+      cr.add(JCDF_EDGE_VOL, ev);
+      return;
+    }
+
+    /**
+     * Populates fields used by Pig script for stitching together causal flows
+     */
+    protected void addStitchingFields_redsort
+      (ChukwaRecord cr, ArrayList<String> fnl)
+    {
+      assert(fnl.contains("TASK_ID"));
+      assert(fnl.contains("TIME_END"));
+      assert(fnl.contains("TIME_START"));
+      assert(fnl.contains("COUNTER_INPUT_BYTES"));
+
+      String id1 = new String("redsort"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String id2 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String et = new String(
+        (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
+        Long.parseLong(cr.getValue("TIME_START")))).toString()
+      );
+      String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
+      cr.add(JCDF_ID1, id1);
+      cr.add(JCDF_ID2, id2);
+      cr.add(JCDF_EDGE_TIME, et);
+      cr.add(JCDF_EDGE_VOL, ev);
+      return;
+    }    
+    
+    /**
+     * Populates fields used by Pig script for stitching together causal flows
+     */
+    protected void addStitchingFields_redreducer
+      (ChukwaRecord cr, ArrayList<String> fnl)
+    {
+      assert(fnl.contains("TASK_ID"));
+      assert(fnl.contains("TIME_END"));
+      assert(fnl.contains("TIME_START"));
+      assert(fnl.contains("COUNTER_INPUT_BYTES"));
+
+      String id1 = new String("red"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String id2 = new String("redout"+JCDF_SEP+cr.getValue("TASK_ID"));
+      String et = new String(
+        (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
+        Long.parseLong(cr.getValue("TIME_START")))).toString()
+      );
+      String ev = new String(cr.getValue("COUNTER_INPUT_BYTES"));      
+      cr.add(JCDF_ID1, id1);
+      cr.add(JCDF_ID2, id2);
+      cr.add(JCDF_EDGE_TIME, et);
+      cr.add(JCDF_EDGE_VOL, ev);
+      return;
+    }    
+    
+    protected void addStitchingFields_blockwrite
+      (ChukwaRecord cr, ArrayList<String> fnl)
+    {
+      assert(fnl.contains("JOB_ID"));
+      assert(fnl.contains("TASK_ID"));
+      assert(fnl.contains("TIME_END"));
+      assert(fnl.contains("TIME_START"));
+      assert(fnl.contains("COUNTER_BYTES"));
+
+      String id1 = new String("redout"+JCDF_SEP+cr.getValue("JOB_ID"));
+      String id2 = new String(cr.getValue("TASK_ID")+JCDF_SEP+cr.getValue("TIME_START"));
+      String et = new String(
+        (new Long(Long.parseLong(cr.getValue("TIME_END")) - 
+        Long.parseLong(cr.getValue("TIME_START")))).toString()
+      );
+      String ev = new String(cr.getValue("COUNTER_BYTES"));      
+      cr.add(JCDF_ID1, id1);
+      cr.add(JCDF_ID2, id2);
+      cr.add(JCDF_EDGE_TIME, et);
+      cr.add(JCDF_EDGE_VOL, ev);
+      return;
+    }
+
+    public void addStitchingFields
+     (ChukwaRecord cr)
+    {
+      String state_name = null;
+  		String [] fieldNames = cr.getFields();
+  		
+  		// get field name list
+  		ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
+  		for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]);
+
+      // safety checks
+      assert(fieldNamesList.contains("STATE_NAME"));
+      
+      state_name = cr.getValue("STATE_NAME");
+      if (state_name.equals("MAP")) {
+        addStitchingFields_map(cr, fieldNamesList);
+      } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) { 
+        addStitchingFields_redshufwait(cr, fieldNamesList);
+      } else if (state_name.equals("REDUCE_SORT")) { 
+        addStitchingFields_redsort(cr, fieldNamesList);        
+      } else if (state_name.equals("REDUCE_REDUCER")) { 
+        addStitchingFields_redreducer(cr, fieldNamesList);
+      } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) { 
+        addStitchingFields_shuffle(cr, fieldNamesList);
+      } else if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) { 
+        addStitchingFields_blockread(cr, fieldNamesList);        
+      } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")) {
+        addStitchingFields_blockwrite(cr, fieldNamesList);
+      } 
+      // else add nothing
+      return;
+    }
+
+    public void reduce
+      (ChukwaRecordKey key, Iterator<FSMIntermedEntry> values,
+       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, 
+       Reporter reporter) 
+      throws IOException
+    {
+			FSMIntermedEntry start_rec = null, end_rec = null;
+			FSMIntermedEntry tmpent;
+      String keystr = key.getKey();
+			String newkey;
+			ArrayList<FSMIntermedEntry> ents = new ArrayList<FSMIntermedEntry>();
+			ArrayList<String> noncounters = new ArrayList<String>();
+			keystr.trim();
+			ChukwaRecord cr = new ChukwaRecord();
+			
+			for (int i = 0; i < NON_COUNTER_KEYS.length; i++) noncounters.add(NON_COUNTER_KEYS[i]);
+			
+			ChukwaOutputCollector coc = new ChukwaOutputCollector("SALSA_COMPLETE", output, reporter);
+
+      int itemcount = 0;
+			try {
+      	while (values.hasNext()) { 
+					itemcount++; 
+					tmpent = values.next(); 
+					ents.add(tmpent.clone()); 
+				}
+			} catch (CloneNotSupportedException e) {
+				// do nothing
+			}
+
+      log.debug("In reduce [Key " + keystr + "] (" + itemcount + " vals)");
+      
+			if (itemcount == 2) { // i.e. we have both start and end events
+
+				if (ents.get(0).state_type.val == StateType.STATE_START && 
+						ents.get(1).state_type.val == StateType.STATE_END) 
+				{
+					start_rec = ents.get(0); end_rec = ents.get(1);
+				} else if	(ents.get(1).state_type.val == StateType.STATE_START && 
+									 ents.get(0).state_type.val == StateType.STATE_END)
+				{
+					start_rec = ents.get(1); end_rec = ents.get(0);
+				} else {
+					log.warn("In reduce [Key " + keystr + "] Invalid combination of state types: number of states: "+itemcount+".");
+					// error handling?
+				}
+						
+				cr.add(new String("STATE_NAME"),start_rec.state_name);
+				cr.add(new String("STATE_UNIQ_ID"),start_rec.getUniqueID());
+				cr.add(new String("TIMESTAMP"),start_rec.timestamp);
+				cr.add(new String("TIME_START"),start_rec.time_start);
+				cr.add(new String("TIME_END"),end_rec.time_end);
+				cr.add(new String("TIME_START_MILLIS"),start_rec.time_start.substring(start_rec.time_start.length()-3));
+				cr.add(new String("TIME_END_MILLIS"),end_rec.time_end.substring(end_rec.time_end.length()-3));
+				cr.add(new String("HOST"),start_rec.host_exec);
+				cr.add(new String("HOST_OTHER"),start_rec.host_other);
+				cr.add(new String("JOB_ID"),start_rec.job_id); 
+				cr.add(new String("TASK_ID"),start_rec.getFriendlyID());
+
+				Set<String> treemapkeys = end_rec.add_info.keySet();
+				Iterator<String> keyIter = treemapkeys.iterator();
+				
+				for (int i = 0; i < treemapkeys.size(); i++) {
+					assert(keyIter.hasNext());
+					String currkey = keyIter.next();
+					if (currkey != null && 
+					    !noncounters.contains(currkey)) {
+						cr.add(new String("COUNTER_" + currkey), end_rec.add_info.get(currkey));	
+					} else if (currkey != null && noncounters.contains(currkey)) {
+						cr.add(new String(currkey), end_rec.add_info.get(currkey));				
+					} 
+				}
+				assert(!keyIter.hasNext());
+				cr.setTime(Long.parseLong(start_rec.timestamp));
+				
+				newkey = null;
+				newkey = new String(start_rec.time_orig_epoch + 
+					SEP + start_rec.getUniqueID() + SEP + start_rec.time_orig);
+
+				log.info("Key ["+newkey+"] Task ["+start_rec.getUniqueID()+"] Job ["+start_rec.job_id+"] Friendly ["+start_rec.getFriendlyID()+"]");
+
+        addStitchingFields(cr);
+        log.debug(cr);
+				coc.collect(new ChukwaRecordKey(key.getReduceType(), newkey), cr);
+				
+			} else if (itemcount == 1) { 
+				// check that we have only the start; if we have only the end, dump it
+				// otherwise change the reducetype to get record written to file for
+				// incomplete entries
+				
+				log.warn("Key ["+keystr+"] Too few state entries: "+itemcount+" (intermediate processing not implemented yet).");
+				
+			} else { // any other value is invalid
+				// malformed data; print debug info?
+
+				log.warn("Key ["+keystr+"] Malformed data: unexpected number of state entries: "+itemcount+".");
+
+			}
+    }
+  }
+  
+  
+  public int run (String args[]) throws Exception {
+		int num_inputs;
+    JobConf conf = new JobConf(getConf(), FSMBuilder.class);
+		String [] args2 = args;
+
+		if (args2.length < 4 || !"-in".equals(args2[0]))
+		{
+			System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass=");
+			System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
+			return(1);
+		} 
+
+    conf.setJobName("Salsa_FSMBuilder");
+    
+		/* Get name of Mapper class to use */
+		String mapclassname = conf.get("chukwa.salsa.fsm.mapclass");
+		log.info("Mapper class: " + mapclassname);
+		Class mapperClass = null;
+		try {
+			mapperClass = Class.forName(mapclassname);
+		} catch (ClassNotFoundException c) {
+			System.err.println("Mapper " + mapclassname + " not found: " + c.toString());
+		}
+
+    /* Get on with usual job setup */
+    conf.setMapperClass(mapperClass);
+    conf.setReducerClass(FSMReducer.class);
+    conf.setOutputKeyClass(ChukwaRecordKey.class);
+    conf.setOutputValueClass(ChukwaRecord.class);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputFormat(ChukwaRecordOutputFormat.class);
+    conf.setPartitionerClass(FSMIntermedEntryPartitioner.class);
+		conf.setMapOutputValueClass(FSMIntermedEntry.class);
+		conf.setMapOutputKeyClass(ChukwaRecordKey.class);
+		conf.setNumReduceTasks(1); // fixed at 1 to ensure that all records are grouped together
+                             
+		/* Setup inputs/outputs */
+		try {
+			num_inputs = Integer.parseInt(args2[1]);
+		} catch (NumberFormatException e) {
+			System.err.println("Specifying mapper (full Java class): -D chukwa.salsa.fsm.mapclass=");
+			System.err.println("Application-specific arguments: -in <# inputs> [input dir 1] ... [input dir n] [output dir]");
+			return(1);
+		}
+		
+		if (num_inputs <= 0) {
+			System.err.println("Must have at least 1 input.");
+			return(1);
+		}
+		
+		for (int i = 2; i < 2+num_inputs; i++) {
+	    Path in = new Path(args2[i]);
+	    FileInputFormat.addInputPath(conf, in);			
+		}
+
+    Path out = new Path(args2[2+num_inputs]);
+    FileOutputFormat.setOutputPath(conf, out);
+    
+    JobClient.runJob(conf);
+    
+    return(0);
+  }
+  
+  public static void main (String [] args) throws Exception {
+        
+    int res = ToolRunner.run(new Configuration(), new FSMBuilder(), args);
+    
+    System.exit(res);
+  }
+
+
+  
+}
+
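
A minimal driver sketch, assuming hypothetical input/output paths, showing how FSMBuilder
is parameterized: the chukwa.salsa.fsm.mapclass config parameter plus the
"-in <# inputs> [input dir 1] ... [input dir n] [output dir]" arguments parsed in run() above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.chukwa.analysis.salsa.fsm.FSMBuilder;

    public class RunFSMBuilderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Equivalent to passing -D chukwa.salsa.fsm.mapclass=... on the command line.
        conf.set("chukwa.salsa.fsm.mapclass",
            "org.apache.hadoop.chukwa.analysis.salsa.fsm.JobHistoryTaskDataMapper");
        int res = ToolRunner.run(conf, new FSMBuilder(), new String[] {
            "-in", "1",
            "/chukwa/postProcess/demuxOutput",   // hypothetical demux output dir (SequenceFiles)
            "/chukwa/salsa/fsm"                  // hypothetical output dir
        });
        System.exit(res);
      }
    }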

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntry.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.TreeSet;
+import java.util.Set;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/*
+ * FSM Intermediate State Entry
+ * 
+ * Each state corresponds to two of these entries:
+ * one for the start of the state and one for the end of the state.
+ *
+ * Intermediate data structure passed from Maps to Reduces.
+ *
+ */
+public class FSMIntermedEntry 
+	implements Cloneable, WritableComparable 
+{
+	private final char DELIM = 1;
+	
+	/* Begin fields */
+	public StateType state_type;
+	public MapRedState state_mapred;
+	public HDFSState state_hdfs;
+	public FSMType fsm_type;
+	
+	public String state_name;
+	public String identifier;
+	public String unique_id; // state name + unique identifier 
+														// (state-dependent)
+														// this id should also correspond 
+														// to the k2 value between
+														// mappers and reducers
+														
+	public String timestamp;
+	public String time_start;
+	public String time_end;
+	
+	public String host_exec;
+	public String host_other; // for instance, source host for shuffle, 
+														// src/dest host for dfs read/write
+	
+	// These values filled in by splitting the original 
+	// ChukwaRecordKey from Demux
+	public String time_orig_epoch;
+	public String time_orig;
+	public String job_id; // we get this for free from the CRK
+	
+	TreeMap<String,String> add_info; // additional information 
+																	 // e.g. locality information
+																	
+	/* End of fields */
+	
+	public FSMIntermedEntry() {
+		this.state_mapred = new MapRedState(MapRedState.NONE);
+		this.state_hdfs = new HDFSState(HDFSState.NONE);
+		this.state_type = new StateType(StateType.STATE_NOOP);			
+		this.add_info = new TreeMap<String, String>();
+		this.host_other = new String("");
+		this.job_id = new String("");
+		this.time_orig_epoch = new String("");
+		this.time_orig = new String("");
+	}
+	
+	public String getUniqueID()
+	{
+		return new String(this.unique_id);
+	}
+	
+	public String getFriendlyID()
+	{
+		return new String(this.identifier);
+	}
+	
+	/**
+	 * Set fsm_type, the relevant state (state_mapred or state_hdfs), and identifier before calling
+	 */
+	public void generateUniqueID()
+	{
+		if (this.fsm_type.val == FSMType.MAPREDUCE_FSM || 
+			  this.fsm_type.val == FSMType.MAPREDUCE_FSM_INCOMPLETE) 
+		{
+			this.state_name = new String(this.state_mapred.toString());
+		} else if (this.fsm_type.val == FSMType.FILESYSTEM_FSM || 
+							 this.fsm_type.val == FSMType.FILESYSTEM_FSM_INCOMPLETE) 
+		{
+			this.state_name = new String(this.state_hdfs.toString());
+		}
+		this.unique_id = new String(this.state_name + "@" + this.identifier);
+	}	
+	
+	public void write(DataOutput out) throws IOException {
+		Set<String> mapKeys;
+		
+		out.writeInt(this.state_type.val);
+		out.writeInt(this.state_mapred.val);
+		out.writeInt(this.state_hdfs.val);
+		out.writeInt(this.fsm_type.val);
+		out.writeChar(DELIM);
+		out.writeInt(state_name.length());
+		if (state_name.length() > 0) out.writeUTF(state_name);
+		out.writeInt(unique_id.length());
+		if (unique_id.length() > 0) out.writeUTF(unique_id);
+		out.writeInt(timestamp.length());
+		if (timestamp.length() > 0) out.writeUTF(timestamp);
+		out.writeInt(time_start.length());
+		if (time_start.length() > 0) out.writeUTF(time_start);
+		out.writeInt(time_end.length());
+		if (time_end.length() > 0) out.writeUTF(time_end);
+		out.writeInt(host_exec.length());
+		if (host_exec.length() > 0) out.writeUTF(host_exec);
+		out.writeInt(host_other.length());
+		if (host_other.length() > 0) out.writeUTF(host_other);
+		out.writeInt(time_orig_epoch.length());
+		if (time_orig_epoch.length() > 0) out.writeUTF(time_orig_epoch);
+		out.writeInt(time_orig.length());
+		if (time_orig.length() > 0) out.writeUTF(time_orig);
+		out.writeInt(job_id.length());
+		if (job_id.length() > 0) out.writeUTF(job_id);
+		out.writeInt(identifier.length());
+		if (identifier.length() > 0) out.writeUTF(identifier);
+					
+		mapKeys = this.add_info.keySet();
+		out.writeInt(mapKeys.size());
+		
+		Iterator<String> keyIter = mapKeys.iterator(); 
+		
+		for (int i = 0; i < mapKeys.size(); i++) {
+			assert(keyIter.hasNext());
+			String currKey = keyIter.next();
+			if (currKey != null) {
+				String currvalue = this.add_info.get(currKey);
+				out.writeUTF(currKey);
+				out.writeInt(currvalue.length());
+				if (currvalue.length() > 0) {
+					out.writeUTF(currvalue);
+				} 
+			} else {
+				out.writeUTF(new String("NULL"));
+				out.writeInt(0);
+			}
+		}
+	}
+
+	public void readFields(DataInput in) throws IOException {
+		int currlen, numkeys;
+		
+		this.state_type = new StateType(in.readInt());
+		this.state_mapred = new MapRedState(in.readInt());
+		this.state_hdfs = new HDFSState(in.readInt());
+		this.fsm_type = new FSMType(in.readInt());
+		in.readChar();
+
+		currlen = in.readInt();
+		if (currlen > 0) this.state_name = in.readUTF();
+		else this.state_name = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.unique_id = in.readUTF();
+		else this.unique_id = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.timestamp = in.readUTF();
+		else this.timestamp = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.time_start = in.readUTF();
+		else this.time_start = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.time_end = in.readUTF();
+		else this.time_end = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.host_exec = in.readUTF();
+		else this.host_exec = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.host_other = in.readUTF();
+		else this.host_other = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.time_orig_epoch = in.readUTF();
+		else this.time_orig_epoch = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.time_orig = in.readUTF();
+		else this.time_orig = new String("");
+
+		currlen = in.readInt();
+		if (currlen > 0) this.job_id = in.readUTF();
+		else this.job_id = new String("");
+			
+		currlen = in.readInt();
+		if (currlen > 0) this.identifier = in.readUTF();
+		else this.identifier = new String("");			
+					
+		numkeys = in.readInt();
+
+		this.add_info = new TreeMap<String, String>();
+		
+		if (numkeys > 0) {
+			for (int i = 0; i < numkeys; i++) {
+				String currkey, currval;
+				currkey = in.readUTF();
+				currlen = in.readInt();
+				if (currlen > 0) {
+					currval = in.readUTF();
+					this.add_info.put(currkey, currval);
+				}
+			}
+		}
+	}
+	 
+	
+	public boolean equals (Object o) {
+		FSMIntermedEntry other = (FSMIntermedEntry) o;
+		return this.unique_id.equals(other.unique_id);
+	}
+	
+	public int compareTo (Object o) {
+		FSMIntermedEntry other = (FSMIntermedEntry) o;
+		return this.unique_id.compareTo(other.unique_id);
+	}
+	
+	/*
+	 * This method is to support convenient creating of new copies
+	 * of states for Reduce to create sub-states ReduceShuffle, ReduceSort, and ReduceReducer
+	 */
+	public FSMIntermedEntry clone() throws CloneNotSupportedException {
+		FSMIntermedEntry newObj = (FSMIntermedEntry) super.clone();
+		Set<String> mapKeys;
+
+		newObj.state_type = new StateType(this.state_type.val);
+		newObj.state_mapred = new MapRedState(this.state_mapred.val);
+		newObj.state_hdfs = new HDFSState(this.state_hdfs.val);
+		newObj.fsm_type = new FSMType(this.fsm_type.val);
+
+		/* Deep copy all strings */
+		newObj.state_name = new String(this.state_name);
+		newObj.unique_id = new String(this.unique_id);
+		newObj.timestamp = new String(this.timestamp);
+		newObj.time_start = new String(this.time_start);
+		newObj.time_end = new String(this.time_end);
+		
+		newObj.time_orig_epoch = new String(this.time_orig_epoch);
+		newObj.time_orig = new String(this.time_orig);
+		newObj.job_id = new String(this.job_id);
+		
+		
+		/* Deep copy of TreeMap */
+		newObj.add_info = new TreeMap<String,String>();
+		mapKeys = this.add_info.keySet();
+		Iterator<String> keyIter = mapKeys.iterator();
+		String currKey = null;
+		
+		for (int i = 0; i < mapKeys.size(); i++) {
+			assert(keyIter.hasNext());
+			currKey = keyIter.next();
+			if (currKey != null) {
+				newObj.add_info.put(currKey, this.add_info.get(currKey));
+			}
+		}
+		
+		return newObj;
+	}
+	
+	public String toString() {
+		return new String(this.state_name + "@" + this.unique_id);
+	}
+	
+}
\ No newline at end of file
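
A minimal round-trip sketch of the custom Writable serialization above (field values are
hypothetical; note that every String field written by write() must be non-null):

    package org.apache.hadoop.chukwa.analysis.salsa.fsm;  // same package, so add_info is visible

    import java.io.*;

    public class IntermedEntryRoundTripSketch {
      public static void main(String[] args) throws IOException {
        FSMIntermedEntry e = new FSMIntermedEntry();
        e.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
        e.state_type = new StateType(StateType.STATE_START);
        e.state_mapred = new MapRedState(MapRedState.MAP);
        e.identifier = "attempt_200909181843_0001_m_000000_0";  // hypothetical task attempt id
        e.generateUniqueID();                                    // fills state_name and unique_id
        e.timestamp = "1253299381000";
        e.time_start = "1253299381000";
        e.time_end = "";
        e.host_exec = "node1.example.com";                       // hypothetical host
        e.add_info.put("STATE_STRING", "SUCCESS");

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        e.write(new DataOutputStream(bos));

        FSMIntermedEntry copy = new FSMIntermedEntry();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy + " round-trips equal? " + copy.equals(e));
      }
    }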

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntryPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntryPartitioner.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntryPartitioner.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMIntermedEntryPartitioner.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+
+public class FSMIntermedEntryPartitioner
+	implements Partitioner<ChukwaRecordKey, FSMIntermedEntry>
+{
+
+	public int getPartition
+		(ChukwaRecordKey key, FSMIntermedEntry val, int numPartitions)
+	{
+		return (Math.abs(key.hashCode() % numPartitions));		
+	}
+	
+	public void configure(JobConf job) {
+		// do nothing
+		return;
+	}
+
+}
\ No newline at end of file
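
Since FSMBuilder fixes the number of reduce tasks at 1, this partitioner effectively routes
every record to the single reducer; a minimal sketch of its use (hypothetical key values):

    import org.apache.hadoop.chukwa.analysis.salsa.fsm.*;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;

    public class PartitionerSketch {
      public static void main(String[] args) {
        FSMIntermedEntryPartitioner p = new FSMIntermedEntryPartitioner();
        ChukwaRecordKey key = new ChukwaRecordKey("FILESYSTEM_FSM",
            "READ_LOCAL@blk_12345@client_xyz_1253299381000");  // hypothetical map-output key
        // With numPartitions == 1 (as FSMBuilder configures) this always prints 0.
        System.out.println(p.getPartition(key, new FSMIntermedEntry(), 1));
      }
    }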

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/FSMType.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+public class FSMType {
+	public static final int MAPREDUCE_FSM = 0;
+	public static final int FILESYSTEM_FSM = 1;
+	public static final int MAPREDUCE_FSM_INCOMPLETE = 2;
+	public static final int FILESYSTEM_FSM_INCOMPLETE = 3;
+	public static final String [] NAMES = { "MAPREDUCE_FSM", "FILESYSTEM_FSM", "MAPREDUCE_FSM_INCOMPLETE", "FILESYSTEM_FSM_INCOMPLETE" };
+	public FSMType() { this.val = 0; }
+	public FSMType(int newval) { this.val = newval; }
+	public int val;
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }		
+}
\ No newline at end of file

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/HDFSState.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+public class HDFSState {
+	public static final int NONE = 0;
+	public static final int READ_LOCAL = 1; 
+	public static final int READ_REMOTE = 2;
+	public static final int WRITE_LOCAL = 3;
+	public static final int WRITE_REMOTE = 4;
+	public static final int WRITE_REPLICATED = 5;
+	public static final String [] NAMES = { "NONE", "READ_LOCAL", "READ_REMOTE", "WRITE_LOCAL", "WRITE_REMOTE", "WRITE_REPLICATED"};
+	public HDFSState() { this.val = 1; }
+	public HDFSState(int newval) { this.val = newval; }
+	public int val;
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }		
+}
\ No newline at end of file

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/JobHistoryTaskDataMapper.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.chukwa.extraction.demux.*;
+import org.apache.hadoop.chukwa.extraction.engine.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.filecache.DistributedCache;
+
+/**
+ * Pluggable mapper for FSMBuilder.
+ * Supports only 0.20+ JobHistory files,
+ * because the counter names are explicitly coded.
+ *
+ * K2 = state name + state ID
+ * (we reuse ChukwaRecordKey since it already implements useful machinery
+ *  such as Comparators)
+ * V2 = FSMIntermedEntry (which carries a TreeMap of additional info)
+ */
+public class JobHistoryTaskDataMapper 
+  extends MapReduceBase 
+  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry> 
+{
+  private static Log log = LogFactory.getLog(FSMBuilder.class);
+	protected static final String SEP = "/";
+	
+	protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
+
+	/*
+	 * Helper function for mapper to populate TreeMap of FSMIntermedEntr
+	 * with input/output counters for Map records
+	 */
+	protected FSMIntermedEntry populateRecord_MapCounters
+		(FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList) 
+	{
+		String mapCounterNames [] = {		
+			"Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_BYTES",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_INPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_BYTES",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:MAP_OUTPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
+		};
+		String mapCounterDestNames[] = {
+			"FILE_BYTES_WRITTEN",
+			"COMBINE_INPUT_RECORDS",
+			"COMBINE_OUTPUT_RECORDS",
+			"INPUT_BYTES",
+			"INPUT_RECORDS",
+			"OUTPUT_BYTES",
+			"OUTPUT_RECORDS",
+			"SPILLED_RECORDS"
+		};
+		
+		assert(mapCounterDestNames.length == mapCounterNames.length);
+		
+		String currstr = new String();
+		for (int i = 0; i < mapCounterDestNames.length; i++) {
+			if (fieldNamesList.contains(mapCounterNames[i])) {
+				this_rec.add_info.put(mapCounterDestNames[i], val.getValue(mapCounterNames[i]));				
+			}
+		}
+		this_rec.add_info.put("FILE_BYTES_READ",new String("0")); // to have same fields as reduce
+		this_rec.add_info.put("INPUT_GROUPS",new String("0")); // to have same fields as reduce
+		
+		return this_rec;
+	}
+
+	/*
+	 * Helper function for mapper to populate TreeMap of FSMIntermedEntr
+	 * with input/output counters for Reduce records
+	 */
+	protected FSMIntermedEntry populateRecord_ReduceCounters
+		(FSMIntermedEntry this_rec, ChukwaRecord val, ArrayList<String> fieldNamesList) 
+	{
+		String redCounterNames [] = {		
+			"Counter:FileSystemCounters:FILE_BYTES_READ",
+			"Counter:FileSystemCounters:FILE_BYTES_WRITTEN",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_INPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:COMBINE_OUTPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_GROUPS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_INPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_OUTPUT_RECORDS",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:REDUCE_SHUFFLE_BYTES",
+			"Counter:org.apache.hadoop.mapred.Task$Counter:SPILLED_RECORDS"
+		};
+		String redCounterDestNames[] = {
+			"FILE_BYTES_READ",
+			"FILE_BYTES_WRITTEN",
+			"COMBINE_INPUT_RECORDS",
+			"COMBINE_OUTPUT_RECORDS",
+			"INPUT_GROUPS",
+			"INPUT_RECORDS",
+			"OUTPUT_RECORDS",
+			"INPUT_BYTES", // NOTE: shuffle bytes are mapped to "input_bytes"
+			"SPILLED_RECORDS"
+		};
+		
+		assert(redCounterDestNames.length == redCounterNames.length);
+		
+		String currstr = new String();
+		for (int i = 0; i < redCounterDestNames.length; i++) {
+			if (fieldNamesList.contains(redCounterNames[i])) {
+				this_rec.add_info.put(redCounterDestNames[i], val.getValue(redCounterNames[i]));				
+			}
+		}
+		
+		this_rec.add_info.put("OUTPUT_BYTES",new String("0")); // to have same fields as map		
+		
+		return this_rec;		
+	}
+
+  public void map
+    (ChukwaRecordKey key, ChukwaRecord val,
+     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output, 
+		 Reporter reporter)
+    throws IOException 
+  {
+		String newkey = new String("");
+		String task_type;
+		FSMIntermedEntry this_rec = new FSMIntermedEntry(); 
+		boolean add_record = true;
+
+		/* Extract field names for checking */
+		String [] fieldNames = val.getFields();
+		ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
+		for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]); 
+					
+		/* Check state (Map or Reduce), generate unique ID */
+		if (!fieldNamesList.contains("TASK_ATTEMPT_ID")) return; // Ignore "TASK" entries
+		if (!fieldNamesList.contains("TASK_TYPE")) { // Malformed, ignore
+			return;
+		} else {
+			task_type = val.getValue("TASK_TYPE"); 
+			if (!task_type.equals("MAP") && !task_type.equals("REDUCE")) {
+				return; // ignore task types other than MAP and REDUCE
+			} 
+		} 
+
+		/* Check if this is a start or end entry, set state type, extract start/end times */
+		if (fieldNamesList.contains("START_TIME")) {
+			this_rec.state_type.val = StateType.STATE_START;
+			this_rec.timestamp = new String(val.getValue("START_TIME"));
+			this_rec.time_start = new String(val.getValue("START_TIME"));
+			this_rec.time_end = new String("");
+			if (val.getValue("START_TIME").length() < 4+2) { // needs to at least have milliseconds
+			  add_record = add_record & false;
+			} 
+		} else if (fieldNamesList.contains("FINISH_TIME")) {
+			this_rec.state_type.val = StateType.STATE_END;
+			this_rec.timestamp = new String(val.getValue("FINISH_TIME"));
+			this_rec.time_start = new String("");
+			this_rec.time_end = new String(val.getValue("FINISH_TIME"));
+			if (val.getValue("FINISH_TIME").length() < 4+2) { // needs to at least have milliseconds
+			  add_record = add_record & false;
+			} 		
+		} else {
+			this_rec.state_type.val = StateType.STATE_NOOP;
+		}
+		
+		/* Fill in common intermediate state entry information */
+		
+		// Extract original ChukwaRecordKey values for later key reconstruction by reducer
+		try {
+			this_rec = ParseUtilities.splitChukwaRecordKey(key.getKey().trim(),this_rec,SEP);
+		} catch (Exception e) {
+			log.warn("Error occurred splitting ChukwaRecordKey ["+key.getKey().trim()+"]: " + e.toString());
+			return;
+		}
+		
+		// Populate state enum information
+		this_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
+		if (task_type.equals("MAP")) {
+			this_rec.state_mapred = new MapRedState(MapRedState.MAP);
+		} else if (task_type.equals("REDUCE")) {
+			this_rec.state_mapred = new MapRedState(MapRedState.REDUCE);
+		} else {
+			this_rec.state_mapred = new MapRedState(MapRedState.NONE); // error handling here?
+		}
+		
+		// Fill state name, unique ID
+		this_rec.state_name = new String(this_rec.state_mapred.toString());
+		this_rec.identifier = new String(val.getValue("TASK_ATTEMPT_ID"));
+		this_rec.generateUniqueID();
+		newkey = new String(this_rec.getUniqueID());
+		
+		// Fill the hostname directly from HOSTNAME (<= 0.18 job history), or extract it from TRACKER_NAME when only that is present
+		if (fieldNamesList.contains("HOSTNAME")) {
+			this_rec.host_exec = new String(val.getValue("HOSTNAME"));
+			this_rec.host_exec = ParseUtilities.removeRackFromHostname(this_rec.host_exec);
+		} else if (fieldNamesList.contains("TRACKER_NAME")) {
+			this_rec.host_exec = ParseUtilities.extractHostnameFromTrackerName(val.getValue("TRACKER_NAME"));
+		} else {
+			this_rec.host_exec = new String("");
+		}
+		
+		if (this_rec.state_type.val == StateType.STATE_END) {
+			assert(fieldNamesList.contains("TASK_STATUS"));
+			String tmpstring = val.getValue("TASK_STATUS");
+			if (tmpstring == null) tmpstring = new String("");
+			if (tmpstring.equals("KILLED") || tmpstring.equals("FAILED")) {
+			  add_record = add_record & false;
+			}
+			this_rec.add_info.put("STATE_STRING",tmpstring);
+			
+			switch(this_rec.state_mapred.val) {
+				case MapRedState.MAP:
+					this_rec = populateRecord_MapCounters(this_rec, val, fieldNamesList);
+					break;
+				case MapRedState.REDUCE:
+					this_rec = populateRecord_ReduceCounters(this_rec, val, fieldNamesList);
+					break;
+				default:
+					// do nothing
+					break;
+			}
+		}
+		// manually add clustername etc
+		assert(fieldNamesList.contains(Record.tagsField));
+		assert(fieldNamesList.contains("csource"));
+		this_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
+		this_rec.add_info.put("csource",val.getValue("csource"));
+				
+		/* Special handling for Reduce Ends */
+		if (task_type.equals("REDUCE")) {
+			if (this_rec.state_type.val == StateType.STATE_END) {
+				add_record = add_record & expandReduceEnd(key,val,output,reporter,this_rec);
+			} else if (this_rec.state_type.val == StateType.STATE_START) {
+				add_record = add_record & expandReduceStart(key,val,output,reporter,this_rec);				
+			}
+		} else if (task_type.equals("MAP")) {
+		  add_record = add_record & true;
+		}
+		
+		if (add_record) {
+		  log.debug("Collecting record " + this_rec + "("+this_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
+		  output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType,this_rec.getUniqueID()),this_rec); 
+	  }
+		
+		return;
+  } // end of map()
+
+	protected boolean expandReduceStart
+			(ChukwaRecordKey key, ChukwaRecord val,
+	   	 OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output, 
+		 	 Reporter reporter, FSMIntermedEntry this_rec)
+			throws IOException
+	{
+		FSMIntermedEntry redshuf_start_rec = null;
+		String newkey = new String("");
+		
+		try {
+			redshuf_start_rec = this_rec.clone();
+		} catch (CloneNotSupportedException e) {
+			log.warn("Unable to clone FSMIntermedEntry: " + e.toString());
+			return false;
+		}
+				
+		redshuf_start_rec.state_type 		= new StateType(StateType.STATE_START);
+		redshuf_start_rec.state_mapred 	= new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);
+	 	
+		redshuf_start_rec.timestamp = new String(this_rec.timestamp);
+		redshuf_start_rec.time_start = new String(this_rec.timestamp);
+		redshuf_start_rec.time_end = new String("");
+		
+		redshuf_start_rec.generateUniqueID();
+		
+		log.debug("Collecting record " + redshuf_start_rec + 
+			"("+redshuf_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
+		output.collect(
+			new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_start_rec.getUniqueID()),
+			redshuf_start_rec
+		);
+		
+		return true;
+	}
+	/*
+	 * Generates 5 extra FSMIntermedEntry records for a given REDUCE end entry
+	 */
+	protected boolean expandReduceEnd 
+		(ChukwaRecordKey key, ChukwaRecord val,
+   	 OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output, 
+	 	 Reporter reporter, FSMIntermedEntry this_rec)
+		throws IOException
+	{
+		/* Split into ReduceShuffleWait, ReduceSort, ReduceReducer
+		 * But also retain the original combined Reduce at the same time 
+		 */
+		FSMIntermedEntry redshuf_end_rec = null;
+		FSMIntermedEntry redsort_start_rec = null, redsort_end_rec = null;
+		FSMIntermedEntry redred_start_rec = null, redred_end_rec = null;
+		
+		/* Extract field names for checking */
+		String [] fieldNames = val.getFields();
+		ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
+		for (int i = 0; i < fieldNames.length; i++) fieldNamesList.add(fieldNames[i]); 
+		
+		try {
+		 	redsort_start_rec 	= this_rec.clone();
+		 	redred_start_rec 		= this_rec.clone();
+		 	redshuf_end_rec		= this_rec.clone();
+		 	redsort_end_rec 	= this_rec.clone();
+		 	redred_end_rec 		= this_rec.clone();
+		} catch (CloneNotSupportedException e) {
+			log.warn("Unable to clone FSMIntermedEntry: " + e.toString());
+			return false;
+		}
+
+		redshuf_end_rec.state_type 			= new StateType(StateType.STATE_END);
+		redshuf_end_rec.state_mapred 		= new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);
+
+		redsort_start_rec.state_type 		= new StateType(StateType.STATE_START);
+		redsort_end_rec.state_type 			= new StateType(StateType.STATE_END);
+		redsort_start_rec.state_mapred 	= new MapRedState(MapRedState.REDUCE_SORT);
+		redsort_end_rec.state_mapred 		= new MapRedState(MapRedState.REDUCE_SORT);
+
+		redred_start_rec.state_type 		= new StateType(StateType.STATE_START);
+		redred_end_rec.state_type 			= new StateType(StateType.STATE_END);
+		redred_start_rec.state_mapred		= new MapRedState(MapRedState.REDUCE_REDUCER);
+		redred_end_rec.state_mapred			= new MapRedState(MapRedState.REDUCE_REDUCER);
+		
+		redshuf_end_rec.generateUniqueID();
+		redsort_start_rec.generateUniqueID();
+		redsort_end_rec.generateUniqueID();
+		redred_start_rec.generateUniqueID();
+		redred_end_rec.generateUniqueID();
+		
+		if(fieldNamesList.contains("SHUFFLE_FINISHED") && fieldNamesList.contains("SORT_FINISHED")) {
+		  if (val.getValue("SHUFFLE_FINISHED") == null) return false;
+		  if (val.getValue("SORT_FINISHED") == null) return false;
+		} else {
+		  return false;
+		}
+		redshuf_end_rec.timestamp = new String(val.getValue("SHUFFLE_FINISHED"));
+		redshuf_end_rec.time_start = new String("");
+		redshuf_end_rec.time_end = new String(val.getValue("SHUFFLE_FINISHED"));
+		redsort_start_rec.timestamp = new String(val.getValue("SHUFFLE_FINISHED")); 
+		redsort_start_rec.time_start = new String(val.getValue("SHUFFLE_FINISHED")); 
+		redsort_start_rec.time_end = new String("");
+		
+		assert(fieldNamesList.contains("SORT_FINISHED"));
+		redsort_end_rec.timestamp = new String(val.getValue("SORT_FINISHED"));
+		redsort_end_rec.time_start = new String("");
+		redsort_end_rec.time_end = new String(val.getValue("SORT_FINISHED"));
+		redred_start_rec.timestamp = new String(val.getValue("SORT_FINISHED"));
+		redred_start_rec.time_start = new String(val.getValue("SORT_FINISHED"));
+		redred_start_rec.time_end = new String("");
+		
+		/* redred_end times are exactly the same as the original red_end times */
+		
+		log.debug("Collecting record " + redshuf_end_rec + 
+			"("+redshuf_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
+		output.collect(
+			new ChukwaRecordKey(FSM_CRK_ReduceType,redshuf_end_rec.getUniqueID()),
+			redshuf_end_rec
+		);
+		
+		log.debug("Collecting record " + redsort_start_rec + 
+			"("+redsort_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
+		output.collect(
+			new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_start_rec.getUniqueID()),
+			redsort_start_rec
+		);
+		
+		log.debug("Collecting record " + redsort_end_rec + 
+			"("+redsort_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
+		output.collect(
+			new ChukwaRecordKey(FSM_CRK_ReduceType,redsort_end_rec.getUniqueID()),
+			redsort_end_rec
+		);
+		
+		log.debug("Collecting record " + redred_start_rec + 
+			"("+redred_start_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");
+		output.collect(
+			new ChukwaRecordKey(FSM_CRK_ReduceType,redred_start_rec.getUniqueID()),
+			redred_start_rec
+		);
+		
+		log.debug("Collecting record " + redred_end_rec + 
+			"("+redred_end_rec.state_type+") (ReduceType "+FSM_CRK_ReduceType+")");	
+		output.collect(
+			new ChukwaRecordKey(FSM_CRK_ReduceType,redred_end_rec.getUniqueID()),
+			redred_end_rec
+		);
+		
+		return true;
+	}
+
+} // end of mapper class
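
For illustration (not part of this patch): expandReduceEnd() above reuses the SHUFFLE_FINISHED and SORT_FINISHED timestamps as the interior boundaries when it splits a combined REDUCE interval into ReduceShuffleWait, ReduceSort and ReduceReducer states, while the combined REDUCE state is still emitted alongside them. A minimal standalone sketch of that bookkeeping, using made-up epoch-millisecond values:

    // ReducePhaseSplitSketch.java -- prints the three sub-intervals that
    // expandReduceEnd() derives from one reduce attempt's timestamps.
    public class ReducePhaseSplitSketch {
      public static void main(String[] args) {
        long reduceStart  = 1253300000000L; // START_TIME (hypothetical)
        long shuffleDone  = 1253300040000L; // SHUFFLE_FINISHED (hypothetical)
        long sortDone     = 1253300042000L; // SORT_FINISHED (hypothetical)
        long reduceFinish = 1253300090000L; // FINISH_TIME (hypothetical)
        System.out.println("REDUCE_SHUFFLEWAIT: " + reduceStart + " -> " + shuffleDone);
        System.out.println("REDUCE_SORT:        " + shuffleDone + " -> " + sortDone);
        System.out.println("REDUCE_REDUCER:     " + sortDone    + " -> " + reduceFinish);
        System.out.println("REDUCE (combined):  " + reduceStart + " -> " + reduceFinish);
      }
    }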

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/MapRedState.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+public class MapRedState {
+	public static final int NONE = 0;
+	public static final int MAP = 1;
+	public static final int REDUCE = 2;
+	public static final int REDUCE_SHUFFLEWAIT = 3;
+	public static final int REDUCE_SORT = 4;
+	public static final int REDUCE_REDUCER = 5;
+	public static final int SHUFFLE_LOCAL = 6;
+	public static final int SHUFFLE_REMOTE = 7;
+	public static final String [] NAMES = { "NONE", "MAP", "REDUCE", "REDUCE_SHUFFLEWAIT", 
+	  "REDUCE_SORT", "REDUCE_REDUCER", "SHUFFLE_LOCAL", "SHUFFLE_REMOTE"};
+	public MapRedState() { this.val = 0; }
+	public MapRedState(int newval) { this.val = newval; }
+	public int val;
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]);	}		
+}
\ No newline at end of file
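
MapRedState (like StateType below) is an int-plus-name-table wrapper rather than a Java enum; toString() simply indexes into NAMES. A quick usage sketch (illustration only, assuming the class above is compiled in its package):

    package org.apache.hadoop.chukwa.analysis.salsa.fsm;

    public class MapRedStateSketch {
      public static void main(String[] args) {
        MapRedState shuffle = new MapRedState(MapRedState.REDUCE_SHUFFLEWAIT);
        System.out.println(shuffle.val + " -> " + shuffle); // prints "3 -> REDUCE_SHUFFLEWAIT"
        System.out.println(new MapRedState());              // no-arg constructor defaults to "NONE"
      }
    }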

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/ParseUtilities.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+import java.util.StringTokenizer;
+
+/**
+ * Parse Utilities for Parsing ChukwaRecords for FSMBuilder Mappers
+ *
+ */
+
+public class ParseUtilities {
+	
+	public static FSMIntermedEntry splitChukwaRecordKey
+		(String origkey, FSMIntermedEntry rec, String delim) 
+		throws Exception
+	{
+		StringTokenizer st = new StringTokenizer(origkey, delim);
+		if (st.countTokens() != 3) {
+			throw new Exception("Expected 3 tokens from ChukwaRecordKey but only found " + st.countTokens() + ".");
+		}
+		rec.time_orig_epoch = new String(st.nextToken());
+		rec.job_id = new String(st.nextToken());
+		rec.time_orig = new String(st.nextToken());
+		return rec;
+	}
+	
+	public static String extractHostnameFromTrackerName (String trackerName) 
+	{
+		int firstPos = "tracker_".length();
+		int secondPos;
+		String hostname = new String("");
+		
+		if (trackerName.startsWith("tracker_")) {
+			secondPos = trackerName.indexOf(":",firstPos);
+			hostname = trackerName.substring(firstPos, secondPos);
+		}
+		
+		return hostname;
+	}
+	
+	public static String removeRackFromHostname (String origHostname) 
+	{
+		int pos = origHostname.lastIndexOf("/");
+		if (pos > -1) {
+			return new String(origHostname.substring(pos+1)); // skip the rack prefix and its trailing '/'
+		} else {
+			return new String(origHostname);
+		}
+	}
+	
+}
\ No newline at end of file
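
For illustration (not part of this patch), a sketch of the two hostname helpers above; the tracker-name and rack-path formats shown are assumptions based on typical Hadoop job-history values:

    package org.apache.hadoop.chukwa.analysis.salsa.fsm;

    public class ParseUtilitiesSketch {
      public static void main(String[] args) {
        // "tracker_<host>:<port>" is the shape the extraction expects
        String tracker = "tracker_host1.example.com:50060";
        System.out.println(ParseUtilities.extractHostnameFromTrackerName(tracker));
        // -> host1.example.com

        // rack-qualified hostname as it may appear in older job history
        String rackHost = "/default-rack/host1.example.com";
        System.out.println(ParseUtilities.removeRackFromHostname(rackHost));
        // -> host1.example.com
      }
    }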

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/StateType.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+public class StateType {
+	public static final int STATE_NOOP = 0;
+	public static final int STATE_START = 1;
+	public static final int STATE_END = 2;
+	public static final int STATE_INSTANT = 3;
+	public static final String [] NAMES = {"STATE_NOOP", "STATE_START", "STATE_END", "STATE_INSTANT"};
+	public StateType() { this.val = 0; }
+	public StateType(int newval) { this.val = newval; }
+	public int val;
+	public String toString() { assert(this.val < NAMES.length && this.val >= 0); return new String(NAMES[this.val]); }
+}
\ No newline at end of file
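
A quick usage sketch for StateType (illustration only); the mappers emit a STATE_START and a STATE_END entry per state, sharing an identifier so the two can be paired downstream:

    package org.apache.hadoop.chukwa.analysis.salsa.fsm;

    public class StateTypeSketch {
      public static void main(String[] args) {
        StateType start = new StateType(StateType.STATE_START);
        StateType end   = new StateType(StateType.STATE_END);
        System.out.println(start + " / " + end); // prints "STATE_START / STATE_END"
        System.out.println(new StateType());     // defaults to "STATE_NOOP"
      }
    }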

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java?rev=816739&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/analysis/salsa/fsm/TaskTrackerClientTraceMapper.java Fri Sep 18 18:43:01 2009
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.analysis.salsa.fsm;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.TreeSet;
+import java.util.regex.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.chukwa.extraction.demux.*;
+import org.apache.hadoop.chukwa.extraction.engine.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.filecache.DistributedCache;
+
+/**
+ * Pluggable mapper for FSMBuilder
+ *
+ * K2 = State Name + State ID 
+ * (We use ChukwaRecordKey since it already implements useful pieces
+ *  such as Comparators.)
+ * V2 = FSMIntermedEntry
+ */
+public class TaskTrackerClientTraceMapper 
+  extends MapReduceBase 
+  implements Mapper<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, FSMIntermedEntry> 
+{
+  private static Log log = LogFactory.getLog(FSMBuilder.class);
+	protected static final String SEP = "/";
+	protected static String FSM_CRK_ReduceType = FSMType.NAMES[FSMType.MAPREDUCE_FSM];
+	private final Pattern ipPattern =
+    Pattern.compile("([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z\\-_:\\/].*");
+  
+  public void map
+    (ChukwaRecordKey key, ChukwaRecord val,
+     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output, 
+		 Reporter reporter)
+    throws IOException 
+  {
+		String newkey = new String("");
+		String key_trimmed = key.toString().trim();
+		String task_type;
+		FSMIntermedEntry this_rec = new FSMIntermedEntry(); 
+
+		/* Extract field names for checking */
+		String [] fieldNames = val.getFields();
+		ArrayList<String> fieldNamesList = new ArrayList<String>(fieldNames.length);
+		for (int i = 0; i < fieldNames.length; i++) {
+		  fieldNamesList.add(fieldNames[i]);
+		}
+		
+		// Only ClientTraceDetailed entries are relevant here; of those, only
+		// MAPRED (shuffle) operations are turned into shuffle state entries
+		
+		if (key.getReduceType().equals("ClientTraceDetailed")) {
+		  assert(fieldNamesList.contains("op"));
+		  if (val.getValue("op").startsWith("MAPRED")) { 
+		    parseClientTraceDetailed(key, val, output, reporter, fieldNamesList);
+	    } // pick up only mapreduce operations
+		} 
+		return;
+  } // end of map()
+
+  protected final int DEFAULT_SHUFFLE_DURATION_MS = 10;
+
+  // Works with 0.20 ClientTrace entries that carry no duration field;
+  // as a workaround, paired start and end entries are emitted immediately
+  protected void parseClientTraceDetailed
+    (ChukwaRecordKey key, ChukwaRecord val,
+     OutputCollector<ChukwaRecordKey, FSMIntermedEntry> output, 
+		 Reporter reporter, ArrayList<String> fieldNamesList)
+    throws IOException
+  {
+    FSMIntermedEntry start_rec, end_rec;
+    String current_op = null, src_add = null, dest_add = null;
+    String reduce_id = null, map_id = null;
+    
+    /* initialize state records */
+    start_rec = new FSMIntermedEntry();
+    end_rec = new FSMIntermedEntry();
+    start_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
+    start_rec.state_type = new StateType(StateType.STATE_START);
+    end_rec.fsm_type = new FSMType(FSMType.MAPREDUCE_FSM);
+    end_rec.state_type = new StateType(StateType.STATE_END);    
+        
+    /* extract addresses */
+    Matcher src_regex = ipPattern.matcher(val.getValue("src"));
+    if (src_regex.matches()) {
+      src_add = src_regex.group(1);
+    } else {
+      log.warn("Failed to match src IP:"+val.getValue("src")+"");
+      src_add = new String("");
+    }
+    Matcher dest_regex = ipPattern.matcher(val.getValue("dest"));
+    if (dest_regex.matches()) {
+      dest_add = dest_regex.group(1);
+    } else {
+      log.warn("Failed to match dest IP:"+val.getValue("dest")+"");
+      dest_add = new String("");
+    }
+    if (fieldNamesList.contains("reduceID")) {
+      reduce_id = new String(val.getValue("reduceID"));
+    } else {
+      // add a random number so we get unique keys or the CRK will break
+      Random r = new Random(); 
+      reduce_id = new String("noreduce" + r.nextInt());
+    }
+    
+    if (fieldNamesList.contains("cliID")) {
+      map_id = new String(val.getValue("cliID").trim());
+    } else {
+      map_id = new String("nomap");
+    }
+    
+    current_op = val.getValue("op");
+    
+    start_rec.host_exec = new String(src_add);
+    end_rec.host_exec = new String(src_add);
+    start_rec.host_other = new String(dest_add);
+    end_rec.host_other = new String(dest_add);
+            
+    // timestamp of the log entry is the end time; 
+    // subtract duration to get start time
+    long actual_time_ms = Long.parseLong(val.getValue("actual_time"));
+    if (fieldNamesList.contains("duration")) {
+      try {
+        actual_time_ms -= (Long.parseLong(val.getValue("duration").trim()) / 1000);
+      } catch (NumberFormatException nef) {
+        log.warn("Failed to parse duration: >>" + val.getValue("duration"));
+      }
+    } else {
+      actual_time_ms -= DEFAULT_SHUFFLE_DURATION_MS;
+    }
+    
+    String [] k = key.getKey().split("/");    
+
+    start_rec.time_orig_epoch = k[0];
+    start_rec.time_orig = (new Long(actual_time_ms)).toString(); // not actually used
+    start_rec.timestamp = (new Long(actual_time_ms)).toString();
+    start_rec.time_end = new String("");
+    start_rec.time_start = new String(start_rec.timestamp);
+    
+    end_rec.time_orig_epoch = k[0];
+    end_rec.time_orig = val.getValue("actual_time");    
+    end_rec.timestamp = new String(val.getValue("actual_time"));
+    end_rec.time_end = new String(val.getValue("actual_time"));
+    end_rec.time_start = new String("");
+
+    log.debug("Duration: " + (Long.parseLong(end_rec.time_end) - Long.parseLong(start_rec.time_start)));
+
+    start_rec.job_id = new String(reduce_id); // use the reduce ID in place of the job id
+    end_rec.job_id = new String(reduce_id); 
+          
+    if (current_op.equals("MAPRED_SHUFFLE")) {
+      if (src_add != null && src_add.equals(dest_add)) {
+        start_rec.state_mapred = new MapRedState(MapRedState.SHUFFLE_LOCAL);
+      } else {
+        start_rec.state_mapred = new MapRedState(MapRedState.SHUFFLE_REMOTE);
+      }
+    } else {
+      log.warn("Invalid state: " + current_op);
+    }
+    end_rec.state_mapred = start_rec.state_mapred;
+    start_rec.state_name = start_rec.state_mapred.toString();
+    end_rec.state_name = end_rec.state_mapred.toString();
+    start_rec.identifier = new String(reduce_id + "@" + map_id);
+    end_rec.identifier = new String(reduce_id + "@" + map_id);
+    
+    start_rec.generateUniqueID();
+    end_rec.generateUniqueID();
+        
+    start_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
+		start_rec.add_info.put("csource",val.getValue("csource"));
+    end_rec.add_info.put(Record.tagsField,val.getValue(Record.tagsField));
+		end_rec.add_info.put("csource",val.getValue("csource"));
+		end_rec.add_info.put("STATE_STRING",new String("SUCCESS")); // by default
+		
+		// add counter value
+		end_rec.add_info.put("BYTES",val.getValue("bytes"));
+		    
+    String crk_mid_string_start = new String(start_rec.getUniqueID() + "_" + start_rec.timestamp);
+    String crk_mid_string_end = new String(end_rec.getUniqueID() + "_" + start_rec.timestamp);
+    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_start), start_rec);
+    output.collect(new ChukwaRecordKey(FSM_CRK_ReduceType, crk_mid_string_end), end_rec);
+    
+    return;
+  }
+
+} // end of mapper class
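
For illustration (not part of this patch): the shuffle states above come from the ClientTrace src/dest addresses plus an end-time-minus-duration calculation, falling back to DEFAULT_SHUFFLE_DURATION_MS when no duration field is present. A self-contained sketch of both pieces, with made-up field values (the sample src string and timestamps are hypothetical):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ClientTraceAddrSketch {
      private static final Pattern IP_PATTERN =
        Pattern.compile("([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+)[a-zA-Z\\-_:\\/].*");

      public static void main(String[] args) {
        String src = "10.1.2.3:34567";                // hypothetical "src" field
        Matcher m = IP_PATTERN.matcher(src);
        System.out.println(m.matches() ? m.group(1) : "(no match)"); // 10.1.2.3

        // end-time-minus-duration arithmetic, mirroring parseClientTraceDetailed()
        long endMs = 1253300050000L;                  // hypothetical "actual_time"
        long durationField = 25000L;                  // hypothetical "duration" value
        long startMs = endMs - (durationField / 1000);
        System.out.println(startMs + " -> " + endMs); // the derived shuffle interval
      }
    }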


