chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r783549 - in /hadoop/chukwa/trunk: ./ bin/ conf/ hadoopjars/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/dataloader/ src/java/org/apache/hadoop/chukwa/extraction/ src/java/org/apache/hadoop/chukwa/extractio...
Date Wed, 10 Jun 2009 22:11:43 GMT
Author: eyang
Date: Wed Jun 10 22:11:42 2009
New Revision: 783549

URL: http://svn.apache.org/viewvc?rev=783549&view=rev
Log:
CHUKWA-278. Improve post process manager and metric data loader to support data loading from
pig aggregation. (Eric Yang)

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
      - copied, changed from r781593, hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/dataloader/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/dataloader/TestDatabaseMetricDataLoader.java
Removed:
    hadoop/chukwa/trunk/conf/chukwa-demux-conf.xml
    hadoop/chukwa/trunk/hadoopjars/hadoop-0.18.0-core.jar
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/database/
    hadoop/chukwa/trunk/src/web/hicc/jsp/hod_job_list.jsp
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/bin/mdl.sh
    hadoop/chukwa/trunk/conf/mdl.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
    hadoop/chukwa/trunk/src/web/hicc/jsp/host_selector_dropdown.jsp

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=783549&r1=783548&r2=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Jun 10 22:11:42 2009
@@ -24,6 +24,12 @@
 
   IMPROVEMENTS
 
+    CHUKWA-278. Improve post process manager and metric data loader to support data loading from pig aggregation. (Eric Yang)
+
+    CHUKWA-263. Added ability to configure demux parameters during build time. (Jerome Boulon via Eric Yang)
+
+    CHUKWA-195. Update demux compilation to use Hadoop 0.20 jar file. (Jerome Boulon via Eric Yang)
+
     CHUKWA-268. Expose adaptor manager interface, refactor Agent class hierarchy. (asrabkin)
 
     CHUKWA-259. More flexible CollectorStub. (asrabkin)

Modified: hadoop/chukwa/trunk/bin/mdl.sh
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/bin/mdl.sh?rev=783549&r1=783548&r2=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/bin/mdl.sh (original)
+++ hadoop/chukwa/trunk/bin/mdl.sh Wed Jun 10 22:11:42 2009
@@ -18,8 +18,8 @@
 bin=`cd "$bin"; pwd`
 . "$bin"/chukwa-config.sh
 
-if [ $# -lt 2 ]; then
-    echo "Usage: mdl.sh <cluster name> <chukwa sequence file>"
+if [ $# -lt 1 ]; then
+    echo "Usage: mdl.sh <chukwa sequence file>"
     echo ""
     exit 1
 fi

Modified: hadoop/chukwa/trunk/conf/mdl.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/conf/mdl.xml?rev=783549&r1=783548&r2=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/conf/mdl.xml (original)
+++ hadoop/chukwa/trunk/conf/mdl.xml Wed Jun 10 22:11:42 2009
@@ -431,11 +431,6 @@
 </property>
 
 <property>
-  <name>metric.systemmetrics.%memused</name>
-  <value>mem_used_pcnt</value>
-</property>
-
-<property>
   <name>metric.systemmetrics.eth0_busy_pcnt</name>
   <value>eth0_busy_pcnt</value>
 </property>

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java?rev=783549&r1=783548&r2=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java Wed Jun 10 22:11:42 2009
@@ -22,6 +22,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.io.File;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java?rev=783549&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java Wed Jun 10 22:11:42 2009
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.dataloader;
+
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+
+public abstract class DataLoaderFactory {
+
+  static ChukwaConfiguration conf = null;
+  static FileSystem fs = null;
+  protected FileStatus[] source = null;
+
+  private static Log log = LogFactory.getLog(DataLoaderFactory.class);
+
+  public DataLoaderFactory() {
+  }
+
+  /**
+   * @param args
+   * @throws IOException
+   */
+  public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] src) throws IOException {
+    this.source=src;
+    this.conf=conf;
+    this.fs=fs;
+  }
+
+}

Copied: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java (from r781593, hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java)
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java?p2=hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java&p1=hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java&r1=781593&r2=783549&rev=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java Wed Jun 10 22:11:42 2009
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.chukwa.extraction.database;
+package org.apache.hadoop.chukwa.dataloader;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -31,6 +31,7 @@
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +47,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 
-public class MetricDataLoader {
+public class MetricDataLoader implements Callable {
   private static Log log = LogFactory.getLog(MetricDataLoader.class);
 
   private Statement stmt = null;
@@ -60,32 +61,20 @@
   private String newSpace = "-";
   private boolean batchMode = true;
   private Connection conn = null;
+  private Path source = null;
 
   private static ChukwaConfiguration conf = null;
   private static FileSystem fs = null;
   private String jdbc_url = "";
 
-  static {
-    conf = new ChukwaConfiguration();
-    try {
-      fs = FileSystem.get(conf);
-    } catch (Exception e) {
-      e.printStackTrace();
-      log.warn("Exception during HDFS init, Bailing out!", e);
-      System.exit(-1);
-    }
-  }
-
   /** Creates a new instance of DBWriter */
-  public MetricDataLoader() {
-    initEnv("");
+  public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
+    source = new Path(fileName);
+    this.conf = conf;
+    this.fs = fs;
   }
 
-  public MetricDataLoader(String cluster) {
-    initEnv(cluster);
-  }
-
-  private void initEnv(String cluster) {
+  private void initEnv(String cluster) throws Exception {
     mdlConfig = new DatabaseConfig();
     transformer = mdlConfig.startWith("metric.");
     conversion = new HashMap<String, Float>();
@@ -109,27 +98,44 @@
       ClusterConfig cc = new ClusterConfig();
       jdbc_url = cc.getURL(cluster);
     }
+    try {
+      DatabaseWriter dbWriter = new DatabaseWriter(cluster);
+      conn = dbWriter.getConnection();
+    } catch(Exception ex) {
+      throw new Exception("JDBC URL does not exist for:"+jdbc_url);
+    }
+    log.debug("Initialized JDBC URL: " + jdbc_url);
     HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
     Iterator<String> ki = dbNames.keySet().iterator();
     dbSchema = new HashMap<String, HashMap<String, Integer>>();
-    DatabaseWriter dbWriter = new DatabaseWriter(cluster);
     while (ki.hasNext()) {
-      String table = dbNames.get(ki.next().toString());
-      String query = "select * from " + table + "_template limit 1";
+      String recordType = ki.next().toString();
+      String table = dbNames.get(recordType);
       try {
-        ResultSet rs = dbWriter.query(query);
-        ResultSetMetaData rmeta = rs.getMetaData();
+        ResultSet rs = conn.getMetaData().getColumns(null, null, table+"_template", null);
         HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
-        for (int i = 1; i <= rmeta.getColumnCount(); i++) {
-          tableSchema.put(rmeta.getColumnName(i), rmeta.getColumnType(i));
+        while(rs.next()) {
+          String name = rs.getString("COLUMN_NAME");
+          int type = rs.getInt("DATA_TYPE");
+          tableSchema.put(name, type);
+          StringBuilder metricName = new StringBuilder();
+          metricName.append("metric.");
+          metricName.append(recordType.substring(15));
+          metricName.append(".");
+          metricName.append(name);
+          if(!transformer.containsKey(metricName.toString())) {
+            transformer.put(metricName.toString(), name);
+          }          
         }
+        rs.close();
         dbSchema.put(table, tableSchema);
       } catch (SQLException ex) {
         log.debug("table: " + table
           + " template does not exist, MDL will not load data for this table.");
       }
     }
-    dbWriter.close();
+    stmt = conn.createStatement();
+    conn.setAutoCommit(false);
   }
 
   public void interrupt() {
@@ -161,27 +167,21 @@
       }
     }
     return( sb.toString()); 
-  } 
-
-  public void process(Path source) throws IOException, URISyntaxException,
-      SQLException {
-
+  }
+  
+  public boolean run() {
+    boolean first=true;
     log.info("StreamName: " + source.getName());
-
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, source, conf);
+    SequenceFile.Reader reader = null;
 
     try {
       // The newInstance() call is a work around for some
       // broken Java implementations
-      org.apache.hadoop.chukwa.util.DriverManagerUtil.loadDriver().newInstance();
-      log.debug("Initialized JDBC URL: " + jdbc_url);
+      reader = new SequenceFile.Reader(fs, source, conf);
     } catch (Exception ex) {
       // handle the error
       log.error(ex, ex);
     }
-    conn = org.apache.hadoop.chukwa.util.DriverManagerUtil.getConnection(jdbc_url);
-    stmt = conn.createStatement();
-    conn.setAutoCommit(false);
     long currentTimeMillis = System.currentTimeMillis();
     boolean isSuccessful = true;
     String recordType = null;
@@ -189,8 +189,18 @@
     ChukwaRecordKey key = new ChukwaRecordKey();
     ChukwaRecord record = new ChukwaRecord();
     try {
+      Pattern p = Pattern.compile("(.*)-\\d+$");
       int batch = 0;
       while (reader.next(key, record)) {
+        if(first) { 
+          try {
+            initEnv(RecordUtil.getClusterName(record));
+            first=false;
+          } catch(Exception ex) {
+          log.error("Initialization failed for: "+RecordUtil.getClusterName(record)+".  Please check jdbc configuration.");
+            return false;
+          }
+        }
         String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
         log.debug("Timestamp: " + record.getTime());
         log.debug("DataType: " + key.getReduceType());
@@ -203,10 +213,39 @@
         String node = record.getValue("csource");
         recordType = key.getReduceType().toLowerCase();
         String dbKey = "report.db.name." + recordType;
+        Matcher m = p.matcher(recordType);
         if (dbTables.containsKey(dbKey)) {
           String[] tmp = mdlConfig.findTableName(mdlConfig.get(dbKey), record
               .getTime(), record.getTime());
           table = tmp[0];
+        } else if(m.matches()) {
+          String timePartition = "_week";
+          int timeSize = Integer.parseInt(m.group(2));
+          if(timeSize == 5) {
+            timePartition = "_month";
+          } else if(timeSize == 30) {
+            timePartition = "_quarter";
+          } else if(timeSize == 180) {
+            timePartition = "_year";
+          } else if(timeSize == 720) {
+            timePartition = "_decade";
+          }
+          int partition = (int) (record.getTime() / timeSize);
+          StringBuilder tmpDbKey = new StringBuilder();
+          tmpDbKey.append("report.db.name.");
+          tmpDbKey.append(m.group(1));
+          if(dbTables.containsKey(tmpDbKey.toString())) {
+            StringBuilder tmpTable = new StringBuilder();
+            tmpTable.append(dbTables.get(tmpDbKey.toString()));
+            tmpTable.append("_");
+            tmpTable.append(partition);
+            tmpTable.append("_");
+            tmpTable.append(timePartition);
+            table = tmpTable.toString();
+          } else {
+            log.debug(tmpDbKey.toString() + " does not exist.");
+            continue;            
+          }
         } else {
           log.debug(dbKey + " does not exist.");
           continue;
@@ -311,7 +350,7 @@
           StringBuilder sqlValues = new StringBuilder();
           boolean firstValue = true;
           while (fi.hasNext()) {
-            String fieldKey = (String) fi.next();
+            String fieldKey = fi.next();
             if (transformer.containsKey(fieldKey)) {
               if (!firstValue) {
                 sqlValues.append(", ");
@@ -338,7 +377,7 @@
                   SimpleDateFormat formatter = new SimpleDateFormat(
                       "yyyy-MM-dd HH:mm:ss");
                   Date recordDate = new Date();
-                  recordDate.setTime((long) Long.parseLong(recordSet
+                  recordDate.setTime(Long.parseLong(recordSet
                       .get(fieldKey)));
                   sqlValues.append(transformer.get(fieldKey));
                   sqlValues.append("=\"");
@@ -455,9 +494,8 @@
     } catch (Exception e) {
       isSuccessful = false;
       log.error(ExceptionUtil.getStackTrace(e));
-      e.printStackTrace();
     } finally {
-      if (batchMode) {
+      if (batchMode && conn!=null) {
         try {
           conn.commit();
           log.info("batchMode commit done");
@@ -516,12 +554,20 @@
         reader = null;
       }
     }
+    return true;
   }
 
+  public Boolean call() {
+    return run();  
+  }
+  
+
   public static void main(String[] args) {
     try {
-      MetricDataLoader mdl = new MetricDataLoader(args[0]);
-      mdl.process(new Path(args[1]));
+      conf = new ChukwaConfiguration();
+      fs = FileSystem.get(conf);
+      MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
+      mdl.run();
     } catch (Exception e) {
       e.printStackTrace();
     }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java?rev=783549&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java Wed Jun 10 22:11:42 2009
@@ -0,0 +1,63 @@
+package org.apache.hadoop.chukwa.dataloader;
+
+import java.io.IOException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+
+public class MetricDataLoaderPool extends DataLoaderFactory {
+  private static Log log = LogFactory.getLog(MetricDataLoaderPool.class);
+
+  protected MetricDataLoader threads[] = null;
+  private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
+  private int size = 1;
+  private static CompletionService completion = null;
+  private static ExecutorService executor = null;
+  
+  public MetricDataLoaderPool() {
+  }
+  
+  public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] fileList) throws IOException {
+
+    if(executor==null) {
+      try {
+        this.size = Integer.parseInt(conf.get(DATA_LOADER_THREAD_LIMIT));
+      } catch(Exception e) {
+        this.size = 1;
+      }
+      executor = Executors.newFixedThreadPool(size);
+    }
+    if(completion==null) {
+      completion = new ExecutorCompletionService(executor);
+    }
+    try {
+      for(int i=0;i<fileList.length;i++) {
+        String filename = fileList[i].getPath().toUri().toString();
+        log.info("Processing: "+filename);
+        completion.submit(new MetricDataLoader(conf, fs, filename));      
+      }
+      for(int i=0;i<fileList.length;i++) {
+        completion.take().get();
+      }
+    } catch(Exception e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+      throw new IOException();
+    } finally {
+    }
+  }
+
+  public void shutdown() throws InterruptedException {
+    executor.shutdown();
+    executor.awaitTermination(30, TimeUnit.SECONDS);
+    executor.shutdownNow();
+  }
+}
\ No newline at end of file

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java?rev=783549&r1=783548&r2=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/CHUKWA_CONSTANT.java Wed Jun 10 22:11:42 2009
@@ -52,5 +52,6 @@
   public static final String DEFAULT_ARCHIVES_MR_OUTPUT_DIR_NAME     = "mrOutput/";
   public static final String DEFAULT_ARCHIVES_MR_INPUT_DIR_NAME      = "mrInput/";
   public static final String DEFAULT_ARCHIVES_IN_ERROR_DIR_NAME      = "inError/";
-  
+
+  public static final String POST_DEMUX_DATA_LOADER = "chukwa.post.demux.data.loader";  
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java?rev=783549&r1=783548&r2=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java Wed Jun 10 22:11:42 2009
@@ -27,9 +27,10 @@
 import java.util.List;
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
-import org.apache.hadoop.chukwa.extraction.database.DatabaseLoader;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,7 +56,7 @@
 
   
   public PostProcessorManager() throws Exception {
-    conf = new ChukwaConfiguration();
+    this.conf = new ChukwaConfiguration();
     init();
   }
   
@@ -102,17 +103,6 @@
     }
  
     dataSources = new HashMap<String, String>();
-    String[] datasources = conf.getStrings("postProcessorManager.dbloader.ds");
-    if (datasources == null || datasources.length == 0) {
-      log.warn("Cannot read postProcessorManager.dbloader.ds from configuration, bail out!");
-      DaemonWatcher.bailout(-1);
-    }
-    for(String ds: datasources) {
-      dataSources.put(ds.trim(), "");
-      log.info("Add " + ds + " to PostProcessorManager");
-    }
-
-    
     Path postProcessDirectory = new Path(postProcessDir);
     while (isRunning) {
       
@@ -136,8 +126,6 @@
         
         Collections.sort(directories);
         
-        System.out.println(directories);
-        
         String directoryToBeProcessed = null;
         long start = 0;
         
@@ -174,9 +162,29 @@
   
   public boolean processDemuxOutput(String directory) throws IOException {
     long start = System.currentTimeMillis();
-    DatabaseLoader.loadData(fs,directory, dataSources);
+    try {
+      String[] classes = conf.get(POST_DEMUX_DATA_LOADER).split(",");
+      for(String dataLoaderName : classes) {
+        Class<? extends DataLoaderFactory> dl = (Class<? extends DataLoaderFactory>) Class.forName(dataLoaderName);
+        java.lang.reflect.Constructor<? extends DataLoaderFactory> c =
+            dl.getConstructor();
+        DataLoaderFactory dataloader = c.newInstance();
+        
+          //DataLoaderFactory dataLoader = (DataLoaderFactory) Class.
+          //    forName(dataLoaderName).getConstructor().newInstance();
+        log.info(dataLoaderName+" processing: "+directory);
+        StringBuilder dirSearch = new StringBuilder();
+        dirSearch.append(directory);
+        dirSearch.append("/*/*/*.evt");
+        Path demuxDir = new Path(dirSearch.toString());
+        FileStatus[] events = fs.globStatus(demuxDir);
+        dataloader.load(conf, fs, events);
+      }
+    } catch(Exception e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+      return false;
+    }
     log.info("loadData Duration:" + (System.currentTimeMillis() - start));
-    
     return true;
   }
   

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/dataloader/TestDatabaseMetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/dataloader/TestDatabaseMetricDataLoader.java?rev=783549&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/dataloader/TestDatabaseMetricDataLoader.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/dataloader/TestDatabaseMetricDataLoader.java Wed Jun 10 22:11:42 2009
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.dataloader;
+
+import junit.framework.TestCase;
+import java.util.Calendar;
+import org.apache.hadoop.chukwa.database.Macro;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.database.TableCreator;
+import org.apache.hadoop.chukwa.dataloader.MetricDataLoader;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+public class TestDatabaseMetricDataLoader extends TestCase {
+
+  long[] timeWindow = {7, 30, 91, 365, 3650};
+  String[] tables = {"system_metrics","disk","mr_job","mr_task"}; //,"dfs_namenode","dfs_datanode","dfs_fsnamesystem","dfs_throughput","hadoop_jvm","hadoop_mapred","hdfs_usage"};
+  String cluster = "demo";
+  long current = Calendar.getInstance().getTimeInMillis();
+
+  public void setUp() {
+    System.setProperty("CLUSTER","demo");
+    DatabaseWriter db = new DatabaseWriter(cluster);
+    String buffer = "";
+    File aFile = new File(System.getenv("CHUKWA_CONF_DIR")
+                 + File.separator + "database_create_tables.sql");
+    buffer = readFile(aFile);
+    String tables[] = buffer.split(";");
+    for(String table : tables) {
+      if(table.length()>5) {
+        db.execute(table);
+      }
+    }
+    db.close();
+    for(int i=0;i<timeWindow.length;i++) {
+      TableCreator tc = new TableCreator();
+      long start = current;
+      long end = current + (timeWindow[i]*1440*60*1000);
+      tc.createTables(start, end);
+    }
+  }
+
+  public void tearDown() {
+    DatabaseWriter db = null;
+    try {
+      db = new DatabaseWriter(cluster);
+      ResultSet rs = db.query("show tables");
+      ArrayList<String> list = new ArrayList<String>();
+      while(rs.next()) {
+        String table = rs.getString(1);
+        list.add(table);
+      }
+      for(String table : list) {
+        db.execute("drop table "+table);
+      }
+    } catch(Throwable ex) {
+    } finally {
+      if(db!=null) {
+        db.close();
+      }
+    }
+  }
+
+  public String readFile(File aFile) {
+    StringBuffer contents = new StringBuffer();
+    try {
+      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      try {
+        String line = null; // not declared within while loop
+        while ((line = input.readLine()) != null) {
+          contents.append(line);
+          contents.append(System.getProperty("line.separator"));
+        }
+      } finally {
+        input.close();
+      }
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+    return contents.toString();
+  }
+
+  public void testMetricDataLoader() {
+    String srcDir = System.getenv("CHUKWA_DATA_DIR");
+    try {
+      ChukwaConfiguration conf = new ChukwaConfiguration();
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] sources = fs.listStatus(new Path(srcDir));
+      for (FileStatus sequenceFile : sources) {
+        MetricDataLoader mdl = new MetricDataLoader(conf, fs, sequenceFile.getPath().toUri().toString());
+        mdl.call();
+      }
+    } catch (Throwable ex) {
+      fail("SQL Exception: "+ExceptionUtil.getStackTrace(ex));
+    }
+    DatabaseWriter db = new DatabaseWriter(cluster);
+    for(int i=0;i<tables.length;i++) {
+      String query = "select [avg("+tables[i]+")] from ["+tables[i]+"]";
+      Macro mp = new Macro(current,query);
+      query = mp.toString();
+      try {
+        ResultSet rs = db.query(query);
+        ResultSetMetaData rsmd = rs.getMetaData();
+        int numberOfColumns = rsmd.getColumnCount();
+        while(rs.next()) {
+          for(int j=1;j<=numberOfColumns;j++) {
+            assertTrue("Table: "+tables[i]+", Column: "+rsmd.getColumnName(j)+", contains no data.",rs.getString(j)!=null);
+          }
+        }
+      } catch(Throwable ex) {
+        fail("MetricDataLoader failed: "+ExceptionUtil.getStackTrace(ex));
+      }
+    }
+    db.close();
+    assertTrue("MetricDataLoader executed successfully.",true);
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/web/hicc/jsp/host_selector_dropdown.jsp
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/web/hicc/jsp/host_selector_dropdown.jsp?rev=783549&r1=783548&r2=783549&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/web/hicc/jsp/host_selector_dropdown.jsp (original)
+++ hadoop/chukwa/trunk/src/web/hicc/jsp/host_selector_dropdown.jsp Wed Jun 10 22:11:42 2009
@@ -83,7 +83,7 @@
                long end = time.getEndTime(); 
                String table = "system_metrics";
                DatabaseConfig dbc = new DatabaseConfig();
-               String[] tables = dbc.findTableName(table, start, end);
+               String[] tables = dbc.findTableNameForCharts(table, start, end);
                table=tables[0];
                query="select DISTINCT host from "+table+" order by host";
            }



Mime
View raw message