chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r759839 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/database/ src/test/org/apache/hadoop/chukwa/database/
Date Mon, 30 Mar 2009 04:47:05 GMT
Author: eyang
Date: Mon Mar 30 04:47:05 2009
New Revision: 759839

URL: http://svn.apache.org/viewvc?rev=759839&view=rev
Log:
CHUKWA-13. Added Macro support for chukwa charting.


Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Macro.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestMacro.java
Modified:
    hadoop/chukwa/trunk/build.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Aggregator.java

Modified: hadoop/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/build.xml?rev=759839&r1=759838&r2=759839&view=diff
==============================================================================
--- hadoop/chukwa/trunk/build.xml (original)
+++ hadoop/chukwa/trunk/build.xml Mon Mar 30 04:47:05 2009
@@ -381,6 +381,7 @@
             dir="${test.build.dir}/classes/" timeout="${test.timeout}"
             errorProperty="tests.failed" failureProperty="tests.failed">
            <classpath refid="testClasspath"/>
+           <env key="DATACONFIG" value="${build.dir}/conf"/>
            <sysproperty key="test.src.dir" value="${test.src.dir}"/>
            <formatter type="${test.junit.output.format}" />
            <batchtest todir="${test.build.dir}" unless="testcase">

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Aggregator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Aggregator.java?rev=759839&r1=759838&r2=759839&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Aggregator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Aggregator.java Mon Mar
30 04:47:05 2009
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.chukwa.database;
 
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -42,7 +41,6 @@
 import org.apache.hadoop.chukwa.util.PidFile;
 
 public class Aggregator {
-  private static DatabaseConfig dbc = null;
 
   private static Log log = LogFactory.getLog(Aggregator.class);
   private String table = null;
@@ -52,174 +50,10 @@
   private static DatabaseWriter db = null;
 
   public Aggregator() {
-    dbc = new DatabaseConfig();
     Calendar now = Calendar.getInstance();
     current = now.getTimeInMillis();
   }
 
-  public HashMap<String, String> findMacros(String query) throws SQLException {
-    boolean add = false;
-    HashMap<String, String> macroList = new HashMap<String, String>();
-    String macro = "";
-    for (int i = 0; i < query.length(); i++) {
-      if (query.charAt(i) == ']') {
-        add = false;
-        if (!macroList.containsKey(macro)) {
-          String subString = computeMacro(macro);
-          macroList.put(macro, subString);
-        }
-        macro = "";
-      }
-      if (add) {
-        macro = macro + query.charAt(i);
-      }
-      if (query.charAt(i) == '[') {
-        add = true;
-      }
-    }
-    return macroList;
-  }
-
-  public String computeMacro(String macro) throws SQLException {
-    Pattern p = Pattern.compile("past_(.*)_minutes");
-    Matcher matcher = p.matcher(macro);
-    if (macro.indexOf("avg(") == 0 || macro.indexOf("group_avg(") == 0
-        || macro.indexOf("sum(") == 0) {
-      String meta = "";
-      String[] table = dbc.findTableName(macro.substring(
-          macro.indexOf("(") + 1, macro.indexOf(")")), current, current);
-      try {
-        String cluster = System.getProperty("CLUSTER");
-        if (cluster == null) {
-          cluster = "unknown";
-        }
-        DatabaseMetaData dbMetaData = db.getConnection().getMetaData();
-        ResultSet rs = dbMetaData.getColumns(null, null, table[0], null);
-        boolean first = true;
-        while (rs.next()) {
-          if (!first) {
-            meta = meta + ",";
-          }
-          String name = rs.getString(4);
-          int type = rs.getInt(5);
-          if (type == java.sql.Types.VARCHAR) {
-            if (macro.indexOf("group_avg(") < 0) {
-              meta = meta + "count(" + name + ") as " + name;
-            } else {
-              meta = meta + name;
-            }
-            first = false;
-          } else if (type == java.sql.Types.DOUBLE
-              || type == java.sql.Types.FLOAT || type == java.sql.Types.INTEGER) {
-            if (macro.indexOf("sum(") == 0) {
-              meta = meta + "sum(" + name + ")";
-            } else {
-              meta = meta + "avg(" + name + ")";
-            }
-            first = false;
-          } else if (type == java.sql.Types.TIMESTAMP) {
-            // Skip the column
-          } else {
-            if (macro.indexOf("sum(") == 0) {
-              meta = meta + "SUM(" + name + ")";
-            } else {
-              meta = meta + "AVG(" + name + ")";
-            }
-            first = false;
-          }
-        }
-        if (first) {
-          throw new SQLException("Table is undefined.");
-        }
-      } catch (SQLException ex) {
-        throw new SQLException("Table does not exist:" + table[0]);
-      }
-      return meta;
-    } else if (macro.indexOf("now") == 0) {
-      SimpleDateFormat sdf = new SimpleDateFormat();
-      return DatabaseWriter.formatTimeStamp(current);
-    } else if (matcher.find()) {
-      int period = Integer.parseInt(matcher.group(1));
-      long timestamp = current - (current % (period * 60 * 1000L))
-          - (period * 60 * 1000L);
-      return DatabaseWriter.formatTimeStamp(timestamp);
-    } else if (macro.indexOf("past_hour") == 0) {
-      return DatabaseWriter.formatTimeStamp(current - 3600 * 1000L);
-    } else if (macro.endsWith("_week")) {
-      long partition = current / DatabaseConfig.WEEK;
-      if (partition <= 0) {
-        partition = 1;
-      }
-      String[] buffers = macro.split("_");
-      StringBuffer tableName = new StringBuffer();
-      for (int i = 0; i < buffers.length - 1; i++) {
-        tableName.append(buffers[i]);
-        tableName.append("_");
-      }
-      tableName.append(partition);
-      tableName.append("_week");
-      return tableName.toString();
-    } else if (macro.endsWith("_month")) {
-      long partition = current / DatabaseConfig.MONTH;
-      if (partition <= 0) {
-        partition = 1;
-      }
-      String[] buffers = macro.split("_");
-      StringBuffer tableName = new StringBuffer();
-      for (int i = 0; i < buffers.length - 1; i++) {
-        tableName.append(buffers[i]);
-        tableName.append("_");
-      }
-      tableName.append(partition);
-      tableName.append("_month");
-      return tableName.toString();
-    } else if (macro.endsWith("_quarter")) {
-      long partition = current / DatabaseConfig.QUARTER;
-      if (partition <= 0) {
-        partition = 1;
-      }
-      String[] buffers = macro.split("_");
-      StringBuffer tableName = new StringBuffer();
-      for (int i = 0; i < buffers.length - 1; i++) {
-        tableName.append(buffers[i]);
-        tableName.append("_");
-      }
-      tableName.append(partition);
-      tableName.append("_quarter");
-      return tableName.toString();
-    } else if (macro.endsWith("_year")) {
-      long partition = current / DatabaseConfig.YEAR;
-      if (partition <= 0) {
-        partition = 1;
-      }
-      String[] buffers = macro.split("_");
-      StringBuffer tableName = new StringBuffer();
-      for (int i = 0; i < buffers.length - 1; i++) {
-        tableName.append(buffers[i]);
-        tableName.append("_");
-      }
-      tableName.append(partition);
-      tableName.append("_year");
-      return tableName.toString();
-    } else if (macro.endsWith("_decade")) {
-      long partition = current / DatabaseConfig.DECADE;
-      if (partition <= 0) {
-        partition = 1;
-      }
-      String[] buffers = macro.split("_");
-      StringBuffer tableName = new StringBuffer();
-      for (int i = 0; i < buffers.length - 1; i++) {
-        tableName.append(buffers[i]);
-        tableName.append("_");
-      }
-      tableName.append(partition);
-      tableName.append("_decade");
-      return tableName.toString();
-    }
-    String[] tableList = dbc.findTableName(macro, current, current);
-    return tableList[0];
-  }
-
   public static String getContents(File aFile) {
     StringBuffer contents = new StringBuffer();
     try {
@@ -248,23 +82,19 @@
     long end = current;
 
     try {
-      HashMap<String, String> macroList = findMacros(query);
-      Iterator<String> macroKeys = macroList.keySet().iterator();
-      while (macroKeys.hasNext()) {
-        String mkey = macroKeys.next();
-        log.debug("replacing:" + mkey + " with " + macroList.get(mkey));
-        query = query.replace("[" + mkey + "]", macroList.get(mkey));
-      }
+      Macro macroProcessor = new Macro(current, current, query);
+      query = macroProcessor.toString();
       db.execute(query);
-    } catch (SQLException e) {
+    } catch (Exception e) {
       log.error(query);
       log.error(e.getMessage());
     }
   }
 
   public static void main(String[] args) {
+    long longest = 0;
+    String longQuery = null;
     log.info("Aggregator started.");
-    dbc = new DatabaseConfig();
     String cluster = System.getProperty("CLUSTER");
     if (cluster == null) {
       cluster = "unknown";
@@ -280,10 +110,19 @@
         log.debug("skipping: " + query[i]);
       } else {
         Aggregator dba = new Aggregator();
+        long start = Calendar.getInstance().getTimeInMillis();
         dba.process(query[i]);
+        long end = Calendar.getInstance().getTimeInMillis();
+        long duration = end - start;
+        if (duration >= longest) {
+          longest = duration;
+          longQuery = query[i];
+        }
       }
     }
     db.close();
+    log.info("Longest running query: " + longQuery + " (" + (double) longest
+        / 1000 + " seconds)");
     log.info("Aggregator finished.");
   }
 

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Macro.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Macro.java?rev=759839&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Macro.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Macro.java Mon Mar 30 04:47:05
2009
@@ -0,0 +1,288 @@
+package org.apache.hadoop.chukwa.database;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+
+public class Macro {
+    private static Log log = LogFactory.getLog(Macro.class);
+    private boolean forCharting = false;
+    private long current = 0;
+    private long start = 0;
+    private long end = 0;
+    private static DatabaseConfig dbc = new DatabaseConfig();
+    private static DatabaseWriter db = null;
+    private String query = null;
+    private HttpServletRequest request = null;
+
+    public Macro(long timestamp, String query) {
+        this.current = timestamp;
+        this.start = timestamp;
+        this.end = timestamp;
+        this.query = query;
+    }
+
+    public Macro(long startTime, long endTime, String query) {
+        this.current = endTime;
+        this.start = startTime;
+        this.end = endTime;
+        forCharting = true;	
+        this.query = query;
+    }
+    
+    public Macro(long startTime, long endTime, String query, HttpServletRequest request)
{
+        this.request = request;
+        this.current = endTime;
+        this.start = startTime;
+        this.end = endTime;
+        forCharting = true; 
+        this.query = query;        
+    }
+    public HashMap<String,String> findMacros(String query) throws SQLException {
+        boolean add=false;
+        HashMap<String,String> macroList = new HashMap<String,String>();
+        String macro="";
+        for(int i=0;i<query.length();i++) {
+            if(query.charAt(i)==']') {
+                add=false;
+                if(!macroList.containsKey(macro)) {
+                    String subString = computeMacro(macro);
+                    macroList.put(macro,subString);	    			
+                }
+                macro="";
+            }
+            if(add) {
+                macro=macro+query.charAt(i);
+            }
+            if(query.charAt(i)=='[') {
+                add=true;
+            }
+        }
+        return macroList;
+    }
+
+    public String computeMacro(String macro) throws SQLException {
+        Pattern p = Pattern.compile("past_(.*)_minutes");
+        Matcher matcher = p.matcher(macro);
+        if(macro.indexOf("avg(")==0 || macro.indexOf("group_avg(")==0 || macro.indexOf("sum(")==0)
{
+            String meta="";
+            String[] table = null;
+            if(forCharting) {
+                dbc.findTableNameForCharts(macro.substring(macro.indexOf("(")+1,macro.indexOf(")")),
start, end);
+            } else {
+                dbc.findTableName(macro.substring(macro.indexOf("(")+1,macro.indexOf(")")),
start, end);
+            }
+            try {
+                String cluster = System.getProperty("CLUSTER");
+                if(cluster==null) {
+                    cluster="unknown";
+                }
+                DatabaseMetaData dbMetaData = db.getConnection().getMetaData();
+                ResultSet rs = dbMetaData.getColumns ( null,null,table[0], null);
+                boolean first=true;
+                while(rs.next()) {
+                    if(!first) {
+                        meta = meta+",";
+                    }
+                    String name = rs.getString(4);
+                    int type = rs.getInt(5);
+                    if(type==java.sql.Types.VARCHAR) {
+                        if(macro.indexOf("group_avg(")<0) {
+                            meta=meta+"count("+name+") as "+name;
+                        } else {
+                            meta=meta+name;
+                        }
+                        first=false;
+                    } else if(type==java.sql.Types.DOUBLE ||
+                            type==java.sql.Types.FLOAT ||
+                            type==java.sql.Types.INTEGER) {
+                        if(macro.indexOf("sum(")==0) {
+                            meta=meta+"sum("+name+")";	            			
+                        } else {
+                            meta=meta+"avg("+name+")";
+                        }
+                        first=false;
+                    } else if(type==java.sql.Types.TIMESTAMP) {
+                        // Skip the column
+                    } else {
+                        if(macro.indexOf("sum(")==0) {
+                            meta=meta+"SUM("+name+")";
+                        } else {
+                            meta=meta+"AVG("+name+")";	            			
+                        }
+                        first=false;
+                    }
+                }
+                if(first) {
+                    throw new SQLException("Table is undefined.");
+                }
+            } catch(SQLException ex) {
+                throw new SQLException("Table does not exist:"+ table[0]);
+            }
+            return meta;
+        } else if(macro.indexOf("now")==0) {
+            SimpleDateFormat sdf = new SimpleDateFormat();
+            return DatabaseWriter.formatTimeStamp(current);
+        } else if(macro.intern()=="start".intern()) {
+            return DatabaseWriter.formatTimeStamp(start);
+        } else if(macro.intern()=="end".intern()) {
+            return DatabaseWriter.formatTimeStamp(end);
+        } else if(matcher.find()) {
+            int period = Integer.parseInt(matcher.group(1));
+            long timestamp = current - (current % (period*60*1000L)) - (period*60*1000L);
+            return DatabaseWriter.formatTimeStamp(timestamp);
+        } else if(macro.indexOf("past_hour")==0) {
+            return DatabaseWriter.formatTimeStamp(current-3600*1000L);
+        } else if(macro.endsWith("_week")) {
+            long partition = current / DatabaseConfig.WEEK;
+            if(partition<=0) {
+                partition=1;
+            }
+            String[] buffers = macro.split("_");
+            StringBuffer tableName = new StringBuffer();
+            for(int i=0;i<buffers.length-1;i++) {
+                tableName.append(buffers[i]);
+                tableName.append("_");
+            }
+            tableName.append(partition);
+            tableName.append("_week");
+            return tableName.toString();
+        } else if(macro.endsWith("_month")) {
+            long partition = current / DatabaseConfig.MONTH;
+            if(partition<=0) {
+                partition=1;
+            }
+            String[] buffers = macro.split("_");
+            StringBuffer tableName = new StringBuffer();
+            for(int i=0;i<buffers.length-1;i++) {
+                tableName.append(buffers[i]);
+                tableName.append("_");
+            }
+            tableName.append(partition);
+            tableName.append("_month");
+            return tableName.toString();
+        } else if(macro.endsWith("_quarter")) {
+            long partition = current / DatabaseConfig.QUARTER;
+            if(partition<=0) {
+                partition=1;
+            }
+            String[] buffers = macro.split("_");
+            StringBuffer tableName = new StringBuffer();
+            for(int i=0;i<buffers.length-1;i++) {
+                tableName.append(buffers[i]);
+                tableName.append("_");
+            }
+            tableName.append(partition);
+            tableName.append("_quarter");
+            return tableName.toString();
+        } else if(macro.endsWith("_year")) {
+            long partition = current / DatabaseConfig.YEAR;
+            if(partition<=0) {
+                partition=1;
+            }
+            String[] buffers = macro.split("_");
+            StringBuffer tableName = new StringBuffer();
+            for(int i=0;i<buffers.length-1;i++) {
+                tableName.append(buffers[i]);
+                tableName.append("_");
+            }
+            tableName.append(partition);
+            tableName.append("_year");
+            return tableName.toString();
+        } else if(macro.endsWith("_decade")) {
+            long partition = current / DatabaseConfig.DECADE;
+            if(partition<=0) {
+                partition=1;
+            }
+            String[] buffers = macro.split("_");
+            StringBuffer tableName = new StringBuffer();
+            for(int i=0;i<buffers.length-1;i++) {
+                tableName.append(buffers[i]);
+                tableName.append("_");
+            }
+            tableName.append(partition);
+            tableName.append("_decade");
+            return tableName.toString();
+        }
+        if(forCharting) {
+            if(macro.startsWith("session(") && request!=null){
+                String keyword = macro.substring(macro.indexOf("(")+1,macro.indexOf(")"));
+                String[] objects = null;
+                if(request.getSession().getAttribute(keyword)!=null) {
+                    objects = ((String)request.getSession().getAttribute(keyword)).split(",");
+                }
+                StringBuffer buf = new StringBuffer();
+                boolean first = true;
+                if(objects!=null) {
+                    for(String object : objects) {
+                        if(!first) {
+                            buf.append(" or ");
+                        }
+                        first = false;
+                        buf.append(macro.substring(macro.indexOf("(")+1,macro.indexOf(")"))+"='"+object+"'");
+                    }
+                    return buf.toString();
+                }
+                return "";
+            } else {
+                String[] tableList = dbc.findTableNameForCharts(macro, start, end);
+                StringBuffer buf = new StringBuffer();
+                boolean first = true;
+                for(String table : tableList) {
+                    if(!first) {
+                        buf.append("|");
+                    }
+                    first = false;
+                    buf.append(table);
+                }
+                return buf.toString();
+            }
+        }
+        String[] tableList = dbc.findTableName(macro,current,current);
+        return tableList[0];
+    }
+    public String toString() {
+        try {
+        HashMap<String, String> macroList = findMacros(query);
+        Iterator<String> macroKeys = macroList.keySet().iterator();
+        while(macroKeys.hasNext()) {
+            String mkey = macroKeys.next();
+            if(macroList.get(mkey).contains("|")) {
+                StringBuffer buf = new StringBuffer();
+                String[] tableList = macroList.get(mkey).split("|");
+                boolean first = true;
+                for(String table : tableList) {
+                    String newQuery = query.replace("["+mkey+"]", table);
+                    if(!first) {
+                        buf.append(" union ");
+                    }
+                    buf.append("(");
+                    buf.append(newQuery);
+                    buf.append(")");
+                    first = false;
+                }
+                query = buf.toString();
+            } else {
+                log.debug("replacing:"+mkey+" with "+macroList.get(mkey));
+                query = query.replace("["+mkey+"]", macroList.get(mkey));
+            }
+        }
+        } catch(SQLException ex) {
+            log.error(query);
+            log.error(ex.getMessage());
+        }
+        return query;
+    }
+
+}

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestMacro.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestMacro.java?rev=759839&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestMacro.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestMacro.java Mon Mar
30 04:47:05 2009
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.database;
+
+import junit.framework.TestCase;
+import java.util.TreeMap;
+import java.util.ArrayList;
+
+public class TestMacro extends TestCase {
+
+  public void testPastXIntervals() {
+    Macro m = new Macro(1234567890000L, "select '[past_5_minutes]';");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select '2009-02-13 15:25:00';".intern());
+    m = new Macro(1234567890000L, "select '[past_hour]';");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select '2009-02-13 14:31:30';".intern());
+    m = new Macro(1234567890000L, "select '[start]';");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select '2009-02-13 15:31:30';".intern());
+  }
+
+  public void testPartitions() {
+    Macro m = new Macro(1234567890000L, "select from [system_metrics_week];");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select from system_metrics_2041_week;".intern());
+    m = new Macro(1234567890000L, "select from [system_metrics_month];");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select from system_metrics_476_month;".intern());
+    m = new Macro(1234567890000L, "select from [system_metrics_quarter];");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select from system_metrics_156_quarter;".intern());
+    m = new Macro(1234567890000L, "select from [system_metrics_year];");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select from system_metrics_39_year;".intern());
+    m = new Macro(1234567890000L, "select from [system_metrics_decade];");
+    System.out.println(m.toString());
+    assertTrue(m.toString().intern()=="select from system_metrics_3_decade;".intern());
+  }
+
+}



Mime
View raw message