chukwa-commits mailing list archives

From: ey...@apache.org
Subject: svn commit: r783885 [1/2] - in /hadoop/chukwa/trunk: CHANGES.txt build.xml src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java test/samples/Iostat.log
Date: Thu, 11 Jun 2009 20:03:29 GMT
Author: eyang
Date: Thu Jun 11 20:03:28 2009
New Revision: 783885

URL: http://svn.apache.org/viewvc?rev=783885&view=rev
Log:
CHUKWA-280. Added end to end test to detect iostat overflow. (Eric Yang)
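
In outline, the new test drives the sample iostat log through the full Chukwa pipeline (agent -> collector -> data sink -> Demux -> MetricDataLoader) and then scans every numeric column of the system_metrics table, treating any value of 10^9 or more as an overflowed iostat counter. A condensed sketch of that final check, with the names and the 1e9 threshold taken from TestDatabaseIostat below:

    // Condensed from TestDatabaseIostat.testChukwaFramework() below:
    // any integer or floating-point column that reaches 1e9 is taken
    // as evidence of an iostat counter overflow.
    ResultSet rs = db.query(query);
    ResultSetMetaData rmeta = rs.getMetaData();
    while (rs.next()) {
      for (int i = 1; i <= rmeta.getColumnCount(); i++) {
        int type = rmeta.getColumnType(i);
        if (type == java.sql.Types.BIGINT || type == java.sql.Types.INTEGER) {
          assertTrue(rs.getLong(i) < 1000000000L);
        } else if (type == java.sql.Types.FLOAT || type == java.sql.Types.DOUBLE) {
          assertTrue(rs.getDouble(i) < 1000000000L);
        }
      }
    }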

Added:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java
    hadoop/chukwa/trunk/test/samples/Iostat.log
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/build.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=783885&r1=783884&r2=783885&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Jun 11 20:03:28 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-280. Added end to end test to detect iostat overflow. (Eric Yang)
+
     CHUKWA-194.  Backfilling tools.  (Jerome Boulon via asrabkin)
 
     CHUKWA-253. Added aggregations by user. (Cheng Zhang via Eric Yang)
@@ -28,6 +30,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-280. Added end to end test to detect iostat overflow. (Eric Yang)
+
     CHUKWA-278. Improve post process manager and metric data loader to support data loading from pig aggregation. (Eric Yang)
 
     CHUKWA-263. Added ability to configure demux parameters during build time. (Jerome Boulon via Eric Yang)

Modified: hadoop/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/build.xml?rev=783885&r1=783884&r2=783885&view=diff
==============================================================================
--- hadoop/chukwa/trunk/build.xml (original)
+++ hadoop/chukwa/trunk/build.xml Thu Jun 11 20:03:28 2009
@@ -464,10 +464,23 @@
 	<target name="test-chukwa" depends="compile,compile-test" description="Run Chukwa unit tests">
 		<mkdir dir="${basedir}/var" />
 		<mkdir dir="${test.build.dir}/var" />
+		<mkdir dir="${test.build.dir}/var/log" />
+                <copy todir="${test.build.dir}/var/log">
+                        <fileset dir="${basedir}/test/samples/">
+                                <include name="*.log" />
+                        </fileset>
+                </copy>
+                <copy todir="${test.build.dir}">
+                        <fileset dir="${build.dir}">
+                                <include name="chukwa-core*.jar" />
+                        </fileset>
+                </copy>
+                <copy file="${basedir}/conf/chukwa-demux-conf.xml.template" tofile="${test.build.dir}/conf/chukwa-demux-conf.xml"></copy>
 		<junit showoutput="${test.output}" fork="yes" printsummary="withOutAndErr" forkmode="${test.junit.fork.mode}" maxmemory="${test.junit.maxmemory}" dir="${test.build.dir}/classes/" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
 			<classpath refid="testClasspath" />
 			<env key="CHUKWA_CONF_DIR" value="${test.build.dir}/conf" />
 			<env key="CHUKWA_DATA_DIR" value="${test.build.dir}/var" />
+			<env key="CHUKWA_HOME" value="${test.build.dir}" />
 			<sysproperty key="test.src.dir" value="${test.src.dir}" />
 			<sysproperty key="test.build.classes" value="${test.build.classes}" />
 			<formatter type="${test.junit.output.format}" />

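The build.xml changes above stage everything the new test expects at runtime: the sample *.log files are copied into ${test.build.dir}/var/log, which the test reaches as CHUKWA_DATA_DIR/log; a chukwa-core*.jar is copied under ${test.build.dir}, which the test reaches as CHUKWA_HOME when handing a jar to the Demux job; and chukwa-demux-conf.xml is materialized from its template into the test conf directory. A minimal sketch of the lookups on the test side (mirroring code in TestDatabaseIostat below):

    // Mirrors TestDatabaseIostat below: the input logs come from
    // CHUKWA_DATA_DIR/log, and the jar handed to the Demux job is the
    // first *.jar found directly under CHUKWA_HOME.
    String[] logs = new File(System.getenv("CHUKWA_DATA_DIR")
        + File.separator + "log").list(new FilenameFilter() {
      public boolean accept(File dir, String name) {
        return name.endsWith(".log");
      }
    });
    String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
      public boolean accept(File dir, String name) {
        return name.endsWith(".jar");
      }
    });
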
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=783885&r1=783884&r2=783885&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Thu Jun 11 20:03:28 2009
@@ -209,7 +209,7 @@
     if (checkpointDir != null && !checkpointDir.exists()) {
       checkpointDir.mkdirs();
     }
-    
+    tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
     DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown\""));
 
     log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
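
The one-line ChukwaAgent change stores the configured tag string in the agent's tags field rather than only pushing it into DataFactory as a default tag, so the value stays available on the agent instance itself (the test configures it as cluster="demo"). A brief illustration of the lookup semantics, which follow the standard Hadoop Configuration.get(name, default):

    // Configuration.get(name, default) yields the configured value when
    // set, else the default; unset agents are tagged cluster="unknown".
    Configuration conf = new ChukwaConfiguration();
    conf.set("chukwaAgent.tags", "cluster=\"demo\"");
    String tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
    // tags is now cluster="demo"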

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java?rev=783885&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java Thu Jun 11 20:03:28 2009
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.database;
+
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.database.TableCreator;
+
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender;
+import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
+import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
+import org.apache.hadoop.chukwa.dataloader.MetricDataLoader;
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordOutputFormat;
+import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordPartitioner;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+import org.apache.hadoop.chukwa.database.Macro;
+
+import junit.framework.TestCase;
+
+public class TestDatabaseIostat extends TestCase {
+  private static Log log = LogFactory.getLog(TestDatabaseIostat.class);
+  private HttpConnector conn = null;
+  private ChukwaAgent agent = null;
+  private int agentPort = 9093;
+  private int collectorPort = 9990;
+  private Server jettyCollector = null;
+  private ChukwaHttpSender sender = null;
+  private MiniDFSCluster dfs = null;
+  private FileSystem fileSys = null;
+  private MiniMRCluster mr = null;
+  int NUM_HADOOP_SLAVES = 4;
+  int LINES = 10000;
+  int THREADS = 2;
+  private static final String dataSink = "/demux/input";
+  private static Path DEMUX_INPUT_PATH = null;
+  private static Path DEMUX_OUTPUT_PATH = null;
+  private ChukwaConfiguration conf = new ChukwaConfiguration();
+  private static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
+  private static String cluster = "demo";
+  long[] timeWindow = {7, 30, 91, 365, 3650};
+  long current = 1244617200000L;  // 2009-06-10
+  
+  public void setUp() {
+    
+    // Startup HDFS Cluster
+    try {
+      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
+          null);
+      fileSys = dfs.getFileSystem();
+      DEMUX_INPUT_PATH = new Path(fileSys.getUri().toString()+File.separator+dataSink);
+      DEMUX_OUTPUT_PATH = new Path(fileSys.getUri().toString()+File.separator+"/demux/output");
+    } catch(Exception e) {
+      e.printStackTrace();
+      fail("Fail to startup HDFS cluster.");      
+    }
+
+    // Startup MR Cluster
+    try {
+      System.setProperty("hadoop.log.dir", System.getProperty(
+          "test.build.data", "/tmp"));
+      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
+          .toString(), 1);
+    } catch(Exception e) {
+      fail("Fail to startup Map/reduce cluster.");
+    }
+    
+    // Startup Collector
+    try {
+      // Configure Collector
+      conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
+      conf.set("writer.hdfs.filesystem",fileSys.getUri().toString());
+      conf.set("chukwaCollector.outputDir",dataSink);
+      conf.set("chukwaCollector.rotateInterval", "10000");
+      
+      // Set up jetty connector
+      SelectChannelConnector jettyConnector = new SelectChannelConnector();
+      jettyConnector.setLowResourcesConnections(THREADS-1);
+      jettyConnector.setLowResourceMaxIdleTime(1500);
+      jettyConnector.setPort(collectorPort);
+      
+      // Set up jetty server proper, using connector
+      jettyCollector = new Server(collectorPort);
+      Context root = new Context(jettyCollector, "/", Context.SESSIONS);
+      root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
+      jettyCollector.start();
+      jettyCollector.setStopAtShutdown(false);
+      Thread.sleep(10000);
+    } catch(Exception e) {
+      fail("Fail to startup collector.");
+    }
+    
+    // Startup Agent
+    try {
+      // Configure Agent
+      conf.set("chukwaAgent.tags", "cluster=\"demo\"");
+      DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown\""));
+      conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
+      conf.set("chukwaAgent.checkpoint.interval", "10000");
+      int portno = conf.getInt("chukwaAgent.control.port", agentPort);
+      agent = new ChukwaAgent(conf);
+      conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
+      conn.start();      
+      sender = new ChukwaHttpSender(conf);
+      ArrayList<String> collectorList = new ArrayList<String>();
+      collectorList.add("http://localhost:"+collectorPort+"/chukwa");
+      sender.setCollectors(new RetryListOfCollectors(collectorList, 50));
+    } catch (AlreadyRunningException e) {
+      fail("Chukwa Agent is already running");
+    }
+      
+    System.setProperty("CLUSTER","demo");
+    DatabaseWriter db = new DatabaseWriter(cluster);
+    String buffer = "";
+    File aFile = new File(System.getenv("CHUKWA_CONF_DIR")
+                 + File.separator + "database_create_tables.sql");
+    buffer = readFile(aFile);
+    String tables[] = buffer.split(";");
+    for(String table : tables) {
+      if(table.length()>5) {
+        db.execute(table);
+      }
+    }
+    db.close();
+    for(int i=0;i<timeWindow.length;i++) {
+      TableCreator tc = new TableCreator();
+      long start = current;
+      long end = current + (timeWindow[i]*1440*60*1000);
+      tc.createTables(start, end);
+    }    
+  }
+
+  public String readFile(File aFile) {
+    StringBuffer contents = new StringBuffer();
+    try {
+      BufferedReader input = new BufferedReader(new FileReader(aFile));
+      try {
+        String line = null; // not declared within while loop
+        while ((line = input.readLine()) != null) {
+          contents.append(line);
+          contents.append(System.getProperty("line.separator"));
+        }
+      } finally {
+        input.close();
+      }
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+    return contents.toString();
+  }
+  
+  public void tearDown() {
+    try {
+      agent.shutdown();
+      conn.shutdown();
+      jettyCollector.stop();
+      mr.shutdown();
+      dfs.shutdown();
+    } catch(Exception e) {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+    DatabaseWriter db = null;
+    try {
+      db = new DatabaseWriter(cluster);
+      ResultSet rs = db.query("show tables");
+      ArrayList<String> list = new ArrayList<String>();
+      while(rs.next()) {
+        String table = rs.getString(1);
+        list.add(table);
+      }
+      for(String table : list) {
+        db.execute("drop table "+table);
+      }
+    } catch(Throwable ex) {
+    } finally {
+      if(db!=null) {
+        db.close();
+      }
+    }
+  }
+  
+  public void testChukwaFramework() {
+    try {
+      // Test Chukwa Agent Controller and Agent Communication
+      ChukwaAgentController cli = new ChukwaAgentController("localhost", agentPort);
+      String[] source = new File(System.getenv("CHUKWA_DATA_DIR") + File.separator + "log").list(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          return name.endsWith(".log");
+        }
+      });
+
+      for(String fname : source) {
+        StringBuilder fullPath = new StringBuilder();
+        fullPath.append(System.getenv("CHUKWA_DATA_DIR"));
+        fullPath.append(File.separator);
+        fullPath.append("log");
+        fullPath.append(File.separator);        
+        fullPath.append(fname);
+        String recordType = fname.substring(0,fname.indexOf("."));
+        long adaptorId = cli.add(
+          "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
+          recordType, "0 " + fullPath.toString(), 0);
+        assertTrue(adaptorId != -1);
+        Thread.sleep(2000);
+      }
+      cli.removeAll();
+      Thread.sleep(30000);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+    
+    
+    // Test Data Sink files written by Collector    
+    Path demuxDir = new Path(dataSink+"/*");
+    FileSystem fs;
+    try {
+      fs = dfs.getFileSystem();
+      FileStatus[] events = fs.globStatus(demuxDir);
+      log.info("Number of data sink files written:"+events.length);
+      assertTrue(events.length!=0);
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("File System Error.");
+    }
+    
+    // Test Demux    
+    log.info("Testing demux");
+    try {
+      //ChukwaConfiguration conf = new ChukwaConfiguration();
+      System.setProperty("hadoop.log.dir", System.getProperty(
+          "test.build.data", "/tmp"));
+    
+      String[] sortArgs = { DEMUX_INPUT_PATH.toString(), DEMUX_OUTPUT_PATH.toString() };
+//      JobConf job = mr.createJobConf();
+      JobConf job = new JobConf(new ChukwaConfiguration(), Demux.class);
+      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
+      job.setJobName("Chukwa-Demux_" + day.format(new Date()));
+      job.setInputFormat(SequenceFileInputFormat.class);
+      job.setMapperClass(Demux.MapClass.class);
+      job.setPartitionerClass(ChukwaRecordPartitioner.class);
+      job.setReducerClass(Demux.ReduceClass.class);
+
+      job.setOutputKeyClass(ChukwaRecordKey.class);
+      job.setOutputValueClass(ChukwaRecord.class);
+      job.setOutputFormat(ChukwaRecordOutputFormat.class);
+      job.setJobPriority(JobPriority.VERY_HIGH);
+      job.setNumMapTasks(2);
+      job.setNumReduceTasks(1);
+      Path input = new Path(fileSys.getUri().toString()+File.separator+dataSink+File.separator+"*.done");
+      FileInputFormat.setInputPaths(job, input);
+      FileOutputFormat.setOutputPath(job, DEMUX_OUTPUT_PATH);
+      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+          return name.endsWith(".jar");
+        }
+      });
+      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
+      //assertEquals(ToolRunner.run(job, new Demux(), sortArgs), 0);
+      JobClient.runJob(job);
+    } catch (Exception e) {
+      fail(e.toString());
+    }
+
+    // Test DataLoader
+    try {
+      fs = dfs.getFileSystem();
+      Path outputPath = new Path(DEMUX_OUTPUT_PATH.toString()+File.separator+"/*/*/*.evt");
+      FileStatus[] demuxOutput = fs.globStatus(outputPath);
+      log.info("Number of chukwa records files written:"+demuxOutput.length);
+      assertTrue(demuxOutput.length!=0);
+      for(FileStatus fname : demuxOutput) {
+        MetricDataLoader mdl = new MetricDataLoader(conf, fs, fname.getPath().toUri().toString());
+        mdl.call();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Metric Data Loader Error.");
+    }    
+    
+    // Verify Data
+    DatabaseWriter db = null;
+    try {
+      db = new DatabaseWriter(cluster);
+      Macro mp = new Macro(current,current, "select * from [system_metrics]");
+      String query = mp.toString();
+      ResultSet rs = db.query(query);
+      ResultSetMetaData rmeta = rs.getMetaData();
+      int size = rmeta.getColumnCount();
+      while(rs.next()) {
+        for(int i=1;i<=size;i++) {
+          int columnType = rmeta.getColumnType(i);
+          if(columnType==java.sql.Types.BIGINT ||
+             columnType==java.sql.Types.INTEGER) {
+            long testValue = rs.getLong(i);
+            assertTrue(testValue<1000000000L);
+          } else if(columnType==java.sql.Types.FLOAT ||
+              columnType==java.sql.Types.DOUBLE) {
+            double testValue = rs.getDouble(i);
+            assertTrue(testValue<1000000000L);
+          }
+        }
+      }
+    } catch(Throwable ex) {
+      fail("Data verification failed.");
+    } finally {
+      if(db!=null) {
+        db.close();
+      }
+    }      
+    
+  }
+
+}
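
Two practical notes on the test above, both read directly from the code: setUp() pre-creates database tables over five retention windows anchored at a fixed epoch, and the scenario is driven by the test-chukwa ant target patched earlier, so running ant test-chukwa from a trunk checkout should exercise it, assuming a database reachable by DatabaseWriter and database_create_tables.sql present under CHUKWA_CONF_DIR. The window arithmetic restated:

    // From setUp() above: windows of 7, 30, 91, 365 and 3650 days,
    // anchored at 2009-06-10; 1440 * 60 * 1000 ms is one day.
    long current = 1244617200000L;        // 2009-06-10
    long msPerDay = 1440L * 60L * 1000L;  // 86,400,000 ms
    long end = current + 7 * msPerDay;    // end of the 7-day window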


