incubator-chukwa-commits mailing list archives

From: ey...@apache.org
Subject: svn commit: r761001 - in /hadoop/chukwa/trunk: bin/ conf/ src/java/org/apache/hadoop/chukwa/extraction/database/ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/
Date: Wed, 01 Apr 2009 18:27:09 GMT
Author: eyang
Date: Wed Apr  1 18:27:07 2009
New Revision: 761001

URL: http://svn.apache.org/viewvc?rev=761001&view=rev
Log:
CHUKWA-50. Added a parser to extract Job History and Job Conf into key-value pairs, and a database loader dictionary file.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java
Modified:
    hadoop/chukwa/trunk/bin/processSinkFiles.sh
    hadoop/chukwa/trunk/conf/database_create_tables
    hadoop/chukwa/trunk/conf/mdl.xml.template
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java

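For context on the log message above: the new JobLog processor is added at the end of this commit (only its license header appears in this message), and is described as extracting Job History into key-value pairs. The sketch below is illustrative only and is not code from this commit; it assumes the standard Hadoop job-history line format of space-separated KEY="VALUE" tokens.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only -- not the JobLog.java added in this commit.
// Assumes job-history lines such as: Job JOBID="job_..." SUBMIT_TIME="..." ...
public class JobHistoryLineSketch {
  // Matches KEY="VALUE" tokens; escaped quotes inside values are not handled here.
  private static final Pattern KV = Pattern.compile("(\\w+)=\"([^\"]*)\"");

  public static Map<String, String> parse(String line) {
    Map<String, String> fields = new LinkedHashMap<String, String>();
    Matcher m = KV.matcher(line);
    while (m.find()) {
      fields.put(m.group(1), m.group(2));
    }
    return fields;
  }

  public static void main(String[] args) {
    String line = "Job JOBID=\"job_200904011800_0001\" SUBMIT_TIME=\"1238610427000\" USER=\"chukwa\"";
    System.out.println(parse(line)); // {JOBID=job_200904011800_0001, SUBMIT_TIME=..., USER=chukwa}
  }
}

The mdl.xml entries added further down then map keys of this shape (lowercased) onto the new mr_job and mr_task columns.
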
Modified: hadoop/chukwa/trunk/bin/processSinkFiles.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/processSinkFiles.sh?rev=761001&r1=761000&r2=761001&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/processSinkFiles.sh (original)
+++ hadoop/chukwa/trunk/bin/processSinkFiles.sh Wed Apr  1 18:27:07 2009
@@ -124,7 +124,7 @@
   debugDate=`date `
   echo "$debugDate done with demux job" >> "${CHUKWA_LOG_DIR}/mr.log"
    
-  ${JAVA_HOME}/bin/java -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -Dlog4j.configuration=log4j.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.extraction.database.DatabaseLoader "${srcDoneHdfsDir}/demux" SystemMetrics Df Hadoop_dfs Hadoop_jvm Hadoop_mapred Hadoop_rpc MSSRGraph MRJobCounters NodeActivity HodJob HodMachine Hadoop_dfs_FSDirectory Hadoop_dfs_FSNamesystem Hadoop_dfs_datanode Hadoop_dfs_namenode Hadoop_jvm_metrics Hadoop_mapred_job Hadoop_mapred_jobtracker Hadoop_mapred_shuffleOutput Hadoop_mapred_tasktracker Hadoop_rpc_metrics HDFSUsage
+  ${JAVA_HOME}/bin/java -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -Dlog4j.configuration=log4j.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.extraction.database.DatabaseLoader "${srcDoneHdfsDir}/demux" SystemMetrics Df Hadoop_dfs Hadoop_jvm Hadoop_mapred Hadoop_rpc MSSRGraph MRJobCounters NodeActivity HodJob HodMachine Hadoop_dfs_FSDirectory Hadoop_dfs_FSNamesystem Hadoop_dfs_datanode Hadoop_dfs_namenode Hadoop_jvm_metrics Hadoop_mapred_job Hadoop_mapred_jobtracker Hadoop_mapred_shuffleOutput Hadoop_mapred_tasktracker Hadoop_rpc_metrics JobData TaskData HDFSUsage
   endDbLoaderTime=`date +%s`
   dbLoaderDuration=$(( $endDbLoaderTime - $endDemuxTime))
   echo "dbLoaderDuration $dbLoaderDuration" >> "${CHUKWA_LOG_DIR}/mr.log"

Modified: hadoop/chukwa/trunk/conf/database_create_tables
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/database_create_tables?rev=761001&r1=761000&r2=761001&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/database_create_tables (original)
+++ hadoop/chukwa/trunk/conf/database_create_tables Wed Apr  1 18:27:07 2009
@@ -1,5 +1,29 @@
-create table if not exists node_activity_template 
-(
+drop table if exists node_activity_template;
+drop table if exists switch_data_template;
+drop table if exists system_metrics_template;
+drop table if exists disk_template;
+drop table if exists cluster_disk_template;
+drop table if exists cluster_system_metrics_template;
+drop table if exists dfs_namenode_template;
+drop table if exists dfs_datanode_template;
+drop table if exists dfs_fsnamesystem_template;
+drop table if exists dfs_throughput_template;
+drop table if exists hadoop_jvm_template;
+drop table if exists hadoop_mapred_template;
+drop table if exists hadoop_rpc_template;
+drop table if exists cluster_hadoop_rpc_template;
+drop table if exists hadoop_rpc_calls_template;
+drop table if exists mr_job_template;
+drop table if exists mr_task_template;
+drop table if exists mr_job_timeline_template;
+drop table if exists hod_machine_template;
+drop table if exists HodJob_template;
+drop table if exists hod_job_digest_template;
+drop table if exists user_util_template;
+drop table if exists hdfs_usage_template;
+drop table if exists QueueInfo;
+
+create table if not exists node_activity_template (
     timestamp  timestamp default CURRENT_TIMESTAMP,
     used int(11) default NULL,
     usedMachines text,
@@ -9,7 +33,7 @@
     downMachines text,
     primary key(timestamp),
     index (Timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists switch_data_template (
     timestamp timestamp default CURRENT_TIMESTAMP,
@@ -20,7 +44,7 @@
     value double,
     primary key(timestamp, host, port),
     index (Timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists system_metrics_template (
     timestamp  timestamp default CURRENT_TIMESTAMP,
@@ -80,7 +104,7 @@
     swap_used_pcnt float,
     primary key(host, timestamp),
     index (timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists disk_template (
     timestamp  timestamp default CURRENT_TIMESTAMP,
@@ -92,7 +116,7 @@
     fs varchar(40),
     primary key(timestamp,host,mount),
     index (timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists cluster_disk_template (
     timestamp  timestamp default CURRENT_TIMESTAMP,
@@ -102,7 +126,7 @@
     used_percent double,
     primary key(timestamp,mount),
     index (timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists cluster_system_metrics_template (
     timestamp  timestamp default CURRENT_TIMESTAMP,
@@ -162,7 +186,7 @@
     swap_used_pcnt float,
     primary key(timestamp),
     index (timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists dfs_namenode_template (
     timestamp timestamp default 0,
@@ -186,7 +210,7 @@
     fs_image_load_time double,
     primary key(timestamp, host),
     index(timeStamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists dfs_datanode_template (
     timestamp timestamp default 0,
@@ -220,7 +244,7 @@
     writes_from_remote_client double,
     primary key(timestamp, host),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists dfs_fsnamesystem_template (
     timestamp timestamp default 0,
@@ -236,7 +260,7 @@
     under_replicated_blocks double,
     primary key(timestamp, host),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists dfs_throughput_template (
     timestamp timestamp default 0,
@@ -270,7 +294,7 @@
     writes_from_remote_client double,
     primary key(timestamp),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists hadoop_jvm_template (
     timestamp timestamp default 0,
@@ -294,7 +318,7 @@
     threads_waiting double,
     primary key (timestamp, host, process_name),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists hadoop_mapred_template (
     timestamp timestamp default 0,
@@ -307,7 +331,7 @@
     reduces_launched double,
     primary key (timestamp, host),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists hadoop_rpc_template (
     timestamp timestamp default 0,
@@ -342,7 +366,7 @@
     submit_job_num_ops double,
     primary key (timestamp, host),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists cluster_hadoop_rpc_template (
     timestamp timestamp default 0,
@@ -377,7 +401,7 @@
     submit_job_num_ops double,
     primary key (timestamp),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
 create table if not exists hadoop_rpc_calls_template (
     timestamp timestamp default 0,
@@ -385,117 +409,83 @@
     calls double,
     primary key(timestamp, method),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
-create table if not exists mssrgraph_template (
-    timestamp timestamp default 0,
-    job_id VARCHAR(80),
-    type VARCHAR(10),
-    count double,
-    primary key (timestamp, job_id),
-    index(timestamp)
-);
-
-create table if not exists mr_job(
-    HodID varchar(20),
-    MRJobID varchar(80),
-    MRJobName varchar(100),
-    STATUS varchar(10),
-    SUBMIT_TIME timestamp default 0,
-    LAUNCH_TIME timestamp default 0,
-    FINISH_TIME timestamp default 0,
-    MAPPER_PHASE_END_TIME timestamp default 0,
-    TOTAL_MAPS int unsigned,
-    TOTAL_REDUCES int unsigned,
-    FINISHED_MAPS  int unsigned,
-    FINISHED_REDUCES  int unsigned,      
-    NumOfLocalSplits int unsigned,
-    NumOfRackLocalSplits int unsigned,
-    NUM_OF_MAPPER_ATTEMPTS int unsigned,
-    NUM_OF_REDUCER_ATTEMPTS int unsigned,
-    MAPPER_PHASE_EXECUTION_TIME int,
-    AvgMapperExecutionTime int unsigned,
-    AvgLocalMapperExecutionTime int unsigned,
-    AvgRackLocalMapperExecutionTime int unsigned,
-    AvgRemoteMapperExecutionTime int unsigned,
-    AvgReducerExecutionTime int unsigned,
-    AvgShuffleExecutionTime int unsigned,
-    AvgSortExecutionTime int unsigned,
-    MapperClass varchar(80),
-    ReducerClass varchar(80),
-    PartitionerClass varchar(80),
-    CombinerClass varchar(80),
-    InputFormatClass varchar(80),
-    InputKeyClass varchar(80),
-    InputValueClass varchar(80),
-    OutputFormatClass varchar(80),
-    OutputCompressed tinyint,
-    OutputCompressionType  varchar(20),
-    OutputCompressionCodec varchar(20),
-    OutputKeyClass varchar(80),
-    OutputValueClass varchar(80),
-    MapOutputKeyClass varchar(80),
-    MapOutputValueClass varchar(80),
-    MapOutputCompressed tinyint,
-    MapOutputCompressionType  varchar(20),
-    MapOutputCompressionCodec varchar(20),
-    InputDataSizeInMB int unsigned,
-    MapOutputSizeInMB int unsigned,
-    OutputDataSizeInMB int unsigned,
-    MR_JOBCONF  text, 
-    InputDir  text, 
-    primary key(SUBMIT_TIME, HodID, MRJobID),
-    index(SUBMIT_TIME)
-);
+create table if not exists mr_job_template (
+    job_id varchar(80),
+    user varchar(32),
+    queue varchar(100),
+    status varchar(10),
+    submit_time timestamp default 0,
+    launch_time timestamp default 0,
+    finish_time timestamp default 0,
+    hdfs_bytes_read bigint default 0,
+    hdfs_bytes_written bigint default 0,
+    local_bytes_read bigint default 0,
+    local_bytes_written bigint default 0,
+    launched_map_tasks bigint default 0,
+    launched_reduce_tasks bigint default 0,
+    data_local_map_tasks bigint default 0,
+    data_local_reduce_tasks bigint default 0,
+    map_input_bytes bigint default 0,
+    map_output_bytes bigint default 0,
+    map_input_records bigint default 0,
+    map_output_records bigint default 0,
+    combine_input_records bigint default 0,
+    combine_output_records bigint default 0,
+    spilled_records bigint default 0,
+    reduce_input_groups bigint default 0,
+    reduce_output_groups bigint default 0,
+    reduce_input_records bigint default 0,
+    reduce_output_records bigint default 0,
+    jobconf  text, 
+    primary key(job_id),
+    index(submit_time, finish_time, user, queue)
+) ENGINE=InnoDB;
 
-create table if not exists mr_job_counters_template (
-    timestamp timestamp default 0,
-    job_id BIGINT,
-    hdfs_bytes_read double,
-    hdfs_bytes_written double,
-    local_bytes_read double,
-    local_bytes_written double,
-    data_local_map_tasks double,
-    launched_map_tasks double,
-    launched_reduce_tasks double,
-    combine_input_records double,
-    combine_output_records double,
-    map_input_bytes double,
-    map_output_bytes double,
-    map_input_records double,
-    map_output_records double,
-    reduce_input_groups double,
-    reduce_input_records double,
-    reduce_output_records double,
-    primary key (timestamp, job_id),
-    index(timestamp)
-);
+create table if not exists mr_task_template (
+    job_id VARCHAR(80),
+    task_id VARCHAR(80),
+    start_time timestamp default 0,
+    finish_time timestamp default 0,
+    status varchar(10) default 0,
+    attempts tinyint default 0,
+    hdfs_bytes_read bigint default 0,
+    hdfs_bytes_written bigint default 0,
+    local_bytes_read bigint default 0,
+    local_bytes_written bigint default 0,
+    map_input_bytes bigint default 0,
+    map_output_bytes bigint default 0,
+    map_input_records bigint default 0,
+    map_output_records bigint default 0,
+    combine_input_records bigint default 0,
+    combine_output_records bigint default 0,
+    spilled_records bigint default 0,
+    reduce_input_groups bigint default 0,
+    reduce_output_groups bigint default 0,
+    reduce_input_records bigint default 0,
+    reduce_output_records bigint default 0,
+    reduce_input_bytes bigint default 0,
+    reduce_output_bytes bigint default 0,
+    primary key(task_id),
+    index(start_time, finish_time, job_id)
+) ENGINE=InnoDB;
 
 create table if not exists mr_job_timeline_template (
     timestamp timestamp default CURRENT_TIMESTAMP,
-    mr_job_id varchar(80),
+    job_id varchar(80),
     task_type varchar(20),
-    tasks double,
-    time double,
-    primary key(timestamp, mr_job_id),
-    index(timestamp)
-);
+    task_id double,
+    status varchar(20),
+    primary key(timestamp, job_id),
+    index(timestamp, job_id, task_id, task_type)
+) ENGINE=InnoDB;
 
-create table if not exists mr_finish_time_vs_size_template (
-    timestamp timestamp default CURRENT_TIMESTAMP,
-    mr_job_id varchar(80),
-    task_type varchar(20),
-    size double,
-    time double,
-    primary key(timestamp, mr_job_id),
-    index(timestamp)
-);
-    
 create table if not exists hod_machine_template (
     hodid varchar(20) not null, 
     host varchar(40) not null,
     index(HodId)
-);
+) ENGINE=InnoDB;
 
 create table if not exists HodJob_template (
     HodID varchar(20), 
@@ -510,7 +500,7 @@
     LogProcessStatus varchar(20),
     primary key(HodId),
     index(StartTime, EndTime)
-);
+) ENGINE=InnoDB;
 
 create table if not exists hod_job_digest_template (
     timestamp timestamp default 0,
@@ -572,34 +562,33 @@
     swap_used_pcnt float,
     primary key(HodId, timestamp),
     index(timestamp)
-); 
+) ENGINE=InnoDB; 
 
 create table if not exists user_util_template (
     timestamp timestamp default CURRENT_TIMESTAMP,
     user VARCHAR(20),
-    node_total int,
-    cpu_unused double,
-    cpu_used double,
-    cpu_used_pcnt float,
-    disk_unused double,
-    disk_used double,
-    disk_used_pcnt float,
-    network_unused double,
-    network_used double,
-    network_used_pcnt float,
-    memory_unused double,
-    memory_used double,
-    memory_used_pcnt float,
+    node_total int default 0,
+    cpu_unused double default 0,
+    cpu_used double default 0,
+    cpu_used_pcnt float default 0,
+    disk_unused double default 0,
+    disk_used double default 0,
+    disk_used_pcnt float default 0,
+    network_unused double default 0,
+    network_used double default 0,
+    network_used_pcnt float default 0,
+    memory_unused double default 0,
+    memory_used double default 0,
+    memory_used_pcnt float default 0,
     primary key(user, timestamp),
     index(timestamp)
-);
+) ENGINE=InnoDB;
 
-create table if not exists util_template (
+create table if not exists hdfs_usage_template (
     timestamp timestamp default CURRENT_TIMESTAMP,
     user VARCHAR(20),
-    queue VARCHAR(20),
-    bytes bigint,
-    slot_hours double,
-    primary key(user, timestamp),
-    index(queue)
-);
+    bytes bigint default 0,
+    files bigint default 0,
+    primary key(timestamp, user),
+    index(timestamp)
+) ENGINE=InnoDB;

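The *_template tables above are schema templates; at load time MetricDataLoader resolves a template to a concrete table name through DatabaseConfig.findTableName (see its diff further below) and writes rows with MySQL upserts. A minimal JDBC sketch of that kind of statement against the new mr_job schema follows; the JDBC URL, credentials, table name, and values are placeholders rather than anything taken from this commit, and a MySQL driver is assumed to be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Illustrative only: the real loader generates this SQL from demux records and
// mdl.xml mappings; here everything is hard-coded.
public class MrJobUpsertSketch {
  public static void main(String[] args) throws Exception {
    Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/chukwa", "user", "password"); // placeholder URL/credentials
    Statement stmt = conn.createStatement();
    // Same INSERT ... ON DUPLICATE KEY UPDATE shape that MetricDataLoader builds;
    // "mr_job" stands in for the concrete table resolved from mr_job_template.
    String sql = "INSERT INTO mr_job SET job_id=\"job_200904011800_0001\", "
        + "user=\"chukwa\", status=\"SUCCESS\", hdfs_bytes_read=1048576 "
        + "ON DUPLICATE KEY UPDATE user=\"chukwa\", status=\"SUCCESS\", hdfs_bytes_read=1048576";
    stmt.execute(sql);
    stmt.close();
    conn.close();
  }
}
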
Modified: hadoop/chukwa/trunk/conf/mdl.xml.template
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/mdl.xml.template?rev=761001&r1=761000&r2=761001&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/mdl.xml.template (original)
+++ hadoop/chukwa/trunk/conf/mdl.xml.template Wed Apr  1 18:27:07 2009
@@ -390,6 +390,11 @@
 </property>
 
 <property>
+  <name>metric.systemmetrics.%system</name>
+  <value>cpu_system_pcnt</value>
+</property>
+
+<property>
   <name>metric.systemmetrics.%user</name>
   <value>cpu_user_pcnt</value>
 </property>
@@ -1285,5 +1290,486 @@
   <value>5,30,180,720</value>
 </property>
 
+
+<!-- start mapping for jobdata -->
+<property>
+  <name>report.db.name.jobdata</name>
+  <value>mr_job</value>
+  <description></description>
+</property>
+  
+<property>
+  <name>consolidator.table.mr_job</name>
+  <value>5,30,180,720</value>
+</property>
+
+<property>
+  <name>metric.jobdata.jobid</name>
+  <value>job_id</value>
+</property>
+
+<property>
+  <name>metric.jobdata.user</name>
+  <value>user</value>
+</property>
+
+<property>
+  <name>metric.jobdata.mapred.job.queue.name</name>
+  <value>queue</value>
+</property>
+
+<property>
+  <name>metric.jobdata.job_status</name>
+  <value>status</value>
+</property>
+
+<property>
+  <name>metric.jobdata.submit_time</name>
+  <value>submit_time</value>
+</property>
+
+<property>
+  <name>metric.jobdata.launch_time</name>
+  <value>launch_time</value>
+</property>
+
+<property>
+  <name>metric.jobdata.finish_time</name>
+  <value>finish_time</value>
+</property>
+
+<property>
+  <name>metric.jobdata.jobconf-json</name>
+  <value>jobconf</value>
+</property>
+  
+<!-- for hadoop 0.20 -->
+<property>
+  <name>metric.jobdata.counter:filesystemcounters:hdfs_bytes_read</name>
+  <value>hdfs_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:filesystemcounters:hdfs_bytes_written</name>
+  <value>hdfs_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:filesystemcounters:file_bytes_read</name>
+  <value>local_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:filesystemcounters:file_bytes_written</name>
+  <value>local_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.jobinprogress$counter:total_launched_maps</name>
+  <value>launched_map_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.jobinprogress$counter:total_launched_reduces</name>
+  <value>launched_reduce_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.jobinprogress$counter:data_local_maps</name>
+  <value>data_local_map_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.jobinprogress$counter:data_local_reduces</name>
+  <value>data_local_reduce_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:map_input_bytes</name>
+  <value>map_input_bytes</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:map_output_bytes</name>
+  <value>map_output_bytes</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:map_input_records</name>
+  <value>map_input_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:map_output_records</name>
+  <value>map_output_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:combine_input_records</name>
+  <value>combine_input_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:combine_output_records</name>
+  <value>combine_output_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:spilled_records</name>
+  <value>spilled_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:reduce_input_groups</name>
+  <value>reduce_input_groups</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:reduce_output_groups</name>
+  <value>reduce_output_groups</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:reduce_input_records</name>
+  <value>reduce_input_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:org.apache.hadoop.mapred.task$counter:reduce_output_records</name>
+  <value>reduce_output_records</value>
+</property>
+<!-- end for hadoop 0.20 -->
+
+<!-- for hadoop 0.18 -->
+<property>
+  <name>metric.jobdata.counter:hadoop18:file-systems.hdfs-bytes-read</name>
+  <value>hdfs_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:file-systems.hdfs-bytes-written</name>
+  <value>hdfs_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:file-systems.local-bytes-read</name>
+  <value>local_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:file-systems.local-bytes-written</name>
+  <value>local_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:job-counters-.launched-map-tasks</name>
+  <value>launched_map_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:job-counters-.launched-reduce-tasks</name>
+  <value>launched_reduce_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:job-counters-.data-local-map-tasks</name>
+  <value>data_local_map_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:job-counters-.data-local-reduce-tasks</name>
+  <value>data_local_reduce_tasks</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.map-input-bytes</name>
+  <value>map_input_bytes</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.map-output-bytes</name>
+  <value>map_output_bytes</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.map-input-records</name>
+  <value>map_input_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.map-output-records</name>
+  <value>map_output_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.combine-input-records</name>
+  <value>combine_input_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.combine-output-records</name>
+  <value>combine_output_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.spilled-records</name>
+  <value>spilled_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.reduce-input-groups</name>
+  <value>reduce_input_groups</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.reduce-output-groups</name>
+  <value>reduce_output_groups</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.reduce-input-records</name>
+  <value>reduce_input_records</value>
+</property>
+
+<property>
+  <name>metric.jobdata.counter:hadoop18:map-reduce-framework.reduce-output-records</name>
+  <value>reduce_output_records</value>
+</property>
+
+<!-- end for hadoop 0.18 -->
+<!-- end mapping for jobdata -->
+
+<!-- start mapping for taskdata -->
+<property>
+  <name>report.db.name.taskdata</name>
+  <value>mr_task</value>
+  <description></description>
+</property>
+  
+<property>
+  <name>consolidator.table.mr_task</name>
+  <value>5,30,180,720</value>
+</property>
+
+<property>
+  <name>metric.taskdata.jobid</name>
+  <value>job_id</value>
+</property>
+
+<property>
+  <name>metric.taskdata.taskid</name>
+  <value>task_id</value>
+</property>
+
+<property>
+  <name>metric.taskdata.start_time</name>
+  <value>start_time</value>
+</property>
+
+<property>
+  <name>metric.taskdata.finish_time</name>
+  <value>finish_time</value>
+</property>
+
+<property>
+  <name>metric.taskdata.task_status</name>
+  <value>status</value>
+</property>
+
+<property>
+  <name>metric.taskdata.task_attempt_times</name>
+  <value>attempts</value>
+</property>
+
+<!-- for hadoop 20 -->
+<property>
+  <name>metric.taskdata.counter:filesystemcounters:hdfs_bytes_read</name>
+  <value>hdfs_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:filesystemcounters:hdfs_bytes_written</name>
+  <value>hdfs_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:filesystemcounters:file_bytes_read</name>
+  <value>local_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:filesystemcounters:file_bytes_written</name>
+  <value>local_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:map_input_bytes</name>
+  <value>map_input_bytes</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:map_output_bytes</name>
+  <value>map_output_bytes</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:map_input_records</name>
+  <value>map_input_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:map_output_records</name>
+  <value>map_output_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:combine_input_records</name>
+  <value>combine_input_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:combine_output_records</name>
+  <value>combine_output_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:spilled_records</name>
+  <value>spilled_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:reduce_input_groups</name>
+  <value>reduce_input_groups</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:reduce_output_groups</name>
+  <value>reduce_output_groups</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:reduce_input_bytes</name>
+  <value>reduce_input_bytes</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:reduce_output_bytes</name>
+  <value>reduce_output_bytes</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:reduce_input_records</name>
+  <value>reduce_input_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:org.apache.hadoop.mapred.task$counter:reduce_output_records</name>
+  <value>reduce_output_records</value>
+</property>
+<!-- end for hadoop 20 -->
+
+<!-- for hadoop 18 -->
+<property>
+  <name>metric.taskdata.counter:hadoop18:file-systems.hdfs-bytes-read</name>
+  <value>hdfs_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:file-systems.hdfs-bytes-written</name>
+  <value>hdfs_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:file-systems.local-bytes-read</name>
+  <value>local_bytes_read</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:file-systems.local-bytes-written</name>
+  <value>local_bytes_written</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.map-input-bytes</name>
+  <value>map_input_bytes</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.map-output-bytes</name>
+  <value>map_output_bytes</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.map-input-records</name>
+  <value>map_input_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.map-output-records</name>
+  <value>map_output_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.combine-input-records</name>
+  <value>combine_input_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.combine-output-records</name>
+  <value>combine_output_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.spilled-records</name>
+  <value>spilled_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.reduce-input-groups</name>
+  <value>reduce_input_groups</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.reduce-output-groups</name>
+  <value>reduce_output_groups</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.reduce-input-records</name>
+  <value>reduce_input_records</value>
+</property>
+
+<property>
+  <name>metric.taskdata.counter:hadoop18:map-reduce-framework.reduce-output-records</name>
+  <value>reduce_output_records</value>
+</property>
+<!-- end for hadoop 18 -->
+
+<property>
+  <name>report.db.name.hdfsusage</name>
+  <value>hdfs_usage</value>
+  <description></description>
+</property>
+
+<property>
+  <name>metric.hdfsusage.user</name>
+  <value>user</value>
+</property>
+
+<property>
+  <name>metric.hdfsusage.bytes</name>
+  <value>bytes</value>
+</property>
+
+<property>
+  <name>metric.hdfsusage.timestamp</name>
+  <value>timestamp</value>
+</property>
+  
+<property>
+  <name>consolidator.table.hdfs_usage</name>
+  <value>5,30,180,720</value>
+</property>
+
 </configuration>
 

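The property names added above follow the pattern metric.<recordType>.<fieldName>, and their values are database column names; MetricDataLoader (next diff) lowercases each demux record field, builds that key, and looks the target column up in its transformer map. A self-contained illustration follows, with DatabaseConfig replaced by a plain HashMap, two mappings copied from the diff above, and the example field's original casing assumed.

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the DatabaseConfig/transformer lookup; whitespace
// escaping and the normalization keys are omitted.
public class MdlMappingSketch {
  public static void main(String[] args) {
    Map<String, String> transformer = new HashMap<String, String>();
    transformer.put("metric.jobdata.jobid", "job_id");
    transformer.put("metric.jobdata.counter:filesystemcounters:hdfs_bytes_read",
        "hdfs_bytes_read");

    String recordType = "jobdata";                                // from report.db.name.jobdata
    String field = "COUNTER:FileSystemCounters:HDFS_BYTES_READ";  // demux record field (casing illustrative)
    String column = transformer.get("metric." + recordType + "." + field.toLowerCase());
    System.out.println(field + " -> " + column);                  // ... -> hdfs_bytes_read
  }
}
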
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=761001&r1=761000&r2=761001&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java Wed Apr  1 18:27:07 2009
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.chukwa.extraction.database;
 
-
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -28,10 +27,14 @@
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -41,23 +44,40 @@
 import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.chukwa.util.ClusterConfig;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 
 public class MetricDataLoader {
   private static Log log = LogFactory.getLog(MetricDataLoader.class);
-  private static Connection conn = null;
-  private static Statement stmt = null;
+
+  private Statement stmt = null;
   private ResultSet rs = null;
-  private static DatabaseConfig mdlConfig = null;
-  private static HashMap<String, String> normalize = null;
-  private static HashMap<String, String> transformer = null;
-  private static HashMap<String, Float> conversion = null;
-  private static HashMap<String, String> dbTables = null;
+  private DatabaseConfig mdlConfig = null;
+  private HashMap<String, String> normalize = null;
+  private HashMap<String, String> transformer = null;
+  private HashMap<String, Float> conversion = null;
+  private HashMap<String, String> dbTables = null;
   private HashMap<String, HashMap<String, Integer>> dbSchema = null;
-  private static String newSpace = "-";
-  private static boolean batchMode = true;
+  private String newSpace = "-";
+  private boolean batchMode = true;
+  private Connection conn = null;
+
+  private static ChukwaConfiguration conf = null;
+  private static FileSystem fs = null;
+
+  static {
+    conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    try {
+      fs = FileSystem.get(new URI(fsName), conf);
+    } catch (Exception e) {
+      e.printStackTrace();
+      log.warn("Exception during HDFS init, Bailing out!", e);
+      System.exit(-1);
+    }
+  }
 
   /** Creates a new instance of DBWriter */
   public MetricDataLoader() {
@@ -150,43 +170,39 @@
   public void process(Path source) throws IOException, URISyntaxException,
       SQLException {
 
-    System.out.println("Input file:" + source.getName());
-
-    ChukwaConfiguration conf = new ChukwaConfiguration();
-    String fsName = conf.get("writer.hdfs.filesystem");
-    FileSystem fs = FileSystem.get(new URI(fsName), conf);
+    log.info("StreamName: " + source.getName());
 
     SequenceFile.Reader r = new SequenceFile.Reader(fs, source, conf);
 
     stmt = conn.createStatement();
     conn.setAutoCommit(false);
+    long currentTimeMillis = System.currentTimeMillis();
+    boolean isSuccessful = true;
+    String recordType = null;
 
     ChukwaRecordKey key = new ChukwaRecordKey();
     ChukwaRecord record = new ChukwaRecord();
     try {
       int batch = 0;
       while (r.next(key, record)) {
-        boolean isSuccessful = true;
         String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
         log.debug("Timestamp: " + record.getTime());
         log.debug("DataType: " + key.getReduceType());
-        log.debug("StreamName: " + source.getName());
 
         String[] fields = record.getFields();
         String table = null;
         String[] priKeys = null;
         HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
-        String normKey = new String();
+        StringBuilder normKey = new StringBuilder();
         String node = record.getValue("csource");
-        String recordType = key.getReduceType().toLowerCase();
-        if (dbTables.containsKey("report.db.name." + recordType)) {
-
-          String[] tmp = mdlConfig.findTableName(mdlConfig
-              .get("report.db.name." + recordType), record.getTime(), record
-              .getTime());
+        recordType = key.getReduceType().toLowerCase();
+        String dbKey = "report.db.name." + recordType;
+        if (dbTables.containsKey(dbKey)) {
+          String[] tmp = mdlConfig.findTableName(mdlConfig.get(dbKey), record
+              .getTime(), record.getTime());
           table = tmp[0];
         } else {
-          log.debug("report.db.name." + recordType + " does not exist.");
+          log.debug(dbKey + " does not exist.");
           continue;
         }
         log.debug("table name:" + table);
@@ -199,21 +215,35 @@
           String keyName = escape(field.toLowerCase(), newSpace);
           String keyValue = escape(record.getValue(field).toLowerCase(),
               newSpace);
-          if (normalize.containsKey("normalize." + recordType + "." + keyName)) {
-            if (normKey.equals("")) {
-              normKey = keyName + "." + keyValue;
+          StringBuilder buildKey = new StringBuilder();
+          buildKey.append("normalize.");
+          buildKey.append(recordType);
+          buildKey.append(".");
+          buildKey.append(keyName);
+          if (normalize.containsKey(buildKey.toString())) {
+            if (normKey.toString().equals("")) {
+              normKey.append(keyName);
+              normKey.append(".");
+              normKey.append(keyValue);
             } else {
-              normKey = normKey + "." + keyName + "." + keyValue;
+              normKey.append(".");
+              normKey.append(keyName);
+              normKey.append(".");
+              normKey.append(keyValue);
             }
           }
-          String normalizedKey = "metric." + recordType + "." + normKey;
+          StringBuilder normalizedKey = new StringBuilder();
+          normalizedKey.append("metric.");
+          normalizedKey.append(recordType);
+          normalizedKey.append(".");
+          normalizedKey.append(normKey);
           if (hashReport.containsKey(node)) {
             HashMap<String, String> tmpHash = hashReport.get(node);
-            tmpHash.put(normalizedKey, keyValue);
+            tmpHash.put(normalizedKey.toString(), keyValue);
             hashReport.put(node, tmpHash);
           } else {
             HashMap<String, String> tmpHash = new HashMap<String, String>();
-            tmpHash.put(normalizedKey, keyValue);
+            tmpHash.put(normalizedKey.toString(), keyValue);
             hashReport.put(node, tmpHash);
           }
         }
@@ -221,11 +251,21 @@
           String valueName = escape(field.toLowerCase(), newSpace);
           String valueValue = escape(record.getValue(field).toLowerCase(),
               newSpace);
-          String normalizedKey = "metric." + recordType + "." + valueName;
-          if (!normKey.equals("")) {
-            normalizedKey = "metric." + recordType + "." + normKey + "."
-                + valueName;
+          StringBuilder buildKey = new StringBuilder();
+          buildKey.append("metric.");
+          buildKey.append(recordType);
+          buildKey.append(".");
+          buildKey.append(valueName);
+          if (!normKey.toString().equals("")) {
+            buildKey = new StringBuilder();
+            buildKey.append("metric.");
+            buildKey.append(recordType);
+            buildKey.append(".");
+            buildKey.append(normKey);
+            buildKey.append(".");
+            buildKey.append(valueName);
           }
+          String normalizedKey = buildKey.toString();
           if (hashReport.containsKey(node)) {
             HashMap<String, String> tmpHash = hashReport.get(node);
             tmpHash.put(normalizedKey, valueValue);
@@ -240,101 +280,158 @@
         }
         Iterator<String> i = hashReport.keySet().iterator();
         while (i.hasNext()) {
-          long currentTimeMillis = System.currentTimeMillis();
           Object iteratorNode = i.next();
           HashMap<String, String> recordSet = hashReport.get(iteratorNode);
           Iterator<String> fi = recordSet.keySet().iterator();
           // Map any primary key that was not included in the report keyName
-          String sqlPriKeys = "";
+          StringBuilder sqlPriKeys = new StringBuilder();
           try {
             for (String priKey : priKeys) {
               if (priKey.equals("timestamp")) {
-                sqlPriKeys = sqlPriKeys + priKey + " = \"" + sqlTime + "\"";
+                sqlPriKeys.append(priKey);
+                sqlPriKeys.append(" = \"");
+                sqlPriKeys.append(sqlTime);
+                sqlPriKeys.append("\"");
               }
               if (!priKey.equals(priKeys[priKeys.length - 1])) {
-                sqlPriKeys = sqlPriKeys + ", ";
+                sqlPriKeys.append(sqlPriKeys);
+                sqlPriKeys.append(", ");
               }
             }
           } catch (Exception nullException) {
             // ignore if primary key is empty
           }
           // Map the hash objects to database table columns
-          String sqlValues = "";
+          StringBuilder sqlValues = new StringBuilder();
           boolean firstValue = true;
           while (fi.hasNext()) {
-            String fieldKey = fi.next();
+            String fieldKey = (String) fi.next();
             if (transformer.containsKey(fieldKey)) {
               if (!firstValue) {
-                sqlValues = sqlValues + ", ";
+                sqlValues.append(", ");
               }
               try {
-                if (dbSchema.get(dbTables.get("report.db.name." + recordType))
-                    .get(transformer.get(fieldKey)) == java.sql.Types.VARCHAR
-                    || dbSchema.get(
-                        dbTables.get("report.db.name." + recordType)).get(
+                if (dbSchema.get(dbTables.get(dbKey)).get(
+                    transformer.get(fieldKey)) == java.sql.Types.VARCHAR
+                    || dbSchema.get(dbTables.get(dbKey)).get(
                         transformer.get(fieldKey)) == java.sql.Types.BLOB) {
-                  if (conversion.containsKey("conversion." + fieldKey)) {
-                    sqlValues = sqlValues + transformer.get(fieldKey) + "="
-                        + recordSet.get(fieldKey)
-                        + conversion.get("conversion." + fieldKey).toString();
+                  String conversionKey = "conversion." + fieldKey;
+                  if (conversion.containsKey(conversionKey)) {
+                    sqlValues.append(transformer.get(fieldKey));
+                    sqlValues.append("=");
+                    sqlValues.append(recordSet.get(fieldKey));
+                    sqlValues.append(conversion.get(conversionKey).toString());
                   } else {
-                    sqlValues = sqlValues + transformer.get(fieldKey) + "=\""
-                        + recordSet.get(fieldKey) + "\"";
+                    sqlValues.append(transformer.get(fieldKey));
+                    sqlValues.append("=\"");
+                    sqlValues.append(recordSet.get(fieldKey).replaceAll("\"",
+                        "'"));
+                    sqlValues.append("\"");
+                  }
+                } else if (dbSchema.get(dbTables.get(dbKey)).get(
+                    transformer.get(fieldKey)) == java.sql.Types.TIMESTAMP) {
+                  SimpleDateFormat formatter = new SimpleDateFormat(
+                      "yyyy-MM-dd HH:mm:ss");
+                  Date recordDate = new Date();
+                  recordDate.setTime((long) Long.parseLong(recordSet
+                      .get(fieldKey)));
+                  sqlValues.append(transformer.get(fieldKey));
+                  sqlValues.append("=\"");
+                  sqlValues.append(formatter.format(recordDate));
+                  sqlValues.append("\"");
+                } else if (dbSchema.get(dbTables.get(dbKey)).get(
+                    transformer.get(fieldKey)) == java.sql.Types.BIGINT
+                    || dbSchema.get(dbTables.get(dbKey)).get(
+                        transformer.get(fieldKey)) == java.sql.Types.TINYINT
+                    || dbSchema.get(dbTables.get(dbKey)).get(
+                        transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
+                  long tmp = 0;
+                  try {
+                    tmp = Long.parseLong(recordSet.get(fieldKey).toString());
+                    String conversionKey = "conversion." + fieldKey;
+                    if (conversion.containsKey(conversionKey)) {
+                      tmp = tmp
+                          * Long.parseLong(conversion.get(conversionKey)
+                              .toString());
+                    }
+                  } catch (Exception e) {
+                    tmp = 0;
                   }
+                  sqlValues.append(transformer.get(fieldKey));
+                  sqlValues.append("=");
+                  sqlValues.append(tmp);
                 } else {
-                  double tmp;
+                  double tmp = 0;
                   tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
-                  if (conversion.containsKey("conversion." + fieldKey)) {
+                  String conversionKey = "conversion." + fieldKey;
+                  if (conversion.containsKey(conversionKey)) {
                     tmp = tmp
-                        * Double.parseDouble(conversion.get(
-                            "conversion." + fieldKey).toString());
+                        * Double.parseDouble(conversion.get(conversionKey)
+                            .toString());
                   }
                   if (Double.isNaN(tmp)) {
                     tmp = 0;
                   }
-                  sqlValues = sqlValues + transformer.get(fieldKey) + "=" + tmp;
+                  sqlValues.append(transformer.get(fieldKey));
+                  sqlValues.append("=");
+                  sqlValues.append(tmp);
                 }
                 firstValue = false;
               } catch (NumberFormatException ex) {
-                if (conversion.containsKey("conversion." + fieldKey)) {
-                  sqlValues = sqlValues + transformer.get(fieldKey) + "="
-                      + recordSet.get(fieldKey)
-                      + conversion.get("conversion." + fieldKey).toString();
+                String conversionKey = "conversion." + fieldKey;
+                if (conversion.containsKey(conversionKey)) {
+                  sqlValues.append(transformer.get(fieldKey));
+                  sqlValues.append("=");
+                  sqlValues.append(recordSet.get(fieldKey));
+                  sqlValues.append(conversion.get(conversionKey).toString());
                 } else {
-                  sqlValues = sqlValues + transformer.get(fieldKey) + "=\""
-                      + recordSet.get(fieldKey) + "\"";
+                  sqlValues.append(transformer.get(fieldKey));
+                  sqlValues.append("=\"");
+                  sqlValues.append(recordSet.get(fieldKey)
+                      .replaceAll("\"", "'"));
+                  sqlValues.append("\"");
                 }
                 firstValue = false;
+              } catch (NullPointerException ex) {
+                log.error("dbKey:" + dbKey + " fieldKey:" + fieldKey
+                    + " does not contain valid MDL structure.");
               }
             }
           }
 
-          String sql = null;
+          StringBuilder sql = new StringBuilder();
           if (sqlPriKeys.length() > 0) {
-            sql = "INSERT INTO " + table + " SET " + sqlPriKeys + ","
-                + sqlValues + " ON DUPLICATE KEY UPDATE " + sqlPriKeys + ","
-                + sqlValues + ";";
+            sql.append("INSERT INTO ");
+            sql.append(table);
+            sql.append(" SET ");
+            sql.append(sqlPriKeys.toString());
+            sql.append(",");
+            sql.append(sqlValues.toString());
+            sql.append(" ON DUPLICATE KEY UPDATE ");
+            sql.append(sqlPriKeys.toString());
+            sql.append(",");
+            sql.append(sqlValues.toString());
+            sql.append(";");
           } else {
-            sql = "INSERT INTO " + table + " SET " + sqlValues
-                + " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
+            sql.append("INSERT INTO ");
+            sql.append(table);
+            sql.append(" SET ");
+            sql.append(sqlValues.toString());
+            sql.append(" ON DUPLICATE KEY UPDATE ");
+            sql.append(sqlValues.toString());
+            sql.append(";");
           }
-          log.debug(sql);
+          log.trace(sql);
           if (batchMode) {
-            stmt.addBatch(sql);
+            stmt.addBatch(sql.toString());
             batch++;
           } else {
-            stmt.execute(sql);
+            stmt.execute(sql.toString());
           }
-          String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
-          long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
-          int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
           if (batchMode && batch > 20000) {
             int[] updateCounts = stmt.executeBatch();
             batch = 0;
           }
-          log.debug(logMsg + " (" + recordType + ","
-              + RecordUtil.getClusterName(record) + "," + record.getTime()
-              + ") " + latencySeconds + " sec");
         }
 
       }
@@ -343,38 +440,46 @@
       }
     } catch (SQLException ex) {
       // handle any errors
+      isSuccessful = false;
       log.error(ex, ex);
       log.error("SQLException: " + ex.getMessage());
       log.error("SQLState: " + ex.getSQLState());
       log.error("VendorError: " + ex.getErrorCode());
     } catch (Exception e) {
+      isSuccessful = false;
+      log.error(ExceptionUtil.getStackTrace(e));
       e.printStackTrace();
     } finally {
+      if (batchMode) {
+        conn.commit();
+      }
+      long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
+      int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
+      String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
+      log.info(logMsg + " (" + recordType + ","
+          + RecordUtil.getClusterName(record) + ") " + latencySeconds + " sec");
       if (rs != null) {
         try {
           rs.close();
-        } catch (SQLException sqlEx) {
-          // ignore
+        } catch (SQLException ex) {
+          log.error(ex, ex);
+          log.error("SQLException: " + ex.getMessage());
+          log.error("SQLState: " + ex.getSQLState());
+          log.error("VendorError: " + ex.getErrorCode());
         }
         rs = null;
       }
       if (stmt != null) {
         try {
           stmt.close();
-        } catch (SQLException sqlEx) {
-          // ignore
+        } catch (SQLException ex) {
+          log.error(ex, ex);
+          log.error("SQLException: " + ex.getMessage());
+          log.error("SQLState: " + ex.getSQLState());
+          log.error("VendorError: " + ex.getErrorCode());
         }
         stmt = null;
       }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sqlEx) {
-          // ignore
-        }
-        conn = null;
-      }
-
     }
   }
 

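One behavioral addition above is explicit handling of TIMESTAMP columns: fields such as submit_time arrive as numeric epoch milliseconds and are formatted into MySQL timestamp literals before being appended to the generated SQL. The standalone snippet below reproduces just that conversion; the sample value is made up.

import java.text.SimpleDateFormat;
import java.util.Date;

public class TimestampColumnSketch {
  public static void main(String[] args) {
    long submitTimeMillis = 1238610427000L; // illustrative epoch-millisecond value
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date recordDate = new Date();
    recordDate.setTime(submitTimeMillis);
    // Prints a fragment like submit_time="2009-04-01 18:27:07"
    // (exact text depends on the local time zone).
    System.out.println("submit_time=\"" + formatter.format(recordDate) + "\"");
  }
}
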
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java?rev=761001&r1=761000&r2=761001&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java Wed Apr  1 18:27:07 2009
@@ -1,19 +1,22 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-
 import java.io.File;
 import java.io.FileOutputStream;
 import java.util.Calendar;
 import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
+
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
+import org.json.JSONObject;
+import org.mortbay.util.ajax.JSON;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -21,16 +24,15 @@
 import org.w3c.dom.Text;
 
 public class JobConfProcessor extends AbstractProcessor {
-  static Logger log = Logger.getLogger(JobConfProcessor.class);
-  static Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
-  static Pattern hodPattern = Pattern
-      .compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)");
-  static Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
-
-  @Override
-  protected void parse(String recordEntry,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-      throws Throwable {
+    static Logger log = Logger.getLogger(JobConfProcessor.class);
+    static  Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
+    static  Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
+    @Override
+    protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+      Reporter reporter) 
+   throws Throwable
+  {
     Long time = 0L;
     Random randomNumber = new Random();
     String tags = this.chunk.getTags();
@@ -40,81 +42,82 @@
       time = Long.parseLong(matcher.group(2));
     }
     String capp = this.chunk.getApplication();
-    String hodID = "";
-    String jobID = "";
-    matcher = hodPattern.matcher(capp);
-    if (matcher.matches()) {
-      hodID = matcher.group(3);
-    }
-    matcher = jobPattern.matcher(capp);
-    if (matcher.matches()) {
-      jobID = matcher.group(2);
-    }
-    ChukwaRecord record = new ChukwaRecord();
-    DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
-        .newInstance();
-    // ignore all comments inside the xml file
-    docBuilderFactory.setIgnoringComments(true);
-    try {
-      DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-      Document doc = null;
-      String fileName = "test_" + randomNumber.nextInt();
-      File tmp = new File(fileName);
-      FileOutputStream out = new FileOutputStream(tmp);
-      out.write(recordEntry.getBytes());
-      out.close();
-      doc = builder.parse(fileName);
-      Element root = doc.getDocumentElement();
-      if (!"configuration".equals(root.getTagName()))
-        log.fatal("bad conf file: top-level element not <configuration>");
-      NodeList props = root.getChildNodes();
-
-      for (int i = 0; i < props.getLength(); i++) {
-        Node propNode = props.item(i);
-        if (!(propNode instanceof Element))
-          continue;
-        Element prop = (Element) propNode;
-        if (!"property".equals(prop.getTagName()))
-          log.warn("bad conf file: element not <property>");
-        NodeList fields = prop.getChildNodes();
-        String attr = null;
-        String value = null;
-        boolean finalParameter = false;
-        for (int j = 0; j < fields.getLength(); j++) {
-          Node fieldNode = fields.item(j);
-          if (!(fieldNode instanceof Element))
-            continue;
-          Element field = (Element) fieldNode;
-          if ("name".equals(field.getTagName()) && field.hasChildNodes())
-            attr = ((Text) field.getFirstChild()).getData().trim();
-          if ("value".equals(field.getTagName()) && field.hasChildNodes())
-            value = ((Text) field.getFirstChild()).getData();
-          if ("final".equals(field.getTagName()) && field.hasChildNodes())
-            finalParameter = "true".equals(((Text) field.getFirstChild())
-                .getData());
+      String jobID = "";
+        matcher = jobPattern.matcher(capp);
+        if(matcher.matches()) {
+          jobID=matcher.group(2);
         }
-
-        // Ignore this parameter if it has already been marked as 'final'
-        if (attr != null && value != null) {
-          record.add(attr, value);
+    ChukwaRecord record = new ChukwaRecord();
+      DocumentBuilderFactory docBuilderFactory 
+        = DocumentBuilderFactory.newInstance();
+      //ignore all comments inside the xml file
+      docBuilderFactory.setIgnoringComments(true);
+      try {
+          DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+          Document doc = null;
+          String fileName = "test_"+randomNumber.nextInt();
+          File tmp = new File(fileName);
+          FileOutputStream out = new FileOutputStream(tmp);
+          out.write(recordEntry.getBytes());
+          out.close();
+        doc = builder.parse(fileName);
+        Element root = doc.getDocumentElement();
+        if (!"configuration".equals(root.getTagName()))
+            log.fatal("bad conf file: top-level element not <configuration>");
+        NodeList props = root.getChildNodes();
+            JSONObject json = new JSONObject();
+            String queue = "default";
+    
+        for (int i = 0; i < props.getLength(); i++) {
+            Node propNode = props.item(i);
+            if (!(propNode instanceof Element))
+                continue;
+            Element prop = (Element)propNode;
+            if (!"property".equals(prop.getTagName()))
+                log.warn("bad conf file: element not <property>");
+            NodeList fields = prop.getChildNodes();
+            String attr = null;
+            String value = null;
+            boolean finalParameter = false;
+            for (int j = 0; j < fields.getLength(); j++) {
+                Node fieldNode = fields.item(j);
+                if (!(fieldNode instanceof Element))
+                    continue;
+                Element field = (Element)fieldNode;
+                if ("name".equals(field.getTagName()) && field.hasChildNodes())
+                    attr = ((Text)field.getFirstChild()).getData().trim();
+                if ("value".equals(field.getTagName()) && field.hasChildNodes())
+                    value = ((Text)field.getFirstChild()).getData();
+                if ("final".equals(field.getTagName()) && field.hasChildNodes())
+                    finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
+            }
+            
+            // Record every name/value pair in the job conf JSON bundle and remember the queue name
+            if (attr != null && value != null) {
+                json.put(attr, value);
+                if("mapred.job.queue.name".equals(attr)) {
+                    queue=value;
+                }
+            }
         }
-      }
-      buildGenericRecord(record, null, time, "JobConf");
+            record.add("JOBCONF-JSON", json.toString());
+            record.add("mapred.job.queue.name", queue);
+            record.add("JOBID", "job_"+jobID);
+        buildGenericRecord(record,null, time,"JobData");
       calendar.setTimeInMillis(time);
       calendar.set(Calendar.MINUTE, 0);
       calendar.set(Calendar.SECOND, 0);
-      calendar.set(Calendar.MILLISECOND, 0);
-      key.setKey("" + calendar.getTimeInMillis() + "/" + hodID + "." + jobID
-          + "/" + time);
-
-      output.collect(key, record);
-      tmp.delete();
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw e;
-    }
+      calendar.set(Calendar.MILLISECOND, 0);      
+      key.setKey(""+ calendar.getTimeInMillis() + "/job_" + jobID + "/" + time);
+                
+            output.collect(key,record);
+            tmp.delete();
+      } catch(Exception e) {
+          e.printStackTrace();  
+          throw e;
+      }
   }
-
+  
   public String getDataType() {
     return Torque.class.getName();
   }
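
For reference, a minimal standalone sketch (not part of this commit) of what the reworked
JobConfProcessor now emits: every job conf property is packed into one JSON blob stored under
the JOBCONF-JSON field, the queue name is pulled out as its own field, and the row key is
<hour-truncated millis>/job_<jobID>/<record millis>. The org.json JSONObject and all property
values and timestamps below are assumptions for illustration only; the diff does not show the
processor's imports.

import java.util.Calendar;
import org.json.JSONObject;

public class JobConfKeySketch {
  public static void main(String[] args) throws Exception {
    long time = 1238609227000L;          // hypothetical record timestamp
    String jobID = "200903062215_0577";  // hypothetical job id suffix

    // Bundle conf properties the way the processor bundles the parsed XML.
    JSONObject json = new JSONObject();
    json.put("mapred.job.queue.name", "research");
    json.put("mapred.reduce.tasks", "1");

    // Truncate the timestamp to the hour, as the processor does with Calendar.
    Calendar calendar = Calendar.getInstance();
    calendar.setTimeInMillis(time);
    calendar.set(Calendar.MINUTE, 0);
    calendar.set(Calendar.SECOND, 0);
    calendar.set(Calendar.MILLISECOND, 0);

    String key = "" + calendar.getTimeInMillis() + "/job_" + jobID + "/" + time;
    System.out.println(key);              // <hourBucket>/job_200903062215_0577/<time>
    System.out.println(json.toString());  // value stored under JOBCONF-JSON
  }
}

Packing the full conf as JSON keeps the record shape fixed while frequently used fields such as
mapred.job.queue.name and JOBID are still emitted individually.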

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java?rev=761001&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLog.java Wed Apr  1 18:27:07 2009
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class JobLog extends AbstractProcessor {
+
+	@Override
+	protected void parse(String recordEntry,
+			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+			Reporter reporter) throws Throwable 
+	{
+		JobLogLine line = new JobLogLine(recordEntry);
+		key = new ChukwaRecordKey();
+		ChukwaRecord record = new ChukwaRecord();
+		this.buildGenericRecord(record, null, -1l, line.getLogType());
+		
+		for (Entry<String, String> entry : line.entrySet()) {
+			record.add(entry.getKey(), entry.getValue());
+		}
+		
+		for(Entry<String, Long> entry : line.getCounterHash().flat().entrySet()) {
+			record.add(entry.getKey(), entry.getValue().toString());
+		}
+		
+		long timestamp = line.getTimestamp();
+		record.setTime(timestamp);
+		key.setKey(getKey(timestamp, line.getJobId()));
+		output.collect(key, record);
+	}
+	
+	private String getKey(long ts, String jobId) {
+		long unit = 60 * 60 * 1000;
+		long rounded = (ts / unit) * unit;
+		return rounded + "/" + jobId + "/" + ts;
+	}
+
+	public static class JobLogLine extends HashMap<String, String> {
+		private static final long serialVersionUID = 4902948603527677036L;
+		
+		/**
+		 * Search the record for a timestamp; if none is found, fall back to the last one seen.
+		 */
+		private static final String[] timestampKeys = { 
+			JobHistory.Keys.SUBMIT_TIME.toString(),
+			JobHistory.Keys.LAUNCH_TIME.toString(),
+			JobHistory.Keys.START_TIME.toString(),
+			JobHistory.Keys.FINISH_TIME.toString(), 
+		};
+		private static long lastTimestamp = 0l;
+
+		private String logType;
+		private String jobId;
+		private String taskId;
+		private CounterHash counterHash;
+
+		/**
+		 * example lines: 
+		 * 		Task TASKID="task_200903062215_0577_r_000000" TASK_TYPE="REDUCE" START_TIME="1236386538540" SPLITS="" .
+		 *		Job JOBID="job_200903062215_0577" JOB_PRIORITY="NORMAL" .
+		 *		Job JOBID="job_200903062215_0577" LAUNCH_TIME="1236386526545" TOTAL_MAPS="14" TOTAL_REDUCES="1" JOB_STATUS="PREP" .
+		 */
+		public JobLogLine(String line) {
+			line = line.trim();
+			if (line.length() == 0)
+				return;
+
+			String key = null;
+			String[] pairs = line.split("=\"");
+			for (int i = 0; i < pairs.length; i++) {
+				if (i == 0) {
+					String[] fields = pairs[i].split(" ");
+					
+					logType = fields[0];
+					if(logType.equals("Job")) {
+						logType = "JobData";
+					}
+					else if (logType.equals("Task") || logType.equals("MapAttempt") || logType.equals("ReduceAttempt")) {
+						logType = "TaskData";
+					}
+					
+					if (fields.length > 1)
+						key = fields[1];
+					continue;
+				}
+
+				int pos = pairs[i].lastIndexOf('"');
+				String value = pairs[i].substring(0, pos);
+				put(key, value);
+				if(i == (pairs.length-1))
+					break;
+				key = pairs[i].substring(pos + 2);
+			}
+			
+			// jobid format: job_200903062215_0577
+			jobId = get(JobHistory.Keys.JOBID.toString());
+			
+			// taskid format: task_200903062215_0577_r_000000
+			taskId = get(JobHistory.Keys.TASKID.toString());
+			if(taskId != null) {
+				String[] fields = taskId.split("_");
+				jobId = "job_" + fields[1] + "_" + fields[2];
+				put(JobHistory.Keys.JOBID.toString(), jobId);
+				taskId = taskId.substring(5);
+			}
+			
+			counterHash = new CounterHash(get(JobHistory.Keys.COUNTERS.toString()));
+			
+			if(get("TASK_ATTEMPT_ID") != null) {
+				put("TASK_ATTEMPT_TIMES", "" + getAttempts());
+			}
+		}
+
+		public String getLogType() {
+			return logType;
+		}
+
+		public String getJobId() {
+			return jobId;
+		}
+
+		public String getTaskId() {
+			return taskId;
+		}
+		
+		public long getTimestamp() {
+			for(String key : timestampKeys) {
+				String value = get(key);
+				if(value != null && value.length() != 0) {
+					long ts = Long.parseLong(value);
+					if(ts > lastTimestamp) {
+						lastTimestamp = ts;
+					}
+					break;
+				}
+			}
+			return lastTimestamp;
+		}
+		
+		public CounterHash getCounterHash() {
+			return counterHash;
+		}
+		
+		public int getAttempts() {
+			String attemptId = get("TASK_ATTEMPT_ID");
+			if(attemptId == null) {
+				return -1;
+			}
+			else {
+				try {
+					String[] elems = attemptId.split("_");
+					return Integer.parseInt(elems[elems.length - 1]) + 1;
+				} catch (NumberFormatException e) {
+					return -1;
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Parse a job history counter string into nested group/counter maps.
+	 * 
+	 * Example string:
+	 * {(org\.apache\.hadoop\.mapred\.JobInProgress$Counter)(Job Counters )
+		    [(TOTAL_LAUNCHED_REDUCES)(Launched reduce tasks)(1)]
+		    [(TOTAL_LAUNCHED_MAPS)(Launched map tasks)(14)]
+		    [(DATA_LOCAL_MAPS)(Data-local map tasks)(14)]
+		}
+		{(FileSystemCounters)(FileSystemCounters)
+		    [(FILE_BYTES_READ)(FILE_BYTES_READ)(132)]
+		    [(HDFS_BYTES_READ)(HDFS_BYTES_READ)(20471)]
+		    [(FILE_BYTES_WRITTEN)(FILE_BYTES_WRITTEN)(790)]
+		    [(HDFS_BYTES_WRITTEN)(HDFS_BYTES_WRITTEN)(248)]
+		}
+	 */
+	public static class CounterHash extends HashMap<String, HashMap<String, Long>>{
+		public CounterHash(String str) {
+			if(str == null) {
+				return;
+			}
+			
+			if(str.startsWith("{")) {
+				for(String group : split(str, "[{}]")) {
+					HashMap<String, Long> hash = null; 
+					for(String counter : split(group, "[\\[\\]]")) {
+						ArrayList<String> idAndDisplay = split(counter, "[\\(\\)]");
+						if(hash == null) {
+							hash = new HashMap<String, Long>();
+							String groupId = idAndDisplay.get(0).replaceAll("\\\\.", ".");
+							put(groupId, hash);
+						}
+						else {
+							hash.put(idAndDisplay.get(0), Long.parseLong(idAndDisplay.get(2)));
+						}
+					}
+				}
+			} else {
+				HashMap<String, Long> hash = new HashMap<String, Long>();
+				put("Hadoop18", hash);
+				for(String counter : split(str, ",")) {
+					ArrayList<String> kv = split(counter, ":");
+					hash.put(kv.get(0), Long.parseLong(kv.get(1)));
+				}
+			}
+		}
+		
+		/**
+		 * Flatten the counter hashes into a single map of fully qualified counter names to values.
+		 * 
+		 * For the example shown in the constructor javadoc, the result will be
+		 * <pre>
+		 * Counter:org\.apache\.hadoop\.mapred\.JobInProgress$Counter:TOTAL_LAUNCHED_REDUCES=1
+		 * Counter:org\.apache\.hadoop\.mapred\.JobInProgress$Counter:TOTAL_LAUNCHED_MAPS=14
+		 * Counter:org\.apache\.hadoop\.mapred\.JobInProgress$Counter:DATA_LOCAL_MAPS=14
+		 * Counter:FileSystemCounters:FILE_BYTES_READ=132
+		 * Counter:FileSystemCounters:HDFS_BYTES_READ=20471
+		 * Counter:FileSystemCounters:FILE_BYTES_WRITTEN=790
+		 * Counter:FileSystemCounters:HDFS_BYTES_WRITTEN=248
+		 * </pre>
+		 */
+		public HashMap<String, Long> flat() {
+			HashMap<String, Long> result = new HashMap<String, Long>();
+			for(Entry<String, HashMap<String, Long>> entry : entrySet()) {
+				String id = entry.getKey();
+				for(Entry<String, Long> counterValue : entry.getValue().entrySet()) {
+					result.put("Counter:" + id + ":" + counterValue.getKey(), counterValue.getValue());
+				}
+			}
+			return result;
+		}
+	}
+	
+	public static ArrayList<String> split(String s, String regex) {
+		ArrayList<String> result = new ArrayList<String>();
+		for(String field : s.split(regex)) {
+			if(field != null && field.length()>0) {
+				result.add(field);
+			}
+		}
+		return result;
+	}
+}
\ No newline at end of file
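
Below is a standalone sketch (not part of the commit) of the KEY="value" tokenization that
JobLogLine applies to a job history line, using the sample Task line from its javadoc, plus the
way a job id is recovered from a task id. The class and variable names are invented for
illustration.

import java.util.HashMap;
import java.util.Map;

public class JobLogLineSketch {
  public static void main(String[] args) {
    String line = "Task TASKID=\"task_200903062215_0577_r_000000\" TASK_TYPE=\"REDUCE\" "
        + "START_TIME=\"1236386538540\" SPLITS=\"\" .";

    Map<String, String> fields = new HashMap<String, String>();
    String key = null;
    String[] pairs = line.split("=\"");
    for (int i = 0; i < pairs.length; i++) {
      if (i == 0) {
        // The first token carries the record type ("Task") and the first key ("TASKID").
        String[] head = pairs[i].split(" ");
        key = head.length > 1 ? head[1] : null;
        continue;
      }
      // Everything up to the last quote is the value; what follows it is the next key.
      int pos = pairs[i].lastIndexOf('"');
      fields.put(key, pairs[i].substring(0, pos));
      if (i == pairs.length - 1) {
        break;
      }
      key = pairs[i].substring(pos + 2);
    }

    // task_200903062215_0577_r_000000 -> job_200903062215_0577
    String[] parts = fields.get("TASKID").split("_");
    String jobId = "job_" + parts[1] + "_" + parts[2];

    System.out.println(fields);  // TASKID, TASK_TYPE, START_TIME and an empty SPLITS entry
    System.out.println(jobId);   // job_200903062215_0577
  }
}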



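
And a similar sketch (again, not from the commit) of the brace/bracket/paren splitting that
CounterHash applies to the brace-delimited counter format (the comma-separated form falls back
to the Hadoop18 group in the code); it prints lines in the Counter:<group>:<name>=<value> form
shown in the flat() javadoc. The sample string is a shortened version of the one in the
CounterHash javadoc.

import java.util.ArrayList;

public class CounterStringSketch {
  // Split on a regex and drop empty tokens, as JobLog.split() does.
  static ArrayList<String> split(String s, String regex) {
    ArrayList<String> out = new ArrayList<String>();
    for (String f : s.split(regex)) {
      if (f.length() > 0) {
        out.add(f);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    String counters = "{(FileSystemCounters)(FileSystemCounters)"
        + "[(FILE_BYTES_READ)(FILE_BYTES_READ)(132)]"
        + "[(HDFS_BYTES_READ)(HDFS_BYTES_READ)(20471)]}";

    for (String group : split(counters, "[{}]")) {
      String groupId = null;
      for (String counter : split(group, "[\\[\\]]")) {
        ArrayList<String> idAndDisplay = split(counter, "[\\(\\)]");
        if (groupId == null) {
          // The first chunk of a group names it; "\." escapes become plain dots.
          groupId = idAndDisplay.get(0).replaceAll("\\\\.", ".");
        } else {
          System.out.println("Counter:" + groupId + ":" + idAndDisplay.get(0)
              + "=" + Long.parseLong(idAndDisplay.get(2)));
        }
      }
    }
  }
}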