hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r728207 - in /hadoop/core/branches/branch-0.20: ./ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/
Date: Sat, 20 Dec 2008 01:12:05 GMT
Author: cdouglas
Date: Fri Dec 19 17:12:05 2008
New Revision: 728207

URL: http://svn.apache.org/viewvc?rev=728207&view=rev
Log:
HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa. Contributed by Eric Yang

Removed:
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/mysql_create_tables
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/mysql_upgrade_tables
Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/bin/dbAdmin.sh
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/aggregator.sql
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/database_create_tables
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=728207&r1=728206&r2=728207&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Dec 19 17:12:05 2008
@@ -466,6 +466,9 @@
     HADOOP-4849. Documentation for Service Level Authorization implemented in
     HADOOP-4348. (acmurthy)
 
+    HADOOP-4827. Replace Consolidator with Aggregator macros in Chukwa (Eric
+    Yang via cdouglas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/branches/branch-0.20/src/contrib/chukwa/bin/dbAdmin.sh
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/chukwa/bin/dbAdmin.sh?rev=728207&r1=728206&r2=728207&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/chukwa/bin/dbAdmin.sh (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/chukwa/bin/dbAdmin.sh Fri Dec 19 17:12:05 2008
@@ -37,18 +37,17 @@
     cat ${CHUKWA_CONF_DIR}/jdbc.conf | \
     while read LINE; do
         CLUSTER=`echo ${LINE} | cut -f 1 -d'='`
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Consolidator
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365
-        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 7 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 30 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 91 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 365 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.TableCreator ${EXP_DATE} 3650 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.Aggregator &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 7 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 30 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 91 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 365 &
+        ${JAVA_HOME}/bin/java -DCLUSTER=${CLUSTER} ${JVM_OPTS} org.apache.hadoop.chukwa.database.DataExpiration ${EXP_DATE} 3650 &
     done
     end=`date +%s`
     duration=$(( $end - $start ))

Modified: hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/aggregator.sql
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/aggregator.sql?rev=728207&r1=728206&r2=728207&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/aggregator.sql (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/aggregator.sql Fri Dec 19 17:12:05 2008
@@ -1,12 +1,88 @@
-insert into [cluster_system_metrics] (select timestamp,[avg(system_metrics)] from [system_metrics]
where timestamp between '[past_hour]' and '[now]' group by timestamp);
-insert into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from [dfs_datanode] where
timestamp between '[past_hour]' and '[now]' group by timestamp);
-insert into [cluster_disk] (select a.timestamp,a.mount,a.used,a.available,a.used_percent
from (select from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as timestamp,mount,avg(used)
as used,avg(available) as available,avg(used_percent) as used_percent from [disk] where timestamp
between '[past_hour]' and '[now]' group by timestamp,mount) as a group by a.timestamp, a.mount);
-insert into [hod_job_digest] (select timestamp,d.hodid,d.userid,[avg(system_metrics)] from
(select a.HodID,b.host as machine,a.userid,a.starttime,a.endtime from [HodJob] a join [hod_machine]
b on (a.HodID = b.HodID) where endtime between '[past_hour]' and '[now]') as d,[system_metrics]
where timestamp between d.starttime and d.endtime and host=d.machine group by hodid,timestamp);
-insert into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from [hadoop_rpc] where
timestamp between '[past_hour]' and '[now]' group by timestamp);
-#insert into [cluster_hadoop_mapred] (select timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job]
where timestamp between '[past_hour]' and '[now]' group by timestamp);
-insert into [user_util] (select timestamp, j.UserID as user, sum(j.NumOfMachines) as node_total,
sum(cpu_idle_pcnt*j.NumOfMachines) as cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines)
as cpu_used, avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt, sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_unused, sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_used, avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as disk_used_pcnt,
sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused, sum(eth0_busy_pcnt*j.NumOfMachines)
as network_used, avg(eth0_busy_pcnt) as network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines)
as memory_unused, sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as
memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID) and Timestamp
between '[past_hour]' and '[now]' group by j.UserID);
 #insert into [node_util] select starttime, avg(unused) as unused, avg(used) as used from
(select DATE_FORMAT(m.LAUNCH_TIME,'%Y-%m-%d %H:%i:%s') as starttime,sum(AvgCPUBusy*j.NumOfMachines/(60*100))
as unused,sum((100-AvgCPUBusy)*j.NumOfMachines/(60*100)) as used from HodJobDigest d join
HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where m.LAUNCH_TIME >=
'2008-09-12 21:11' and m.LAUNCH_TIME <= '2008-09-12 22:11' and d.Timestamp >= m.LAUNCH_TIME
and d.Timestamp <= m.FINISH_TIME group by m.MRJobID order by m.LAUNCH_TIME) as t group
by t.starttime 
 #insert into [jobtype_util] select CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName
like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other'
END as m, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID)) as jobs from HodJobDigest
d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp
>= '2008-09-12 21:11' and d.Timestamp <= '2008-09-12 22:11' and d.Timestamp >= m.LAUNCH_TIME
and d.Timestamp <= m.FINISH_TIME group by CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig'
WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus'
ELSE 'Other' END order by CASE WHEN MRJobName like 'PigLatin%' THEN 'Pig' WHEN MRJobName like
'streamjob%' THEN 'Streaming' WHEN MRJobName like '%abacus%' THEN 'Abacus' ELSE 'Other' END
-#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy * j.NumOfMachines) / (sum(j.NumOfMachines)
* 1)) as used from Digest d join HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_hour]'
and d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp 
-#insert into [b] select m, sum(foo.nodehours) as nodehours from (select m.MRJobID, round(avg(if(AvgCPUBusy
is null,0,AvgCPUBusy)),0) as m, count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest
d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp
>= '[past_hour]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME and
d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m; 
-#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN MRJobName like
'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like
'%abacus%' THEN 'Abacus' ELSE 'Other' END as interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID))
as jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID
= j.HodID) where d.Timestamp >= '[past_hour]' and d.Timestamp <= '[now]' and d.Timestamp
>= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by AvgCPUBusy,CASE WHEN MRJobName
like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName
like '%abacus%' THEN 'Abacus' ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [a] select d.Timestamp as starttime,((AvgCPUBusy * j.NumOfMachines) / (sum(j.NumOfMachines)
* 1)) as used from Digest d join HodJob j on (d.HodID = j.HodID) where d.Timestamp >= '[past_10_minutes]'
and d.Timestamp <= '[now]' group by d.Timestamp order by d.Timestamp 
+#insert into [b] select m, sum(foo.nodehours) as nodehours from (select m.MRJobID, round(avg(if(AvgCPUBusy
is null,0,AvgCPUBusy)),0) as m, count(*)*j.NumOfMachines/60 as nodehours from HodJobDigest
d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID = j.HodID) where d.Timestamp
>= '[past_10_minutes]' and d.Timestamp <= '[now]' and d.Timestamp >= m.LAUNCH_TIME
and d.Timestamp <= m.FINISH_TIME group by m.MRJobID) as foo group by m; 
+#insert into [c] select if(AvgCPUBusy is null,0,AvgCPUBusy) as m, CASE WHEN MRJobName like
'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName like
'%abacus%' THEN 'Abacus' ELSE 'Other' END as interface, count(*)*j.NumOfMachines/60 as nodehours,count(distinct(MRJobID))
as jobs from HodJobDigest d join HodJob j on (d.HodID = j.HodID) join MRJob m on (m.HodID
= j.HodID) where d.Timestamp >= '[past_10_minutes]' and d.Timestamp <= '[now]' and d.Timestamp
>= m.LAUNCH_TIME and d.Timestamp <= m.FINISH_TIME group by AvgCPUBusy,CASE WHEN MRJobName
like 'PigLatin%' THEN 'Pig' WHEN MRJobName like 'streamjob%' THEN 'Streaming' WHEN MRJobName
like '%abacus%' THEN 'Abacus' ELSE 'Other' END order by if(AvgCPUBusy is null,0,AvgCPUBusy)
+#insert into [cluster_hadoop_mapred] (select timestamp,[avg(hadoop_mapred_job)] from [hadoop_mapred_job]
where timestamp between '[past_10_minutes]' and '[now]' group by timestamp);
+replace into [cluster_system_metrics] (select timestamp,[avg(system_metrics)] from [system_metrics]
where timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp);
+replace into [dfs_throughput] (select timestamp,[avg(dfs_datanode)] from [dfs_datanode] where
timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp);
+replace into [cluster_disk] (select a.timestamp,a.mount,a.used,a.available,a.used_percent
from (select from_unixtime(unix_timestamp(timestamp)-unix_timestamp(timestamp)%60)as timestamp,mount,avg(used)
as used,avg(available) as available,avg(used_percent) as used_percent from [disk] where timestamp
between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp,mount) as a group by
a.timestamp, a.mount);
+replace delayed into [hod_job_digest] (select timestamp,d.hodid,d.userid,[avg(system_metrics)]
from (select a.HodID,b.host as machine,a.userid,a.starttime,a.endtime from [HodJob] a join
[hod_machine] b on (a.HodID = b.HodID) where endtime between '[past_10_minutes]' and '[past_5_minutes]')
as d,[system_metrics] where timestamp between d.starttime and d.endtime and host=d.machine
group by hodid,timestamp);
+replace into [cluster_hadoop_rpc] (select timestamp,[avg(hadoop_rpc)] from [hadoop_rpc] where
timestamp between '[past_10_minutes]' and '[past_5_minutes]' group by timestamp);
+replace into [user_util] (select timestamp, j.UserID as user, sum(j.NumOfMachines) as node_total,
sum(cpu_idle_pcnt*j.NumOfMachines) as cpu_unused, sum((cpu_user_pcnt+cpu_system_pcnt)*j.NumOfMachines)
as cpu_used, avg(cpu_user_pcnt+cpu_system_pcnt) as cpu_used_pcnt, sum((100-(sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_unused, sum(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)*j.NumOfMachines)
as disk_used, avg(((sda_busy_pcnt+sdb_busy_pcnt+sdc_busy_pcnt+sdd_busy_pcnt)/4)) as disk_used_pcnt,
sum((100-eth0_busy_pcnt)*j.NumOfMachines) as network_unused, sum(eth0_busy_pcnt*j.NumOfMachines)
as network_used, avg(eth0_busy_pcnt) as network_used_pcnt, sum((100-mem_used_pcnt)*j.NumOfMachines)
as memory_unused, sum(mem_used_pcnt*j.NumOfMachines) as memory_used, avg(mem_used_pcnt) as
memory_used_pcnt from [hod_job_digest] d,[HodJob] j where (d.HodID = j.HodID) and Timestamp
between '[past_10_minutes]' and '[past_5_minutes]' group
  by j.UserID);
+#
+# Down sample metrics for charts
+replace into [system_metrics_month] (select timestamp,[group_avg(system_metrics)] from [system_metrics_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [system_metrics_quarter] (select timestamp,[group_avg(system_metrics)] from
[system_metrics_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [system_metrics_year] (select timestamp,[group_avg(system_metrics)] from [system_metrics_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [system_metrics_decade] (select timestamp,[group_avg(system_metrics)] from [system_metrics_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_namenode_month] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_namenode_quarter] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_namenode_year] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_namenode_decade] (select timestamp,[group_avg(dfs_namenode)] from [dfs_namenode_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [dfs_datanode_month] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_datanode_quarter] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_datanode_year] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_datanode_decade] (select timestamp,[group_avg(dfs_datanode)] from [dfs_datanode_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_rpc_month] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_rpc_quarter] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_rpc_year] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_rpc_decade] (select timestamp,[group_avg(hadoop_rpc)] from [hadoop_rpc_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [cluster_hadoop_rpc_month] (select timestamp,[avg(cluster_hadoop_rpc)] from
[cluster_hadoop_rpc_week] where timestamp between '[past_15_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_hadoop_rpc_quarter] (select timestamp,[avg(cluster_hadoop_rpc)] from
[cluster_hadoop_rpc_month] where timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_hadoop_rpc_year] (select timestamp,[avg(cluster_hadoop_rpc)] from [cluster_hadoop_rpc_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_hadoop_rpc_decade] (select timestamp,[avg(cluster_hadoop_rpc)] from
[cluster_hadoop_rpc_year] where timestamp between '[past_2160_minutes]' and '[now]' group
by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hadoop_mapred_month] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [hadoop_mapred_quarter] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [hadoop_mapred_year] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [hadoop_mapred_decade] (select timestamp,[group_avg(hadoop_mapred)] from [hadoop_mapred_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [hadoop_jvm_month] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,process_name);
+replace into [hadoop_jvm_quarter] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,process_name);
+replace into [hadoop_jvm_year] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,process_name);
+replace into [hadoop_jvm_decade] (select timestamp,[group_avg(hadoop_jvm)] from [hadoop_jvm_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,process_name);
+#
+replace into [dfs_throughput_month] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [dfs_throughput_quarter] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [dfs_throughput_year] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [dfs_throughput_decade] (select timestamp,[avg(dfs_throughput)] from [dfs_throughput_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [node_activity_month] (select timestamp,[avg(node_activity)] from [node_activity_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [node_activity_quarter] (select timestamp,[avg(node_activity)] from [node_activity_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [node_activity_year] (select timestamp,[avg(node_activity)] from [node_activity_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [node_activity_decade] (select timestamp,[avg(node_activity)] from [node_activity_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [dfs_fsnamesystem_month] (select timestamp,[group_avg(dfs_fsnamesystem)] from
[dfs_fsnamesystem_week] where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host);
+replace into [dfs_fsnamesystem_quarter] (select timestamp,[group_avg(dfs_fsnamesystem)] from
[dfs_fsnamesystem_month] where timestamp between '[past_90_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host);
+replace into [dfs_fsnamesystem_year] (select timestamp,[group_avg(dfs_fsnamesystem)] from
[dfs_fsnamesystem_quarter] where timestamp between '[past_540_minutes]' and '[now]' group
by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host);
+replace into [dfs_fsnamesystem_decade] (select timestamp,[group_avg(dfs_fsnamesystem)] from
[dfs_fsnamesystem_year] where timestamp between '[past_2160_minutes]' and '[now]' group by
FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host);
+#
+replace into [disk_month] (select timestamp,[group_avg(disk)] from [disk_week] where timestamp
between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),host,mount);
+replace into [disk_quarter] (select timestamp,[group_avg(disk)] from [disk_month] where timestamp
between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),host,mount);
+replace into [disk_year] (select timestamp,[group_avg(disk)] from [disk_quarter] where timestamp
between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),host,mount);
+replace into [disk_decade] (select timestamp,[group_avg(disk)] from [disk_year] where timestamp
between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),host,mount);
+#
+replace into [cluster_disk_month] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),mount);
+replace into [cluster_disk_quarter] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),mount);
+replace into [cluster_disk_year] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),mount);
+replace into [cluster_disk_decade] (select timestamp,[group_avg(cluster_disk)] from [cluster_disk_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),mount);
+#
+replace into [cluster_system_metrics_month] (select timestamp,[avg(cluster_system_metrics)]
from [cluster_system_metrics_week] where timestamp between '[past_15_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300));
+replace into [cluster_system_metrics_quarter] (select timestamp,[avg(cluster_system_metrics)]
from [cluster_system_metrics_month] where timestamp between '[past_90_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800));
+replace into [cluster_system_metrics_year] (select timestamp,[avg(cluster_system_metrics)]
from [cluster_system_metrics_quarter] where timestamp between '[past_540_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800));
+replace into [cluster_system_metrics_decade] (select timestamp,[avg(cluster_system_metrics)]
from [cluster_system_metrics_year] where timestamp between '[past_2160_minutes]' and '[now]'
group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200));
+#
+replace into [hod_job_digest_month] (select timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),HodID);
+replace into [hod_job_digest_quarter] (select timestamp,[group_avg(hod_job_digest)] from
[hod_job_digest_month] where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),HodID);
+replace into [hod_job_digest_year] (select timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),HodID);
+replace into [hod_job_digest_decade] (select timestamp,[group_avg(hod_job_digest)] from [hod_job_digest_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),HodID);
+#
+replace into [user_util_month] (select timestamp,[group_avg(user_util)] from [user_util_week]
where timestamp between '[past_15_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/300),user);
+replace into [user_util_quarter] (select timestamp,[group_avg(user_util)] from [user_util_month]
where timestamp between '[past_90_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/1800),user);
+replace into [user_util_year] (select timestamp,[group_avg(user_util)] from [user_util_quarter]
where timestamp between '[past_540_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/10800),user);
+replace into [user_util_decade] (select timestamp,[group_avg(user_util)] from [user_util_year]
where timestamp between '[past_2160_minutes]' and '[now]' group by FLOOR(UNIX_TIMESTAMP(Timestamp)/43200),user);
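
The rewritten aggregator.sql is driven entirely by bracketed macros that Aggregator expands at run time: [now] and [past_N_minutes] become timestamps aligned to N-minute boundaries, [avg(table)] and [group_avg(table)] become per-column aggregate lists, and names ending in _week, _month, _quarter, _year, or _decade resolve to the partitioned table for the current period. The sketch below illustrates only the time and partition macros; the class and method names are invented for illustration, the week constant is a stand-in for Chukwa's DatabaseConfig.WEEK, and the timestamp format is assumed to match the MySQL DATETIME strings the queries compare against.

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class MacroSketch {
        // Stand-in for DatabaseConfig.WEEK (one partition per week of epoch time).
        static final long WEEK = 7L * 24 * 3600 * 1000;

        // '[past_10_minutes]' -> the 10-minute boundary one full period before now,
        // mirroring the regex branch added to Aggregator.computeMacro().
        static String pastMinutes(String macro, long current) {
            Matcher m = Pattern.compile("past_(\\d+)_minutes").matcher(macro);
            if (!m.find()) {
                throw new IllegalArgumentException(macro);
            }
            long period = Long.parseLong(m.group(1)) * 60 * 1000L;
            long aligned = current - (current % period) - period;
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(aligned));
        }

        // '[system_metrics_week]' -> 'system_metrics_<partition>_week', where the
        // partition number is the current time divided by the period length (minimum 1).
        static String weeklyTable(String macro, long current) {
            long partition = Math.max(1, current / WEEK);
            String base = macro.substring(0, macro.lastIndexOf('_'));
            return base + "_" + partition + "_week";
        }
    }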

Modified: hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/database_create_tables
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/database_create_tables?rev=728207&r1=728206&r2=728207&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/database_create_tables (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/chukwa/conf/database_create_tables Fri Dec 19 17:12:05 2008
@@ -160,7 +160,7 @@
     sdc_busy_pcnt float,
     sdd_busy_pcnt float,
     swap_used_pcnt float,
-    primary key(host, timestamp),
+    primary key(timestamp),
     index (timestamp)
 );
 
@@ -571,7 +571,7 @@
     sdd_busy_pcnt float,
     swap_used_pcnt float,
     primary key(HodId, timestamp),
-    index(timeStamp)
+    index(timestamp)
 ); 
 
 create table if not exists user_util_template (
@@ -589,7 +589,9 @@
     network_used_pcnt float,
     memory_unused double,
     memory_used double,
-    memory_used_pcnt float
+    memory_used_pcnt float,
+    primary key(user, timestamp),
+    index(timestamp)
 );
 
 create table if not exists QueueInfo(
@@ -598,5 +600,5 @@
     Queue VARCHAR(20),
     NumOfMachine smallint unsigned,
     status varchar(1),
-    index(TimeStamp)
+    index(Timestamp)
 );
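
The schema changes line up with the switch from INSERT to REPLACE in aggregator.sql: with a proper primary key on each aggregate table (for example the new (user, timestamp) key on user_util_template), re-running an aggregation window overwrites the existing rows instead of appending duplicates. A small JDBC sketch of that behaviour follows; the connection URL, credentials, column values, and the partitioned table name user_util_2034_week are all hypothetical, and the real URL is assembled by DatabaseWriter from jdbc.conf.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ReplaceDemo {
        public static void main(String[] args) throws Exception {
            // Hypothetical URL and credentials; Chukwa builds the real one from jdbc.conf.
            Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost/chukwa?user=chukwa&password=secret");
            Statement stmt = conn.createStatement();
            // With primary key (user, timestamp), running the same window twice keeps one
            // row per key: the second REPLACE deletes the old row and inserts fresh values.
            String sql = "REPLACE INTO user_util_2034_week (timestamp, user, node_total) "
                       + "VALUES ('2008-12-19 17:00:00', 'eyang', 10)";
            stmt.executeUpdate(sql);
            stmt.executeUpdate(sql); // idempotent thanks to the new primary key
            stmt.close();
            conn.close();
        }
    }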

Modified: hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java?rev=728207&r1=728206&r2=728207&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/database/Aggregator.java Fri Dec 19 17:12:05 2008
@@ -25,30 +25,37 @@
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
-
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.sql.DatabaseMetaData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.PidFile;
 
 public class Aggregator {
 	private static DatabaseConfig dbc = null;
 
-	private static Log log = LogFactory.getLog(Consolidator.class);
+	private static Log log = LogFactory.getLog(Aggregator.class);
+	private String table = null;
+	private String jdbc = null;
+	private int[] intervals;
 	private long current = 0;
-    private static PidFile loader=null;
-
-	public Aggregator() {
-
+    private static DatabaseWriter db = null;
+    public Aggregator() {
 		dbc = new DatabaseConfig();
 		Calendar now = Calendar.getInstance();
 		current = now.getTimeInMillis();
 	}
 
-	public HashMap<String,String> findMacros(String query) {
+	public HashMap<String,String> findMacros(String query) throws SQLException {
 		boolean add=false;
 		HashMap<String,String> macroList = new HashMap<String,String>();
 		String macro="";
@@ -71,54 +78,131 @@
 	    return macroList;
 	}
 
-	public String computeMacro(String macro) {
-		if(macro.indexOf("avg(")==0) {
+	public String computeMacro(String macro) throws SQLException {
+		Pattern p = Pattern.compile("past_(.*)_minutes");
+		Matcher matcher = p.matcher(macro);
+		if(macro.indexOf("avg(")==0 || macro.indexOf("group_avg(")==0) {
 			String meta="";
-			String[] table = dbc.findTableName(macro.substring(4,macro.indexOf(")")), current, current);
+			String[] table = dbc.findTableName(macro.substring(macro.indexOf("(")+1,macro.indexOf(")")), current, current);
 			try {
 				String cluster = System.getProperty("CLUSTER");
 				if(cluster==null) {
 					cluster="unknown";
 				}
-				DatabaseWriter db = new DatabaseWriter(cluster);
-
-			    String query = "select * from "+table[0]+" order by timestamp desc limit 1";
-	            log.debug("Query: "+query);
-	            ResultSet rs = db.query(query);
-	            if(rs==null) {
-	          	    throw new SQLException("Table is undefined.");
+                DatabaseMetaData dbMetaData = db.getConnection().getMetaData();
+	            ResultSet rs = dbMetaData.getColumns ( null,null,table[0], null);
+	            boolean first=true;
+	            while(rs.next()) {
+	            	if(!first) {
+	            		meta = meta+",";
+	            	}
+	            	String name = rs.getString(4);
+	            	int type = rs.getInt(5);
+	            	if(type==java.sql.Types.VARCHAR) {
+	            		if(macro.indexOf("group_avg(")<0) {
+	            			meta=meta+"count("+name+") as "+name;
+	            		} else {
+	            			meta=meta+name;
+	            		}
+		            	first=false;
+	            	} else if(type==java.sql.Types.DOUBLE ||
+	            			  type==java.sql.Types.FLOAT ||
+	            			  type==java.sql.Types.INTEGER) {
+	            		meta=meta+"avg("+name+")";
+		            	first=false;
+	            	} else if(type==java.sql.Types.TIMESTAMP) {
+	            		// Skip the column
+	            	} else {
+	            		meta=meta+"AVG("+name+")";
+		            	first=false;
+	            	}
 	            }
-	            ResultSetMetaData rmeta = rs.getMetaData();
-	            if(rs.next()) {
-	            	boolean first=true;
-	                for(int i=1;i<=rmeta.getColumnCount();i++) {
-	                	if(!first) {
-	                		meta=meta+",";
-	                	}
-		                if(rmeta.getColumnType(i)==java.sql.Types.VARCHAR) {
-		                	meta=meta+"count("+rmeta.getColumnName(i)+") as "+rmeta.getColumnName(i);
-		                	first=false;
-		                } else if(rmeta.getColumnType(i)==java.sql.Types.DOUBLE || 
-		                		  rmeta.getColumnType(i)==java.sql.Types.INTEGER || 
-		                		  rmeta.getColumnType(i)==java.sql.Types.FLOAT) {
-		                	meta=meta+"avg("+rmeta.getColumnName(i)+")";
-		                	first=false;
-		                } else if(rmeta.getColumnType(i)==java.sql.Types.TIMESTAMP) {
-		                	// Skip the column
-		                } else {
-		                	meta=meta+"avg("+rmeta.getColumnName(i)+")";
-		                	first=false;		                	
-		                }
-		            }
+	            if(first) {
+	          	    throw new SQLException("Table is undefined.");
 	            }
 			} catch(SQLException ex) {
-				log.error(ex);
+				throw new SQLException("Table does not exist:"+ table[0]);
 			}
 			return meta;
 		} else if(macro.indexOf("now")==0) {
+			SimpleDateFormat sdf = new SimpleDateFormat();
 			return DatabaseWriter.formatTimeStamp(current);
+		} else if(matcher.find()) {
+			int period = Integer.parseInt(matcher.group(1));
+			long timestamp = current - (current % (period*60*1000L)) - (period*60*1000L);
+			return DatabaseWriter.formatTimeStamp(timestamp);
 		} else if(macro.indexOf("past_hour")==0) {
 			return DatabaseWriter.formatTimeStamp(current-3600*1000L);
+		} else if(macro.endsWith("_week")) {
+			long partition = current / DatabaseConfig.WEEK;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_week");
+			return tableName.toString();
+		} else if(macro.endsWith("_month")) {
+			long partition = current / DatabaseConfig.MONTH;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_month");
+			return tableName.toString();
+		} else if(macro.endsWith("_quarter")) {
+			long partition = current / DatabaseConfig.QUARTER;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_quarter");
+			return tableName.toString();
+		} else if(macro.endsWith("_year")) {
+			long partition = current / DatabaseConfig.YEAR;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_year");
+			return tableName.toString();
+		} else if(macro.endsWith("_decade")) {
+			long partition = current / DatabaseConfig.DECADE;
+			if(partition<=0) {
+				partition=1;
+			}
+			String[] buffers = macro.split("_");
+			StringBuffer tableName = new StringBuffer();
+			for(int i=0;i<buffers.length-1;i++) {
+				tableName.append(buffers[i]);
+				tableName.append("_");
+			}
+			tableName.append(partition);
+			tableName.append("_decade");
+			return tableName.toString();
 		}
 		String[] tableList = dbc.findTableName(macro,current,current);
 		return tableList[0];
@@ -143,62 +227,51 @@
         return contents.toString();
     }
 
-	public void process(String table, String query) {
+	public void process(String query) {
 		ResultSet rs = null;
+		String[] columns;
+		int[] columnsType;
+        String groupBy = "";
 	    long start = current;
 	    long end = current;
         
-		String cluster = System.getProperty("CLUSTER");
-		if(cluster==null) {
-			cluster="unknown";
+
+		try {
+            HashMap<String, String> macroList = findMacros(query);
+            Iterator<String> macroKeys = macroList.keySet().iterator();
+            while(macroKeys.hasNext()) {
+        	    String mkey = macroKeys.next();
+        	    log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
+	    	    query = query.replace("["+mkey+"]", macroList.get(mkey));
+            }
+            db.execute(query);
+		} catch(SQLException e) {
+		    log.error(query);
+			log.error(e.getMessage());
 		}
-	    DatabaseWriter db = new DatabaseWriter(cluster);
-			    // Find the last aggregated value from table
-			    String[] tmpList = dbc.findTableName(table,start,end);
-			    String timeTest = "select timestamp from "+tmpList[0]+" order by timestamp desc limit 1";
-			    try {
-					rs = db.query(timeTest);
-				    while(rs.next()) {
-				    	start=rs.getTimestamp(1).getTime();
-				    	end=start;
-				    }
-			    } catch (SQLException e) {
-					// TODO Auto-generated catch block
-					e.printStackTrace();
-				}
-			    // Transform table names
-                HashMap<String, String> macroList = findMacros(query);
-                Iterator<String> macroKeys = macroList.keySet().iterator();
-                while(macroKeys.hasNext()) {
-                	String mkey = macroKeys.next();
-                	log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
-			    	query = query.replace("["+mkey+"]", macroList.get(mkey));
-                }
-				log.info(query);
-                db.execute(query);
-            db.close();
 	}
 
     public static void main(String[] args) {
-        loader=new PidFile(System.getProperty("CLUSTER")+"Aggregator");
-    	dbc = new DatabaseConfig();    	
+        log.info("Aggregator started.");
+    	dbc = new DatabaseConfig();
+		String cluster = System.getProperty("CLUSTER");
+		if(cluster==null) {
+			cluster="unknown";
+		}
+    	db = new DatabaseWriter(cluster);
     	String queries = Aggregator.getContents(new File(System.getenv("CHUKWA_CONF_DIR")+File.separator+"aggregator.sql"));
     	String[] query = queries.split("\n");
     	for(int i=0;i<query.length;i++) {
-    		    int startOffset = query[i].indexOf("[")+1;
-    		    int endOffset = query[i].indexOf("]");
     		    if(query[i].equals("")) {
-    		    } else if(startOffset==-1 || endOffset==-1) {
-    		    	log.error("Unable to extract table name from query:"+query[i]);
     		    } else if(query[i].indexOf("#")==0) {
     		    	log.debug("skipping: "+query[i]);
     		    } else {
-    		    	String table = query[i].substring(startOffset, endOffset);
     		    	Aggregator dba = new Aggregator();
-    		    	dba.process(table, query[i]);
+    		    	dba.process(query[i]);
     		    }
         }
-        loader.clean();
+        db.close();
+    	log.info("Aggregator finished.");
     }
 
 }
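
With this revision Aggregator no longer probes each table with a "select * ... limit 1" query to discover its columns; [avg(table)] and [group_avg(table)] are expanded from DatabaseMetaData.getColumns(), process() substitutes every macro and runs the statement through a single shared DatabaseWriter, and main() skips blank lines and '#' comments. A rough sketch of the column expansion follows; it simplifies the comma handling and error paths of the committed code and is not a drop-in replacement.

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Types;

    final class AggregateListSketch {
        // Build the select list that replaces [avg(table)] or [group_avg(table)].
        static String buildAggregateList(Connection conn, String table, boolean grouped)
                throws SQLException {
            DatabaseMetaData meta = conn.getMetaData();
            StringBuilder cols = new StringBuilder();
            ResultSet rs = meta.getColumns(null, null, table, null);
            while (rs.next()) {
                String name = rs.getString(4);   // COLUMN_NAME
                int type = rs.getInt(5);         // DATA_TYPE
                if (type == Types.TIMESTAMP) {
                    continue;                    // the timestamp column is selected separately
                }
                if (cols.length() > 0) {
                    cols.append(",");
                }
                if (type == Types.VARCHAR) {
                    // group_avg keeps text columns as grouping keys; avg just counts them
                    cols.append(grouped ? name : "count(" + name + ") as " + name);
                } else {
                    cols.append("avg(" + name + ")");
                }
            }
            rs.close();
            if (cols.length() == 0) {
                throw new SQLException("Table is undefined: " + table);
            }
            return cols.toString();
        }
    }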

Modified: hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java?rev=728207&r1=728206&r2=728207&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java Fri Dec 19 17:12:05 2008
@@ -27,6 +27,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 
 public class DatabaseWriter {
     private static Log log = LogFactory.getLog(DatabaseWriter.class);
@@ -35,26 +36,25 @@
     private ResultSet rs = null;
 
     public DatabaseWriter(String host, String user, String password) {
-    	String jdbc_url = System.getenv("JDBC_URL_PREFIX")+host+"/";
-    	
-		if(user!=null) {
+    	DataConfig mdlConfig = new DataConfig();
+    	String jdbc_url = "jdbc:mysql://"+host+"/";
+        if(user!=null) {
             jdbc_url = jdbc_url + "?user=" + user;
             if(password!=null) {
                 jdbc_url = jdbc_url + "&password=" + password;
             }
-		}
+        }
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-            String jdbcDriver = System.getenv("JDBC_DRIVER");
-            Class.forName(jdbcDriver).newInstance();
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
         }
         try {
             conn = DriverManager.getConnection(jdbc_url);
-            log.info("Initialized JDBC URL: "+jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
         } catch (SQLException ex) {
             log.error(ex,ex);
         }
@@ -66,20 +66,43 @@
         try {
             // The newInstance() call is a work around for some
             // broken Java implementations
-        	String jdbcDriver = System.getenv("JDBC_DRIVER");
-            Class.forName(jdbcDriver).newInstance();
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
         } catch (Exception ex) {
             // handle the error
             log.error(ex,ex);
         }
         try {
             conn = DriverManager.getConnection(jdbc_url);
-            log.info("Initialized JDBC URL: "+jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
         } catch (SQLException ex) {
             log.error(ex,ex);
         }
     }
     
+    public DatabaseWriter() {
+    	DataConfig mdlConfig = new DataConfig();
+    	String jdbc_url = "jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
+        if(mdlConfig.get("jdbc.user")!=null) {
+            jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+            if(mdlConfig.get("jdbc.password")!=null) {
+                jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+            }
+        }
+        try {
+            // The newInstance() call is a work around for some
+            // broken Java implementations
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
+        } catch (Exception ex) {
+            // handle the error
+            log.error(ex,ex);
+        }
+        try {
+            conn = DriverManager.getConnection(jdbc_url);
+            log.debug("Initialized JDBC URL: "+jdbc_url);
+        } catch (SQLException ex) {
+            log.error(ex,ex);
+        }
+    }
     public void execute(String query) {
         try {
             stmt = conn.createStatement(); 
@@ -102,6 +125,9 @@
             }
         }
     }
+    public Connection getConnection() {
+    	return conn;
+    }
     public ResultSet query(String query) throws SQLException {
         try {
             stmt = conn.createStatement(); 
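
DatabaseWriter now loads com.mysql.jdbc.Driver directly instead of reading JDBC_DRIVER and JDBC_URL_PREFIX from the environment, gains a no-argument constructor that assembles the URL from the jdbc.host, jdbc.db, jdbc.user, and jdbc.password keys in DataConfig, and exposes the underlying Connection through getConnection() so Aggregator can reach JDBC metadata. A brief usage sketch follows, with the table name and cluster handling only illustrative:

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;

    import org.apache.hadoop.chukwa.util.DatabaseWriter;

    public class DatabaseWriterUsage {
        public static void main(String[] args) throws Exception {
            // Aggregator resolves the per-cluster JDBC settings from the CLUSTER property.
            String cluster = System.getProperty("CLUSTER", "unknown");
            DatabaseWriter db = new DatabaseWriter(cluster);
            // The new accessor hands back the live Connection, so callers can inspect
            // JDBC metadata instead of probing with "select * ... limit 1".
            Connection conn = db.getConnection();
            DatabaseMetaData meta = conn.getMetaData();
            ResultSet rs = meta.getColumns(null, null, "dfs_throughput", null);
            while (rs.next()) {
                System.out.println(rs.getString(4));   // print each column name
            }
            rs.close();
            db.close();
        }
    }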


