chukwa-commits mailing list archives

From ey...@apache.org
Subject svn commit: r783565 - in /hadoop/chukwa/trunk: CHANGES.txt bin/UserDailySummary.sh conf/aggregator.sql conf/database_create_tables.sql conf/mdl.xml script/pig/UserDailySummary.pig
Date Wed, 10 Jun 2009 23:08:27 GMT
Author: eyang
Date: Wed Jun 10 23:08:26 2009
New Revision: 783565

URL: http://svn.apache.org/viewvc?rev=783565&view=rev
Log:
CHUKWA-253. Added aggregations by user. (Cheng Zhang via Eric Yang)

Added:
    hadoop/chukwa/trunk/bin/UserDailySummary.sh
    hadoop/chukwa/trunk/script/pig/UserDailySummary.pig
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/conf/aggregator.sql
    hadoop/chukwa/trunk/conf/database_create_tables.sql
    hadoop/chukwa/trunk/conf/mdl.xml

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=783565&r1=783564&r2=783565&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Jun 10 23:08:26 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-253. Added aggregations by user. (Cheng Zhang via Eric Yang)
+
     CHUKWA-95. Added Web Service API to export data from database. (Terence Kwan via Eric Yang)
 
     CHUKWA-281. Created pig scripts to perform down sampling. (Cheng Zhang via Eric Yang)

Added: hadoop/chukwa/trunk/bin/UserDailySummary.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/UserDailySummary.sh?rev=783565&view=auto
==============================================================================
--- hadoop/chukwa/trunk/bin/UserDailySummary.sh (added)
+++ hadoop/chukwa/trunk/bin/UserDailySummary.sh Wed Jun 10 23:08:26 2009
@@ -0,0 +1,73 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+pid=$$
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+while getopts ":c:d:j:m:n:t:" OPTION
+do
+        case $OPTION in
+        c)
+                clusters=$OPTARG
+                ;;
+        d)
+                day=$OPTARG
+                ;;
+        j)
+                jobfile=$OPTARG
+                ;;
+        t)
+                taskfile=$OPTARG
+                ;;
+        esac
+done;
+
+recType=UserDailySummary
+
+if [ "$day" = "" ]; then
+        day=`date -d yesterday +%Y%m%d`
+fi
+
+if [ "$clusters" = "" ]; then
+        clusters=`${HADOOP_HOME}/bin/hadoop --config ${HADOOP_CONF_DIR} dfs -ls /chukwa/repos | grep "/chukwa/repos" | cut -f 4 -d "/"`
+fi
+
+for cluster in $clusters
+do
+
+        if [ "$jobfile" = "" ]; then
+                jobfile=/chukwa/repos/${cluster}/JobData/${day}/*/*/*.evt
+        fi
+
+        if [ "$taskfile" = "" ]; then
+                taskfile=/chukwa/repos/${cluster}/TaskData/${day}/*/*/*.evt
+        fi
+
+        pig=${CHUKWA_HOME}/script/pig/${recType}.pig
+        uniqdir="/chukwa/pig/${cluster}_${recType}_${day}_`date +%s`"
+        output="$uniqdir/${day}.D"
+
+        "$JAVA_HOME"/bin/java -cp ${CHUKWA_CORE}:${HADOOP_JAR}:${HADOOP_CONF_DIR}:${CHUKWA_HOME}/lib/pig.jar
org.apache.pig.Main -param chukwaCore=${CHUKWA_CORE} -param chukwaPig=${CHUKWA_HOME}/chukwa-pig.jar
-param output=${output} -param jobfile=$jobfile -param taskfile=$taskfile -param TIMESTAMP=`date
-d ${day} +%s` ${pig}
+
+        cmd="${JAVA_HOME}/bin/java -DAPP=UserDailySummary -Dlog4j.configuration=chukwa-log4j.properties -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -classpath ${CHUKWA_HOME}/chukwa-pig.jar:${HADOOP_CONF_DIR}:${CLASSPATH}:${CHUKWA_CORE}:${COMMON}:${HADOOP_JAR}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.tools.PigMover ${cluster} ${recType} ${uniqdir} ${output} /chukwa/postProcess/"
+        echo $cmd
+        $cmd
+
+done
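
The script above runs UserDailySummary.pig once per cluster and hands the output to PigMover for post-processing. A minimal sketch of how it might be invoked, based on the getopts handling above (the cluster name, date, and paths are illustrative; -d defaults to yesterday, -j/-t override the default job/task event globs, and -m/-n are accepted by getopts but not currently handled):

    # summarize user activity for cluster "demo" on 2009-06-09
    bin/UserDailySummary.sh -c demo -d 20090609

    # the same, with explicit job and task event files
    bin/UserDailySummary.sh -c demo -d 20090609 \
        -j '/chukwa/repos/demo/JobData/20090609/*/*/*.evt' \
        -t '/chukwa/repos/demo/TaskData/20090609/*/*/*.evt'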

Modified: hadoop/chukwa/trunk/conf/aggregator.sql
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/aggregator.sql?rev=783565&r1=783564&r2=783565&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/aggregator.sql (original)
+++ hadoop/chukwa/trunk/conf/aggregator.sql Wed Jun 10 23:08:26 2009
@@ -123,3 +123,8 @@
 replace into [mr_job_conf_quarter] (select * from [mr_job_conf_week] where ts between '[past_15_minutes]' and '[now]');
 replace into [mr_job_conf_year] (select * from [mr_job_conf_week] where ts between '[past_15_minutes]' and '[now]');
 replace into [mr_job_conf_decade] (select * from [mr_job_conf_week] where ts between '[past_15_minutes]' and '[now]');
+
+replace into [user_job_summary_month] (select FLOOR(UNIX_TIMESTAMP(timestamp)/300),userid, sum(totalJobs), sum(dataLocalMaps), sum(rackLocalMaps), sum(remoteMaps), sum(mapInputBytes), sum(reduceOutputRecords), sum(mapSlotHours), sum(reduceSlotHours), sum(totalMaps), sum(totalReduces) from [user_job_summary_week] where ts between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(timestamp)/300),userid);
+replace into [user_job_summary_quarter] (select FLOOR(UNIX_TIMESTAMP(timestamp)/1800),userid, sum(totalJobs), sum(dataLocalMaps), sum(rackLocalMaps), sum(remoteMaps), sum(mapInputBytes), sum(reduceOutputRecords), sum(mapSlotHours), sum(reduceSlotHours), sum(totalMaps), sum(totalReduces) from [user_job_summary_week] where ts between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(timestamp)/1800),userid);
+replace into [user_job_summary_year] (select FLOOR(UNIX_TIMESTAMP(timestamp)/10800),userid, sum(totalJobs), sum(dataLocalMaps), sum(rackLocalMaps), sum(remoteMaps), sum(mapInputBytes), sum(reduceOutputRecords), sum(mapSlotHours), sum(reduceSlotHours), sum(totalMaps), sum(totalReduces) from [user_job_summary_week] where ts between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(timestamp)/10800),userid);
+replace into [user_job_summary_decade] (select FLOOR(UNIX_TIMESTAMP(timestamp)/43200),userid, sum(totalJobs), sum(dataLocalMaps), sum(rackLocalMaps), sum(remoteMaps), sum(mapInputBytes), sum(reduceOutputRecords), sum(mapSlotHours), sum(reduceSlotHours), sum(totalMaps), sum(totalReduces) from [user_job_summary_week] where ts between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(timestamp)/43200),userid);
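
The divisors in the statements above (300, 1800, 10800, and 43200 seconds) bucket the week-level rows into 5-, 30-, 180-, and 720-minute intervals, matching the consolidator intervals registered in mdl.xml below. Ignoring the [table] and [time] macros, which the aggregator resolves at run time, each roll-up reduces to a grouping of roughly this shape (illustrative only; table and column names are taken from the statements above):

    select FLOOR(UNIX_TIMESTAMP(timestamp)/300) as time_bucket, userid,
           sum(totalJobs), sum(mapSlotHours), sum(reduceSlotHours)
    from user_job_summary_week
    group by time_bucket, userid;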

Modified: hadoop/chukwa/trunk/conf/database_create_tables.sql
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/database_create_tables.sql?rev=783565&r1=783564&r2=783565&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/database_create_tables.sql (original)
+++ hadoop/chukwa/trunk/conf/database_create_tables.sql Wed Jun 10 23:08:26 2009
@@ -23,6 +23,7 @@
 drop table if exists hdfs_usage_template;
 drop table if exists QueueInfo;
 drop table if exists mapreduce_fsm_template;
+drop table if exists user_job_summary_template;
 
 create table if not exists node_activity_template (
     timestamp  timestamp default CURRENT_TIMESTAMP,
@@ -733,3 +734,19 @@
     primary key(unique_id),
     index(start_time, finish_time, job_id)
 ) ENGINE=InnoDB;
+
+create table if not exists user_job_summary_template (
+    timestamp timestamp default CURRENT_TIMESTAMP,
+    userid varchar(32),
+    totalJobs double null, 
+    dataLocalMaps double null, 
+    rackLocalMaps double null, 
+    remoteMaps double null, 
+    mapInputBytes double null, 
+    reduceOutputRecords double null, 
+    mapSlotHours double null, 
+    reduceSlotHours double null, 
+    totalMaps double null, 
+    totalReduces double null,
+    primary key(userid, timestamp)
+) ENGINE=InnoDB;
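
Once populated, the summary tables can also be queried directly; a hypothetical ad-hoc query against a table built from this template, ranking users by map slot hours for one day (the concrete table name and date literals are illustrative):

    select userid, sum(totalJobs) as jobs, sum(mapSlotHours) as map_slot_hours
    from user_job_summary_week
    where timestamp between '2009-06-10 00:00:00' and '2009-06-11 00:00:00'
    group by userid
    order by map_slot_hours desc;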

Modified: hadoop/chukwa/trunk/conf/mdl.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/mdl.xml?rev=783565&r1=783564&r2=783565&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/mdl.xml (original)
+++ hadoop/chukwa/trunk/conf/mdl.xml Wed Jun 10 23:08:26 2009
@@ -1987,5 +1987,9 @@
 <property><name>metric.mapreduce_fsm.counter_input_groups</name><value>input_groups</value></property>
 <property><name>metric.mapreduce_fsm.counter_spilled_records</name><value>spilled_records</value></property>

 
+<!-- for user daily aggregation -->
+<property><name>report.db.name.userdailysummary</name><value>user_job_summary</value></property>
+<property><name>consolidator.table.user_job_summary</name><value>5,30,180,720</value></property>
+
 </configuration>
 

Added: hadoop/chukwa/trunk/script/pig/UserDailySummary.pig
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/script/pig/UserDailySummary.pig?rev=783565&view=auto
==============================================================================
--- hadoop/chukwa/trunk/script/pig/UserDailySummary.pig (added)
+++ hadoop/chukwa/trunk/script/pig/UserDailySummary.pig Wed Jun 10 23:08:26 2009
@@ -0,0 +1,70 @@
+register $chukwaCore
+register $chukwaPig
+
+define seqWriter org.apache.hadoop.chukwa.ChukwaStorage('c_timestamp','userid', 'totalJobs', 'dataLocalMaps', 'rackLocalMaps', 'remoteMaps', 'mapInputBytes', 'reduceOutputRecords', 'mapSlotHours', 'reduceSlotHours', 'totalMaps', 'totalReduces','c_recordtype','c_source','c_application');
+
+/*****************************************************************/
+/*********************** Task data begin *************************/
+taskInputData = load '$taskfile' using org.apache.hadoop.chukwa.ChukwaStorage() as (ts: long,fields);
+
+-- convert map to row-column
+taskTable = FOREACH taskInputData GENERATE fields#'JOBID' as jobId, fields#'TASKID' as taskId, fields#'START_TIME' as startTime, fields#'FINISH_TIME' as finishTime, fields#'TASK_TYPE' as taskType;
+
+-- group by taskId. get startTime and finishTime
+taskGroup = group taskTable  by (jobId, taskId, taskType);
+TaskTime = foreach taskGroup generate group, (MAX(taskTable.finishTime) - MAX(taskTable.startTime)) as slotHours;
+
+-- taskTypeGroup
+--taskTypeGroup =
+
+-- group by jobId
+taskJobGroup = group TaskTime by ($0.jobId, $0.taskType);
+taskJobTime = foreach taskJobGroup generate group.jobId as jobId, group.taskType as taskType, SUM(TaskTime.slotHours) as slotHours;
+
+-- separate map and reduce slotHours
+mapTaskTime = filter taskJobTime by taskType eq 'MAP';
+reduceTaskTime = filter taskJobTime by taskType eq 'REDUCE';
+/*********************** Task data end *************************/
+/*****************************************************************/
+
+
+/*****************************************************************/
+/*********************** Job data begin *************************/
+jobInputData = load '$jobfile' using  org.apache.hadoop.chukwa.ChukwaStorage() as (ts: long,fields);
+
+-- convert map to row-column
+jobTable = FOREACH jobInputData GENERATE fields#'USER' as user, fields#'JOBID' as jobId, fields#'SUBMIT_TIME' as submitTime, fields#'FINISH_TIME' as finishTime, fields#'JOB_STATUS' as jobStatus, fields#'Counter:org.apache.hadoop.mapreduce.JobCounter:DATA_LOCAL_MAPS' as dataLocalMaps, fields#'Counter:org.apache.hadoop.mapreduce.JobCounter:RACK_LOCAL_MAPS' as rackLocalMaps, fields#'Counter:org.apache.hadoop.mapreduce.JobCounter:REMOTE_MAPS' as remoteMaps, fields#'TOTAL_MAPS' as totalMaps, fields#'TOTAL_REDUCES' as totalReduces, fields#'Counter:org.apache.hadoop.mapred.Task\$Counter:MAP_INPUT_BYTES' as mapInputBytes, fields#'Counter:org.apache.hadoop.mapred.Task\$Counter:REDUCE_OUTPUT_RECORDS' as reduceOutputRecords;
+
+-- load data from a text file.
+--jobTable = load '$jobfile' using PigStorage(',') as (user, jobId, submitTime, finishTime, jobStatus, dataLocalMaps, rackLocalMaps, remoteMaps, totalMaps, totalReduces, mapInputBytes, reduceOutputRecords);
+
+-- find job and user
+UserRecords = filter jobTable by user is not null;
+JobUserGroup = group UserRecords by (jobId, user);
+JobUser = foreach JobUserGroup generate group.jobId as jobId, group.user as user;
+
+-- group by jobId. get submitTime and finishTime
+jobGroup = group jobTable by jobId;
+JobTime = foreach jobGroup generate group as jobId, MAX(jobTable.submitTime) as submitTime, MAX(jobTable.finishTime) as finishTime, MAX(jobTable.dataLocalMaps) as dataLocalMaps, MAX(jobTable.rackLocalMaps) as rackLocalMaps, MAX(jobTable.remoteMaps) as remoteMaps, MAX(jobTable.totalMaps) as totalMaps, MAX(jobTable.totalReduces) as totalReduces, MAX(jobTable.mapInputBytes) as mapInputBytes, MAX(jobTable.reduceOutputRecords) as reduceOutputRecords;
+
+-- job status
+finishedJobs = filter jobTable by jobStatus eq 'SUCCESS' or jobStatus eq 'KILLED' or jobStatus eq 'FAILED';
+jobStatusRecords = foreach finishedJobs generate jobId, jobStatus;
+distinctJobStatus = distinct jobStatusRecords;
+/*********************** Job data end *************************/
+/*****************************************************************/
+
+
+-- Join job and task
+JoinedJobTask = join JobUser by jobId, JobTime by jobId, distinctJobStatus by jobId, mapTaskTime by jobId, reduceTaskTime by jobId;
+
+-- JoinedJobTask = join JobUser by $0.jobId, JobTime by jobId, distinctJobStatus by jobId, mapReduceTaskTime by jobId;
+userJobRecords = foreach JoinedJobTask generate JobUser::user as user, JobTime::jobId as jobId, JobTime::submitTime as submitTime, JobTime::finishTime as finishTime, JobTime::dataLocalMaps as dataLocalMaps, JobTime::rackLocalMaps as rackLocalMaps, JobTime::remoteMaps as remoteMaps, JobTime::totalMaps as totalMaps, JobTime::totalReduces as totalReduces, JobTime::mapInputBytes as mapInputBytes, JobTime::reduceOutputRecords as reduceOutputRecords, distinctJobStatus::jobStatus as jobStatus, mapTaskTime::slotHours as mapSlotHours, reduceTaskTime::slotHours as reduceSlotHours;
+
+-- group by user
+userGroup = group userJobRecords by user;
+userSummary = foreach userGroup generate $TIMESTAMP as ts, group as user, COUNT(userJobRecords.jobId) as totalJobs, SUM(userJobRecords.dataLocalMaps) as dataLocalMaps, SUM(userJobRecords.rackLocalMaps) as rackLocalMaps, SUM(userJobRecords.remoteMaps) as remoteMaps, SUM(userJobRecords.mapInputBytes) as mapInputBytes, SUM(userJobRecords.reduceOutputRecords) as reduceOutputRecords, SUM(userJobRecords.mapSlotHours) as mapSlotHours, SUM(userJobRecords.reduceSlotHours) as reduceSlotHours, SUM(userJobRecords.totalMaps) as totalMaps, SUM(userJobRecords.totalReduces) as totalReduces;
+
+describe userSummary;
+-- dump userSummary;
+store userSummary into '$output' using seqWriter;
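
UserDailySummary.sh above supplies the parameters this script expects (chukwaCore, chukwaPig, jobfile, taskfile, output, TIMESTAMP). A hypothetical standalone run through the pig command line, with jar names, paths, and the epoch value purely illustrative:

    pig -param chukwaCore=chukwa-core.jar \
        -param chukwaPig=chukwa-pig.jar \
        -param jobfile='/chukwa/repos/demo/JobData/20090610/*/*/*.evt' \
        -param taskfile='/chukwa/repos/demo/TaskData/20090610/*/*/*.evt' \
        -param output='/chukwa/pig/demo_UserDailySummary_20090610/20090610.D' \
        -param TIMESTAMP=1244592000 \
        script/pig/UserDailySummary.pig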


