hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r685353 [12/13] - in /hadoop/core/trunk: ./ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/build/ src/contrib/chukwa/conf/ src/contrib/chukwa/dist/ src/contrib/chukwa/docs/ src/contrib/chukwa/docs/paper/ src/contrib/chukwa/h...
Date Tue, 12 Aug 2008 22:35:23 GMT
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,553 @@
+/*
+ * Copyright (C) The Apache Software Foundation. All rights reserved.
+ *
+ * This software is published under the terms of the Apache Software
+ * License version 1.1, a copy of which has been included with this
+ * distribution in the LICENSE.txt file.  */
+
+
+
+package org.apache.hadoop.chukwa.inputtools.log4j;
+
+import java.io.IOException;
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.Locale;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import org.apache.hadoop.chukwa.util.RecordConstants;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+
+/**
+    ChukwaDailyRollingFileAppender is a slightly modified version of
+    DailyRollingFileAppender, with customized <code>subAppend()</code>
+    and <code>rollOver()</code> methods.
+    We would have preferred to subclass DailyRollingFileAppender, but
+    its authors clearly did not intend that to be a viable option: they
+    made too much of the class private or package-private.
+
+    DailyRollingFileAppender extends {@link FileAppender} so that the
+    underlying file is rolled over at a user-chosen frequency.
+
+    <p>The rolling schedule is specified by the <b>DatePattern</b>
+    option. This pattern should follow the {@link SimpleDateFormat}
+    conventions. In particular, you <em>must</em> escape literal text
+    within a pair of single quotes. A formatted version of the date
+    pattern is used as the suffix for the rolled file name.
+
+    <p>For example, if the <b>File</b> option is set to
+    <code>/foo/bar.log</code> and the <b>DatePattern</b> set to
+    <code>'.'yyyy-MM-dd</code>, on 2001-02-16 at midnight, the logging
+    file <code>/foo/bar.log</code> will be copied to
+    <code>/foo/bar.log.2001-02-16</code> and logging for 2001-02-17
+    will continue in <code>/foo/bar.log</code> until it rolls over
+    the next day.
+
+    <p>It is possible to specify monthly, weekly, half-daily, daily,
+    hourly, or minutely rollover schedules.
+
+    <p><table border="1" cellpadding="2">
+    <tr>
+    <th>DatePattern</th>
+    <th>Rollover schedule</th>
+    <th>Example</th>
+
+    <tr>
+    <td><code>'.'yyyy-MM</code>
+    <td>Rollover at the beginning of each month</td>
+
+    <td>At midnight of May 31st, 2002 <code>/foo/bar.log</code> will be
+    copied to <code>/foo/bar.log.2002-05</code>. Logging for the month
+    of June will be output to <code>/foo/bar.log</code> until it is
+    also rolled over the next month.
+
+    <tr>
+    <td><code>'.'yyyy-ww</code>
+
+    <td>Rollover at the first day of each week. The first day of the
+    week depends on the locale.</td>
+
+    <td>Assuming the first day of the week is Sunday, on Saturday
+    midnight, June 9th 2002, the file <i>/foo/bar.log</i> will be
+    copied to <i>/foo/bar.log.2002-23</i>.  Logging for the 24th week
+    of 2002 will be output to <code>/foo/bar.log</code> until it is
+    rolled over the next week.
+
+    <tr>
+    <td><code>'.'yyyy-MM-dd</code>
+
+    <td>Rollover at midnight each day.</td>
+
+    <td>At midnight, on March 8th, 2002, <code>/foo/bar.log</code> will
+    be copied to <code>/foo/bar.log.2002-03-08</code>. Logging for the
+    9th day of March will be output to <code>/foo/bar.log</code> until
+    it is rolled over the next day.
+
+    <tr>
+    <td><code>'.'yyyy-MM-dd-a</code>
+
+    <td>Rollover at midnight and midday of each day.</td>
+
+    <td>At noon, on March 9th, 2002, <code>/foo/bar.log</code> will be
+    copied to <code>/foo/bar.log.2002-03-09-AM</code>. Logging for the
+    afternoon of the 9th will be output to <code>/foo/bar.log</code>
+    until it is rolled over at midnight.
+
+    <tr>
+    <td><code>'.'yyyy-MM-dd-HH</code>
+
+    <td>Rollover at the top of every hour.</td>
+
+    <td>At approximately 11:00.000 on March 9th, 2002,
+    <code>/foo/bar.log</code> will be copied to
+    <code>/foo/bar.log.2002-03-09-10</code>. Logging for the 11th hour
+    of the 9th of March will be output to <code>/foo/bar.log</code>
+    until it is rolled over at the beginning of the next hour.
+
+
+    <tr>
+    <td><code>'.'yyyy-MM-dd-HH-mm</code>
+
+    <td>Rollover at the beginning of every minute.</td>
+
+    <td>At approximately 11:23.000 on March 9th, 2001,
+    <code>/foo/bar.log</code> will be copied to
+    <code>/foo/bar.log.2001-03-09-11-22</code>. Logging for the minute
+    of 11:23 (9th of March) will be output to
+    <code>/foo/bar.log</code> until it is rolled over the next minute.
+
+    </table>
+
+    <p>Do not use the colon ":" character anywhere in the
+    <b>DatePattern</b> option. The text before the colon is interpreted
+    as the protocol specification of a URL, which is probably not what
+    you want.
+
+
+    @author Eirik Lygre
+    @author Ceki G&uuml;lc&uuml; */
+public class ChukwaDailyRollingFileAppender extends FileAppender {
+
+
+  // The code assumes that the following constants are in an increasing
+  // sequence.
+  static final int TOP_OF_TROUBLE=-1;
+  static final int TOP_OF_MINUTE = 0;
+  static final int TOP_OF_HOUR   = 1;
+  static final int HALF_DAY      = 2;
+  static final int TOP_OF_DAY    = 3;
+  static final int TOP_OF_WEEK   = 4;
+  static final int TOP_OF_MONTH  = 5;
+
+  static final String adaptorType = ChukwaAgentController.CharFileTailUTF8NewLineEscaped;
+
+  /**
+    The date pattern. By default, the pattern is set to
+    "'.'yyyy-MM-dd" meaning daily rollover.
+   */
+  private String datePattern = "'.'yyyy-MM-dd";
+
+  /**
+      The log file will be renamed to the value of the
+      scheduledFilename variable when the next interval is entered. For
+      example, if the rollover period is one hour, the log file will be
+      renamed to the value of "scheduledFilename" at the beginning of
+      the next hour. 
+
+      The precise time when a rollover occurs depends on logging
+      activity. 
+   */
+  private String scheduledFilename;
+
+  /**
+    The next time we estimate a rollover should occur. */
+  private long nextCheck = System.currentTimeMillis () - 1;
+
+  Date now = new Date();
+
+  SimpleDateFormat sdf;
+
+  RollingCalendar rc = new RollingCalendar();
+
+  int checkPeriod = TOP_OF_TROUBLE;
+
+  ChukwaAgentController chukwaClient;
+  String chukwaClientHostname;
+  int chukwaClientPortNum;
+  long chukwaClientConnectNumRetry;
+  long chukwaClientConnectRetryInterval;
+
+  String recordType;
+  
+  
+  
+  
+  // The gmtTimeZone is used only in computeCheckPeriod() method.
+  static final TimeZone gmtTimeZone = TimeZone.getTimeZone("GMT");
+
+
+  /**
+    The default constructor does nothing. */
+  public ChukwaDailyRollingFileAppender() throws IOException{
+    super();
+  }
+
+  /**
+     Instantiate a <code>ChukwaDailyRollingFileAppender</code> and open the
+     file designated by <code>filename</code>. The opened filename will
+     become the output destination for this appender.
+   */
+  public ChukwaDailyRollingFileAppender (Layout layout, String filename,
+      String datePattern) throws IOException {
+    super(layout, filename, true);
+    System.out.println("Daily Rolling File Appender successfully registered file with agent: " + filename);
+    this.datePattern = datePattern;
+    activateOptions();
+  }
+
+  /**
+    The <b>DatePattern</b> takes a string in the same format as
+    expected by {@link SimpleDateFormat}. This option determines the
+    rollover schedule.
+   */
+  public void setDatePattern(String pattern) {
+    datePattern = pattern;
+  }
+
+  /** Returns the value of the <b>DatePattern</b> option. */
+  public String getDatePattern() {
+    return datePattern;
+  }
+  
+  public String getRecordType(){
+    if (recordType != null)
+      return recordType;
+    else
+      return "unknown";
+  }
+  
+  public void setRecordType(String recordType){
+    this.recordType = recordType;
+  }
+
+  public void activateOptions() {
+    super.activateOptions();
+    if(datePattern != null && fileName != null) {
+      now.setTime(System.currentTimeMillis());
+      sdf = new SimpleDateFormat(datePattern);
+      int type = computeCheckPeriod();
+      printPeriodicity(type);
+      rc.setType(type);
+      File file = new File(fileName);
+      scheduledFilename = fileName+sdf.format(new Date(file.lastModified()));
+
+    } else {
+      LogLog.error("Either File or DatePattern options are not set for appender ["
+          +name+"].");
+    }
+  }
+
+  void printPeriodicity(int type) {
+    switch(type) {
+    case TOP_OF_MINUTE:
+      LogLog.debug("Appender ["+name+"] to be rolled every minute.");
+      break;
+    case TOP_OF_HOUR:
+      LogLog.debug("Appender ["+name
+          +"] to be rolled on top of every hour.");
+      break;
+    case HALF_DAY:
+      LogLog.debug("Appender ["+name
+          +"] to be rolled at midday and midnight.");
+      break;
+    case TOP_OF_DAY:
+      LogLog.debug("Appender ["+name
+          +"] to be rolled at midnight.");
+      break;
+    case TOP_OF_WEEK:
+      LogLog.debug("Appender ["+name
+          +"] to be rolled at start of week.");
+      break;
+    case TOP_OF_MONTH:
+      LogLog.debug("Appender ["+name
+          +"] to be rolled at start of every month.");
+      break;
+    default:
+      LogLog.warn("Unknown periodicity for appender ["+name+"].");
+    }
+  }
+
+
+  // This method computes the rollover period by looping over the
+  // periods, starting with the shortest, and stopping when r0 is
+  // different from r1, where r0 is the epoch formatted according
+  // to the datePattern (supplied by the user) and r1 is the
+  // epoch+nextMillis(i) formatted according to datePattern. All date
+  // formatting is done in GMT and not local format because the test
+  // logic is based on comparisons relative to 1970-01-01 00:00:00
+  // GMT (the epoch).
+
+  int computeCheckPeriod() {
+    RollingCalendar rollingCalendar = new RollingCalendar(gmtTimeZone, Locale.ENGLISH);
+    // set date to 1970-01-01 00:00:00 GMT
+    Date epoch = new Date(0);
+    if(datePattern != null) {
+      for(int i = TOP_OF_MINUTE; i <= TOP_OF_MONTH; i++) {
+        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(datePattern);
+        simpleDateFormat.setTimeZone(gmtTimeZone); // do all date formatting in GMT
+        String r0 = simpleDateFormat.format(epoch);
+        rollingCalendar.setType(i);
+        Date next = new Date(rollingCalendar.getNextCheckMillis(epoch));
+        String r1 =  simpleDateFormat.format(next);
+        //System.out.println("Type = "+i+", r0 = "+r0+", r1 = "+r1);
+        if(r0 != null && r1 != null && !r0.equals(r1)) {
+          return i;
+        }
+      }
+    }
+    return TOP_OF_TROUBLE; // Deliberately head for trouble...
+  }
+
+  /**
+    Rollover the current file to a new file.
+   */
+  void rollOver() throws IOException {
+
+    /* Compute filename, but only if datePattern is specified */
+    if (datePattern == null) {
+      errorHandler.error("Missing DatePattern option in rollOver().");
+      return;
+    }
+
+    String datedFilename = fileName+sdf.format(now);
+    // It is too early to roll over because we are still within the
+    // bounds of the current interval. Rollover will occur once the
+    // next interval is reached.
+    if (scheduledFilename.equals(datedFilename)) {
+      return;
+    }
+
+    // close current file, and rename it to datedFilename
+    this.closeFile();
+
+    if (chukwaClient != null){
+      chukwaClient.pauseFile(getRecordType(),fileName);
+    }
+
+    File target  = new File(scheduledFilename);
+    if (target.exists()) {
+      target.delete();
+    }
+
+    File file = new File(fileName);
+    boolean result = file.renameTo(target);
+    if(result) {
+      LogLog.debug(fileName +" -> "+ scheduledFilename);
+    } else {
+      LogLog.error("Failed to rename ["+fileName+"] to ["+scheduledFilename+"].");
+    }
+
+    try {
+      // This will also close the file. This is OK since multiple
+      // close operations are safe.
+      this.setFile(fileName, false, this.bufferedIO, this.bufferSize);
+    }
+    catch(IOException e) {
+      errorHandler.error("setFile("+fileName+", false) call failed.");
+    }
+
+    //resume the adaptor for the file now that we have emptied it (i.e. rolled it over);
+    //guard against a null client, since the adaptor may never have been set up
+    if (chukwaClient != null && chukwaClient.isFilePaused(getRecordType(), fileName)){
+      chukwaClient.resumeFile(getRecordType(), fileName);
+    }
+    else {
+      LogLog.warn("chukwa appender for file " + fileName + " was not paused, so we didn't do resumeFile() for it");
+    }
+    
+    scheduledFilename = datedFilename;
+  }
+
+  /**
+   * This method differentiates DailyRollingFileAppender from its
+   * super class.
+   *
+   * <p>Before actually logging, this method will check whether it is
+   * time to do a rollover. If it is, it will schedule the next
+   * rollover time and then rollover.
+   * */
+  protected void subAppend(LoggingEvent event) {
+    //we set up the chukwa adaptor here because this is the first
+    //point which is called after all setters have been called with
+    //their values from the log4j.properties file; in particular we
+    //needed to give setChukwaClientPortNum() and -Hostname() a shot
+    if (chukwaClient == null){
+      if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
+        chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
+        System.out.println("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
+      }
+      else{
+        chukwaClient = new ChukwaAgentController();
+        System.out.println("setup adaptor with no args, which means it used its defaults");
+      }
+        
+      //if they haven't specified, default to retrying every second for 30 seconds
+      long retryInterval = chukwaClientConnectRetryInterval;
+      if (retryInterval == 0)
+        retryInterval = 1000;
+      long numRetries = chukwaClientConnectNumRetry;
+      if (numRetries == 0)
+        numRetries = 30;
+      long adaptorID = chukwaClient.addFile(getRecordType(), getFile(), numRetries, retryInterval);
+      if (adaptorID > 0){
+        System.out.println("Added file tailing adaptor to chukwa agent for file " + getFile());
+      }
+      else{
+        System.out.println("Chukwa adaptor not added, addFile(" + getFile() + ") returned " + adaptorID);
+      }
+    }
+    long n = System.currentTimeMillis();
+    if (n >= nextCheck) {
+      now.setTime(n);
+      nextCheck = rc.getNextCheckMillis(now);
+      try {
+        rollOver();
+      }
+      catch(IOException ioe) {
+        LogLog.error("rollOver() failed.", ioe);
+      }
+    }
+    //escape the newlines from record bodies and then write this record to the log file
+    this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
+    
+    if(layout.ignoresThrowable()) {
+      String[] s = event.getThrowableStrRep();
+      if (s != null) {
+        int len = s.length;
+        for(int i = 0; i < len; i++) {
+          this.qw.write(s[i]);
+          this.qw.write(Layout.LINE_SEP);
+        }
+      }
+    }
+
+    if(this.immediateFlush) {
+      this.qw.flush();
+    }
+  }
+
+  public String getChukwaClientHostname() {
+    return chukwaClientHostname;
+  }
+
+  public void setChukwaClientHostname(String chukwaClientHostname) {
+    this.chukwaClientHostname = chukwaClientHostname;
+  }
+
+  public int getChukwaClientPortNum() {
+    return chukwaClientPortNum;
+  }
+
+  public void setChukwaClientPortNum(int chukwaClientPortNum) {
+    this.chukwaClientPortNum = chukwaClientPortNum;
+  }
+
+  public void setChukwaClientConnectNumRetry(int i){
+    this.chukwaClientConnectNumRetry = i;
+  }
+  
+  public void setChukwaClientConnectRetryInterval(long i) {
+    this.chukwaClientConnectRetryInterval = i;
+  }
+  
+}
+
+
+
+/**
+ *  RollingCalendar is a helper class to DailyRollingFileAppender.
+ *  Given a periodicity type and the current time, it computes the
+ *  start of the next interval.  
+ * */
+class RollingCalendar extends GregorianCalendar {
+
+  int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
+
+  RollingCalendar() {
+    super();
+  }  
+
+  RollingCalendar(TimeZone tz, Locale locale) {
+    super(tz, locale);
+  }  
+
+  void setType(int type) {
+    this.type = type;
+  }
+
+  public long getNextCheckMillis(Date now) {
+    return getNextCheckDate(now).getTime();
+  }
+
+  public Date getNextCheckDate(Date now) {
+    this.setTime(now);
+
+    switch(type) {
+    case ChukwaDailyRollingFileAppender.TOP_OF_MINUTE:
+      this.set(Calendar.SECOND, 0);
+      this.set(Calendar.MILLISECOND, 0);
+      this.add(Calendar.MINUTE, 1);
+      break;
+    case ChukwaDailyRollingFileAppender.TOP_OF_HOUR:
+      this.set(Calendar.MINUTE, 0);
+      this.set(Calendar.SECOND, 0);
+      this.set(Calendar.MILLISECOND, 0);
+      this.add(Calendar.HOUR_OF_DAY, 1);
+      break;
+    case ChukwaDailyRollingFileAppender.HALF_DAY:
+      this.set(Calendar.MINUTE, 0);
+      this.set(Calendar.SECOND, 0);
+      this.set(Calendar.MILLISECOND, 0);
+      int hour = get(Calendar.HOUR_OF_DAY);
+      if(hour < 12) {
+        this.set(Calendar.HOUR_OF_DAY, 12);
+      } else {
+        this.set(Calendar.HOUR_OF_DAY, 0);
+        this.add(Calendar.DAY_OF_MONTH, 1);
+      }
+      break;
+    case ChukwaDailyRollingFileAppender.TOP_OF_DAY:
+      this.set(Calendar.HOUR_OF_DAY, 0);
+      this.set(Calendar.MINUTE, 0);
+      this.set(Calendar.SECOND, 0);
+      this.set(Calendar.MILLISECOND, 0);
+      this.add(Calendar.DATE, 1);
+      break;
+    case ChukwaDailyRollingFileAppender.TOP_OF_WEEK:
+      this.set(Calendar.DAY_OF_WEEK, getFirstDayOfWeek());
+      this.set(Calendar.HOUR_OF_DAY, 0);
+      this.set(Calendar.SECOND, 0);
+      this.set(Calendar.MILLISECOND, 0);
+      this.add(Calendar.WEEK_OF_YEAR, 1);
+      break;
+    case ChukwaDailyRollingFileAppender.TOP_OF_MONTH:
+      this.set(Calendar.DATE, 1);
+      this.set(Calendar.HOUR_OF_DAY, 0);
+      this.set(Calendar.SECOND, 0);
+      this.set(Calendar.MILLISECOND, 0);
+      this.add(Calendar.MONTH, 1);
+      break;
+    default:
+      throw new IllegalStateException("Unknown periodicity type.");
+    }
+    return getTime();
+  }
+}
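For reference, a minimal sketch of configuring this appender programmatically (the
same settings would normally live in log4j.properties). The record type, host name,
and port number below are illustrative assumptions; the first append tries to
register the file with a Chukwa agent assumed to be listening on that port.

    import java.io.IOException;

    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    import org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender;

    public class AppenderSketch {
      public static void main(String[] args) throws IOException {
        // Roll /tmp/bar.log daily; rolled files get a ".yyyy-MM-dd" suffix
        ChukwaDailyRollingFileAppender app = new ChukwaDailyRollingFileAppender(
            new PatternLayout("%d{ISO8601} %-5p %c - %m%n"),
            "/tmp/bar.log",
            "'.'yyyy-MM-dd");
        app.setRecordType("MyAppLog");            // assumed record type
        app.setChukwaClientHostname("localhost"); // assumed agent host
        app.setChukwaClientPortNum(9093);         // assumed agent control port

        Logger log = Logger.getLogger(AppenderSketch.class);
        log.addAppender(app);
        log.info("hello"); // first append sets up the Chukwa adaptor, then logs
      }
    }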

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools.log4j;
+
+import org.apache.log4j.*;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class OneLineLogLayout extends PatternLayout {
+  
+  char SEP = ' ';
+  public String format(LoggingEvent evt)
+  {
+    
+    String initial_s = super.format(evt);
+    StringBuilder sb = new StringBuilder();
+    for(int i = 0; i < initial_s.length() -1 ; ++i)
+    {
+      char c = initial_s.charAt(i);
+      if(c == '\n')
+        sb.append(SEP);
+      else
+        sb.append(c);
+    }
+    sb.append(SEP);
+    String[] s = evt.getThrowableStrRep();
+    if (s != null) {
+      int len = s.length;
+      for(int i = 0; i < len; i++) {
+        sb.append(s[i]);
+        sb.append(SEP);
+      }
+    }
+    
+    sb.append('\n');
+    return sb.toString();
+  }
+  
+  public boolean ignoresThrowable()
+  {
+    return false;
+  }
+  
+  public static void main(String[] args)
+  {
+    System.setProperty("line.separator", " ");
+    Logger l = Logger.getRootLogger();
+    l.removeAllAppenders();
+    Appender appender = new ConsoleAppender(new OneLineLogLayout());
+    appender.setName("console");
+    l.addAppender(appender);
+    l.warn("testing", new java.io.IOException("just kidding!"));
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataConfig {
+    private static Configuration config;
+    final static String DATACONFIG = "mdl.xml";
+    private Log log = LogFactory.getLog(DataConfig.class);
+    
+    public DataConfig(String path) {
+        Path fileResource = new Path(path);
+        config = new Configuration();
+        config.addResource(fileResource);
+    }
+    public DataConfig() {
+        String dataConfig = System.getenv("DATACONFIG");
+        if(dataConfig==null) {
+            dataConfig=DATACONFIG;
+        }
+        log.debug("DATACONFIG="+dataConfig);
+        if(config==null)  {
+            try {
+                Path fileResource = new Path(dataConfig);
+                config = new Configuration();
+                config.addResource(fileResource);
+            } catch (Exception e) {
+                log.error("Error reading configuration file:"+dataConfig);
+            }
+        }
+    }
+
+    public String get(String key) {
+        return config.get(key);
+    }
+    public void put(String key, String value) {
+        config.set(key, value);
+    }
+    public Iterator<Map.Entry <String, String>> iterator() {
+        return config.iterator();
+    }
+    public HashMap<String, String> startWith(String key) {
+        HashMap<String, String> transformer = new HashMap<String, String>();
+        Iterator<Map.Entry <String, String>> entries = config.iterator();
+        while(entries.hasNext()) {
+           String entry = entries.next().toString();
+           if(entry.startsWith(key)) {
+               String[] metrics = entry.split("=", 2); // split on the first '=' only, in case the value contains '='
+               transformer.put(metrics[0],metrics[1]);
+           }
+        }
+        return transformer;
+    }
+}
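A short usage sketch for DataConfig; the key comes from elsewhere in this commit
(NodeActivityPlugin), and the example assumes an mdl.xml is resolvable on the
classpath or that $DATACONFIG points at one.

    import java.util.HashMap;

    import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;

    public class DataConfigSketch {
      public static void main(String[] args) {
        DataConfig conf = new DataConfig(); // loads mdl.xml, or $DATACONFIG if set
        System.out.println("cmde = " + conf.get("mdl.plugin.NodeActivityPlugin.cmde"));

        // Gather every property that shares a prefix
        HashMap<String, String> mdlProps = conf.startWith("mdl.");
        System.out.println(mdlProps.size() + " mdl.* properties");
      }
    }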

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.nio.channels.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class LoaderServer {
+	
+	String name;
+	private static Log log = LogFactory.getLog(LoaderServer.class);
+	private static FileLock lock = null;
+        private static FileOutputStream pidFileOutput = null;
+	
+	public LoaderServer(String name){
+		this.name=name;
+	}
+	
+	public void init() throws IOException{
+  	     String pidLong=ManagementFactory.getRuntimeMXBean().getName();
+  	     String[] items=pidLong.split("@");
+  	     String pid=items[0];
+	     String chukwaPath=System.getProperty("CHUKWA_HOME");
+	     StringBuffer pidFilesb=new StringBuffer();
+	     pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid");
+	     try{
+	         File pidFile= new File(pidFilesb.toString());
+	         pidFileOutput= new FileOutputStream(pidFile);
+	         pidFileOutput.write(pid.getBytes());
+	         pidFileOutput.flush();
+	         FileChannel channel = pidFileOutput.getChannel();
+	         LoaderServer.lock = channel.tryLock();
+	         if(LoaderServer.lock!=null) {
+	             log.info("Initialization succeeded...");
+	         } else {
+	             throw(new IOException());
+	         }
+	     }catch (IOException ex){
+	         System.out.println("Initialization failed: cannot write pid file.");
+	         log.error("Initialization failed...");
+	         log.error(ex.getMessage());
+	         System.exit(-1); // terminates here; nothing after this line runs
+	     }
+	}
+	
+	public void clean(){
+        String chukwaPath=System.getProperty("CHUKWA_HOME"); // match init(), which reads the system property
+        StringBuffer pidFilesb=new StringBuffer();
+        pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid"); 
+        String pidFileName=pidFilesb.toString();
+
+        File pidFile=new File(pidFileName);
+        if (!pidFile.exists()) {
+           log.error("Cannot delete pid file; no such file or directory: "+pidFileName);
+        } else {
+           try {
+               lock.release();
+               pidFileOutput.close();
+           } catch(IOException e) {
+               log.error("Unable to release file lock: "+pidFileName);
+           }
+        }
+
+        boolean result=pidFile.delete();
+        if (!result){
+           log.error("Failed to delete pid file: "+pidFileName);
+        }
+	}
+
+}
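The intended lifecycle, sketched under the assumption that the CHUKWA_HOME system
property is set and $CHUKWA_HOME/var/run exists and is writable:

    import java.io.IOException;

    import org.apache.hadoop.chukwa.inputtools.mdl.LoaderServer;

    public class LoaderSketch {
      public static void main(String[] args) throws IOException {
        LoaderServer server = new LoaderServer("mdl-loader");
        server.init(); // writes and locks $CHUKWA_HOME/var/run/mdl-loader.pid
        try {
          // ... the loader's actual work would go here ...
        } finally {
          server.clean(); // releases the lock and deletes the pid file
        }
      }
    }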

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/ExecPlugin.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools.plugin;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.json.JSONObject;
+
+public abstract class ExecPlugin implements IPlugin
+{
+	public final int statusOK = 100;
+	public final int statusKO = -100;
+	
+	Process process;
+	
+	public ExecPlugin()
+	{
+		
+	}
+	
+	public void stop() {
+	  process.destroy();
+	}
+	
+	public int waitFor() throws InterruptedException {
+	  return process.waitFor();
+	}
+	
+	public abstract String getCmde();
+	
+	public JSONObject postProcess(JSONObject execResult)
+	{
+		return execResult;
+	}
+	
+	public JSONObject execute()
+	{
+		JSONObject result = new JSONObject();
+		try
+		{
+			result.put("timestamp", System.currentTimeMillis());
+			
+			Runtime runtime = Runtime.getRuntime();
+			process = runtime.exec(getCmde());
+//			ProcessBuilder builder = new ProcessBuilder(cmde);
+//			Process process = builder.start();
+
+			OutputReader stdOut = new OutputReader(process,Output.stdOut);
+			stdOut.start();
+			OutputReader stdErr = new OutputReader(process,Output.stdErr);
+			stdErr.start();
+		    int exitValue =process.waitFor();
+		    stdOut.join();
+		    stdErr.join();
+		    result.put("exitValue", exitValue);
+		    result.put("stdout", stdOut.output.toString());
+		    result.put("stderr", stdErr.output.toString());
+		    result.put("status", statusOK);
+		}
+		catch (Throwable e)
+		{
+			try 
+			{
+				result.put("status", statusKO);
+				result.put("errorLog", e.getMessage());
+			}
+			catch(Exception e1) { e1.printStackTrace();}
+			e.printStackTrace();
+		}
+
+		return postProcess(result);
+	}
+}
+
+
+enum Output{stdOut,stdErr};
+
+class OutputReader extends Thread
+{
+	private Process process = null;
+	private Output outputType = null;
+	public StringBuilder output = new StringBuilder();
+	public boolean isOk = true;
+
+
+	public OutputReader(Process process,Output outputType)
+	{
+		this.process = process;
+		this.outputType = outputType;
+	}
+	public void run()
+	{
+	   try
+		{
+		    String line = null;
+		    InputStream is = null;
+		    switch(this.outputType)
+		    {
+		    case stdOut:
+		    	is = process.getInputStream();
+		    	break;
+		    case stdErr:
+		    	is = process.getErrorStream();
+		    	break;
+		    	
+		    }
+		   
+		    InputStreamReader isr = new InputStreamReader(is);
+		    BufferedReader br = new BufferedReader(isr);
+		    while ((line = br.readLine()) != null) 
+		    {
+		    	 //System.out.println("========>>>>>>>["+line+"]");	
+		    	 output.append(line).append("\n");
+		    }
+		}
+		catch (IOException e)
+		{
+			isOk = false;
+			e.printStackTrace();
+		}
+		catch (Throwable e)
+		{
+			isOk = false;
+			e.printStackTrace();
+		}
+	}
+}
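A minimal concrete subclass, sketched for illustration; the command is an arbitrary
assumption and any external program would do.

    import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
    import org.json.JSONException;
    import org.json.JSONObject;

    public class UptimePlugin extends ExecPlugin {
      @Override
      public String getCmde() {
        return "uptime"; // stdout/stderr of this command end up in the JSON result
      }

      public static void main(String[] args) throws JSONException {
        JSONObject result = new UptimePlugin().execute();
        if (result.getInt("status") > 0) { // statusOK is 100, statusKO is -100
          System.out.println("exit=" + result.getInt("exitValue")
              + " stdout=" + result.getString("stdout"));
        }
      }
    }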

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/IPlugin.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools.plugin;
+
+import org.json.JSONObject;
+
+public interface IPlugin
+{
+	JSONObject execute();
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,47 @@
+package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class Exec extends ExecPlugin
+{
+	private static Log log = LogFactory.getLog(Exec.class);
+	private String cmde = null;
+	
+	public Exec(String[] cmds)
+	{
+		StringBuffer c = new StringBuffer();
+		for(String cmd : cmds) {
+			c.append(cmd);
+			c.append(" ");
+		}
+		cmde = c.toString();
+	}
+	
+	@Override
+	public String getCmde()
+	{
+		return cmde;
+	}
+
+	public static void main(String[] args) throws JSONException
+	{
+		IPlugin plugin = new Exec(args);
+		JSONObject result = plugin.execute();		
+		if (result.getInt("status") < 0)
+		{
+			System.out.println("Error");
+			log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
+			System.exit(-1);
+		}
+		else
+		{
+			log.info(result.get("stdout"));
+		}
+		System.exit(0);
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+
+import java.io.*;
+
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsException;
+import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.apache.log4j.Logger;
+
+public class Log4JMetricsContext extends AbstractMetricsContext {
+
+  static Logger out = Logger.getLogger(Log4JMetricsContext.class);
+  
+  /* Configuration attribute names */
+//  protected static final String FILE_NAME_PROPERTY = "fileName";
+  protected static final String PERIOD_PROPERTY = "period";
+
+    
+  /** Creates a new instance of Log4JMetricsContext */
+  public Log4JMetricsContext() {}
+     
+  public void init(String contextName, ContextFactory factory) {
+    super.init(contextName, factory);
+  /*      
+    String fileName = getAttribute(FILE_NAME_PROPERTY);
+    if (fileName != null) {
+      file = new File(fileName);
+    }
+    */    
+    String periodStr = getAttribute(PERIOD_PROPERTY);
+    if (periodStr != null) {
+      int period = 0;
+      try {
+        period = Integer.parseInt(periodStr);
+      } catch (NumberFormatException nfe) {
+        // fall through; period <= 0 below raises MetricsException
+      }
+      if (period <= 0) {
+        throw new MetricsException("Invalid period: " + periodStr);
+      }
+      setPeriod(period);
+    }
+  }
+  
+  @Override
+  protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
+      throws IOException
+  {
+    StringBuilder writer = new StringBuilder();
+    String separator = " ";
+    writer.append("contextName=");
+    writer.append(contextName);
+    
+    writer.append(separator);
+    writer.append("recordName=");
+    writer.append(recordName);
+    
+
+    writer.append(separator);
+    writer.append("chukwa_timestamp=" + System.currentTimeMillis());
+    
+    for (String tagName : outRec.getTagNames()) {
+      writer.append(separator);
+      writer.append(tagName);
+      writer.append("=");
+      writer.append(outRec.getTag(tagName));
+    }
+    for (String metricName : outRec.getMetricNames()) {
+      writer.append(separator);
+      writer.append(metricName);
+      writer.append("=");
+      writer.append(outRec.getMetric(metricName));
+    }
+    
+    out.info(writer.toString());
+//    out.println(writer);
+  }
+
+}
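Roughly how this context gets selected and fed, sketched via the ContextFactory API.
The "chukwa" context name and period value are assumptions; these attributes would
normally be set in hadoop-metrics.properties rather than in code.

    import java.io.IOException;

    import org.apache.hadoop.metrics.ContextFactory;
    import org.apache.hadoop.metrics.MetricsContext;
    import org.apache.hadoop.metrics.MetricsRecord;
    import org.apache.hadoop.metrics.MetricsUtil;

    public class MetricsSketch {
      public static void main(String[] args) throws IOException {
        ContextFactory factory = ContextFactory.getFactory();
        factory.setAttribute("chukwa.class",
            "org.apache.hadoop.chukwa.inputtools.plugin.metrics.Log4JMetricsContext");
        factory.setAttribute("chukwa.period", "60"); // emit every 60 seconds

        MetricsContext ctx = MetricsUtil.getContext("chukwa");
        MetricsRecord rec = MetricsUtil.createRecord(ctx, "demo");
        rec.setMetric("value", 1);
        rec.update(); // queued; emitRecord() above writes it via log4j each period
      }
    }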

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/nodeactivity/NodeActivityPlugin.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.inputtools.plugin.nodeactivity;
+
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.json.JSONObject;
+
+public class NodeActivityPlugin extends ExecPlugin
+{
+	private String cmde = null;
+	private DataConfig dataConfig = null;
+	
+	public NodeActivityPlugin()
+	{
+		dataConfig = new DataConfig();
+		cmde = dataConfig.get("mdl.plugin.NodeActivityPlugin.cmde");
+	}
+	
+	@Override
+	public String getCmde()
+	{
+		return cmde;
+	}
+	
+	@Override
+	public JSONObject postProcess(JSONObject execResult)
+	{
+		try
+		{
+			if (execResult.getInt("status") < 0)
+			{
+				return execResult;
+			}
+			
+			String res = execResult.getString("stdout");
+			
+			String[] tab = res.split("\n");
+			int totalFreeNode = 0;
+			int totalUsedNode = 0;
+			int totalDownNode = 0;
+			
+			for(int i=0;i<tab.length;i++)
+			{
+				if (tab[i].indexOf("state =") <0)
+				{
+					tab[i] = null;
+					continue;
+				}
+	
+				String[] line = tab[i].split("state =");
+				tab[i] = null;
+				
+				if (line[1].trim().equals("free"))
+				{
+					totalFreeNode ++;
+				}
+				else if (line[1].trim().equals("job-exclusive"))
+				{
+					totalUsedNode ++;
+				}
+				else
+				{
+					totalDownNode ++;
+				}
+			}
+			
+
+			execResult.put("totalFreeNode", totalFreeNode);
+			execResult.put("totalUsedNode", totalUsedNode);
+			execResult.put("totalDownNode", totalDownNode);
+			execResult.put("source", "NodeActivity");
+			
+			execResult.put("status", 100);	
+			
+		} catch (Throwable e)
+		{
+			try
+			{
+				execResult.put("source", "NodeActivity");
+				execResult.put("status", -100);	
+				execResult.put("errorLog",e.getMessage());
+			}
+			catch(Exception e1) { e1.printStackTrace();}
+			e.printStackTrace();
+			
+		}
+		
+		return execResult;
+	}
+
+	public static void main(String[] args)
+	{
+		IPlugin plugin = new NodeActivityPlugin();
+		JSONObject result = plugin.execute();
+		System.out.print("Result: " + result);
+	}
+}
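The state-counting logic above can be exercised directly by handing postProcess() a
fabricated pbsnodes-style result. The node listing below is invented for
illustration; note that constructing the plugin also loads DataConfig, so an
mdl.xml should be resolvable.

    import org.json.JSONException;
    import org.json.JSONObject;

    import org.apache.hadoop.chukwa.inputtools.plugin.nodeactivity.NodeActivityPlugin;

    public class NodeActivitySketch {
      public static void main(String[] args) throws JSONException {
        JSONObject fake = new JSONObject();
        fake.put("status", 100);
        fake.put("stdout",
            "node01\n     state = free\n"
          + "node02\n     state = job-exclusive\n"
          + "node03\n     state = down\n");
        JSONObject out = new NodeActivityPlugin().postProcess(fake);
        // Expect totalFreeNode=1, totalUsedNode=1, totalDownNode=1
        System.out.println(out);
      }
    }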

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/pbsnode/PbsNodePlugin.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,45 @@
+package org.apache.hadoop.chukwa.inputtools.plugin.pbsnode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class PbsNodePlugin extends ExecPlugin
+{
+	private static Log log = LogFactory.getLog(PbsNodePlugin.class);
+	private String cmde = null;
+	private DataConfig dataConfig = null;
+	
+	public PbsNodePlugin()
+	{
+		dataConfig = new DataConfig();
+		cmde = dataConfig.get("chukwa.inputtools.plugin.pbsNode.cmde");
+	}
+	
+	@Override
+	public String getCmde()
+	{
+		return cmde;
+	}
+
+	public static void main(String[] args) throws JSONException
+	{
+		IPlugin plugin = new PbsNodePlugin();
+		JSONObject result = plugin.execute();
+		System.out.print("Result: " + result);	
+		
+		if (result.getInt("status") < 0)
+		{
+			System.out.println("Error");
+			log.warn("[ChukwaError]:"+ PbsNodePlugin.class + ", " + result.getString("stderr"));
+		}
+		else
+		{
+			log.info(result.get("stdout"));
+		}
+	}
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.util.Random;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.*;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+
+public class ConstRateAdaptor  extends Thread implements Adaptor {
+
+
+  private static final int SLEEP_VARIANCE = 200; 
+  private static final int MIN_SLEEP = 300; 
+  
+  private String type;
+  private long offset;
+  private int bytesPerSec;
+  private ChunkReceiver dest;
+  
+  private volatile boolean stopping = false;
+  public String getCurrentStatus() throws AdaptorException {
+    return Integer.toString(bytesPerSec);
+  }
+
+  public void start(String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
+  {
+    try{
+      bytesPerSec = Integer.parseInt(status);
+    } catch(NumberFormatException e) {
+      throw new AdaptorException("bad argument to const rate adaptor: " + status);
+    }
+    this.offset = offset;
+    this.type = type;
+    this.dest = dest;
+    this.setName("ConstRate Adaptor");
+    super.start();  //this is a Thread.start
+  }
+  
+  public void run()
+  {
+    Random r = new Random();
+    try{
+      while(!stopping) {
+        int MSToSleep = r.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; //between 300 and 500 ms
+          //FIXME: I think there's still a risk of integer overflow here
+        int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
+        byte[] data = new byte[ arraySize];
+        r.nextBytes(data);
+        offset += data.length;
+        ChunkImpl evt = new ChunkImpl(type,"random data source",  offset, data , this);
+
+        dest.add(evt);
+        
+        Thread.sleep(MSToSleep);
+      } //end while
+    }  catch(InterruptedException ie)
+    {} //abort silently
+  }
+  
+  public String toString() {
+    return "const rate " + type;
+  }
+
+  public void hardStop() throws AdaptorException {
+    stopping = true;
+  }
+
+  public long shutdown() throws AdaptorException {
+    stopping = true;
+    return offset;
+  }
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+}
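A throwaway harness for the adaptor; the ChunkReceiver and Chunk signatures are
inferred from the calls in run() above and are best-effort assumptions.

    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
    import org.apache.hadoop.chukwa.util.ConstRateAdaptor;

    public class ConstRateSketch {
      public static void main(String[] args) throws Exception {
        ChunkReceiver sink = new ChunkReceiver() {
          public void add(Chunk event) {
            System.out.println("chunk of " + event.getData().length + " bytes");
          }
        };
        ConstRateAdaptor adaptor = new ConstRateAdaptor();
        adaptor.start("RandomData", "2000", 0L, sink); // ~2000 bytes/sec
        Thread.sleep(2000);
        adaptor.shutdown(); // returns the final offset
      }
    }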

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/DatabaseWriter.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.sql.ResultSet;
+import java.text.SimpleDateFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+
+public class DatabaseWriter {
+    private static Log log = LogFactory.getLog(DatabaseWriter.class);
+    private Connection conn = null;    
+    private Statement stmt = null; 
+    private ResultSet rs = null;
+
+    public DatabaseWriter(String host, String user, String password) {
+    	DataConfig mdlConfig = new DataConfig();
+    	String jdbc_url = "jdbc:mysql://"+host+"/";
+        if(user!=null) {
+            jdbc_url = jdbc_url + "?user=" + user;
+            if(password!=null) {
+                jdbc_url = jdbc_url + "&password=" + password;
+            }
+        }
+        try {
+            // The newInstance() call is a work around for some
+            // broken Java implementations
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
+        } catch (Exception ex) {
+            // handle the error
+            log.error(ex,ex);
+        }
+        try {
+            conn = DriverManager.getConnection(jdbc_url);
+            log.info("Initialized JDBC URL: "+jdbc_url);
+        } catch (SQLException ex) {
+            log.error(ex,ex);
+        }
+    }
+
+    public DatabaseWriter() {
+    	DataConfig mdlConfig = new DataConfig();
+    	String jdbc_url = "jdbc:mysql://"+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
+        if(mdlConfig.get("jdbc.user")!=null) {
+            jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+            if(mdlConfig.get("jdbc.password")!=null) {
+                jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+            }
+        }
+        try {
+            // The newInstance() call is a work around for some
+            // broken Java implementations
+            Class.forName("com.mysql.jdbc.Driver").newInstance();
+        } catch (Exception ex) {
+            // handle the error
+            log.error(ex,ex);
+        }
+        try {
+            conn = DriverManager.getConnection(jdbc_url);
+            log.info("Initialized JDBC URL: "+jdbc_url);
+        } catch (SQLException ex) {
+            log.error(ex,ex);
+        }
+    }
+    public void execute(String query) {
+        try {
+            stmt = conn.createStatement(); 
+            stmt.execute(query);
+        } catch (SQLException ex) {
+            // handle any errors
+            log.error(ex, ex);
+            log.error("SQL Statement:" + query);
+            log.error("SQLException: " + ex.getMessage());
+            log.error("SQLState: " + ex.getSQLState());
+            log.error("VendorError: " + ex.getErrorCode());
+        } finally {
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException sqlEx) {
+                    // ignore
+                }
+                stmt = null;
+            }
+        }
+    }
+    public ResultSet query(String query) {
+        try {
+            stmt = conn.createStatement(); 
+            rs = stmt.executeQuery(query);
+        } catch (SQLException ex) {
+            // handle any errors
+            log.error(ex, ex);
+            log.error("SQL Statement:" + query);
+            log.error("SQLException: " + ex.getMessage());
+            log.error("SQLState: " + ex.getSQLState());
+            log.error("VendorError: " + ex.getErrorCode());
+        }
+        return rs;
+    }
+    public void close() {
+        // release resources in reverse order of their creation
+        // once they are no longer needed
+        if (rs != null) {
+            try {
+                rs.close();
+            } catch (SQLException sqlEx) {
+                // ignore
+            }
+            rs = null;
+        }
+        if (stmt != null) {
+            try {
+                stmt.close();
+            } catch (SQLException sqlEx) {
+                // ignore
+            }
+            stmt = null;
+        }    
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException sqlEx) {
+                // ignore
+            }
+            conn = null;
+        }
+    }
+    public String formatTimeStamp(long timestamp) {
+        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        return formatter.format(timestamp);
+    }
+}
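Typical use, sketched with placeholder connection details; the MySQL driver must be
on the classpath, and "SELECT NOW()" is used so no particular database or table
needs to exist.

    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.chukwa.util.DatabaseWriter;

    public class DbSketch {
      public static void main(String[] args) {
        DatabaseWriter db = new DatabaseWriter("localhost:3306", "user", "password");
        try {
          ResultSet rs = db.query("SELECT NOW()");
          if (rs != null && rs.next()) {
            System.out.println("db time: " + rs.getString(1));
          }
        } catch (SQLException e) {
          e.printStackTrace();
        } finally {
          db.close(); // closes ResultSet, Statement, and Connection in order
        }
      }
    }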

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ExceptionUtil.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class ExceptionUtil {
+    public static String getStackTrace(Throwable t) {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        t.printStackTrace(pw);
+        pw.flush();
+        return sw.toString();
+    }
+}
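A minimal sketch of the intended use: flattening a stack trace into a single string
so it can travel through a logger or a report. riskyOperation and log are placeholders:

    try {
        riskyOperation();
    } catch (Exception e) {
        log.warn("operation failed: " + ExceptionUtil.getStackTrace(e));
    }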

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.util.Random;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.*;
+import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+
+public class MaxRateSender extends Thread implements Adaptor {
+
+  public static final int BUFFER_SIZE = 60 * 1024;
+  public static final String ADAPTOR_NAME = "MaxRateSender";
+  
+  private volatile boolean stopping = false;
+  private long offset;
+  private String type;
+  ChunkReceiver dest;
+  
+  public String getCurrentStatus() throws AdaptorException {
+    return "";
+  }
+
+  public void start(String type, String status, long offset, ChunkReceiver dest) throws AdaptorException
+  {
+    this.setName("MaxRateSender adaptor");
+    this.offset = offset;
+    this.type = type;
+    this.dest = dest;
+    super.start();  // Thread.start(); run() below generates the data
+  }
+  
+  public void run()
+  {
+    Random r = new Random();
+
+    try {
+      while(!stopping) {
+        byte[] data = new byte[BUFFER_SIZE];
+        r.nextBytes(data);
+        offset += data.length;
+        ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data, this);
+        dest.add(evt);
+      }
+    } catch(InterruptedException ie) {
+      // dest.add may block; treat interruption as a request to stop
+    }
+  }
+  
+  public String toString() {
+    return ADAPTOR_NAME;
+  }
+
+  public long shutdown() throws AdaptorException {
+    stopping = true;
+    return offset;
+  }
+  
+  public void hardStop() throws AdaptorException {
+    stopping = true;
+  }
+  
+
+  @Override
+  public String getType() {
+    return type;
+  }
+
+}
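MaxRateSender is a load generator: it emits BUFFER_SIZE chunks of random bytes as fast
as the downstream ChunkReceiver will accept them, which makes it useful for benchmarking
the collection pipeline. A sketch of driving it directly, assuming a queue variable that
implements ChunkReceiver and a surrounding method that declares throws Exception; in
normal operation adaptors are launched through the agent's add command instead:

    MaxRateSender sender = new MaxRateSender();
    sender.start("bench", "", 0, queue); // type, status, initial offset, destination
    Thread.sleep(10 * 1000);             // let it generate data for ten seconds
    long bytesSent = sender.shutdown();  // request a stop; returns the final offset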

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/RecordConstants.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+public class RecordConstants
+{
+  static final char[] CTRL_A = {'\u0001'};
+  static final char[] CTRL_B = {'\u0002'};
+  static final char[] CTRL_C = {'\u0003'};
+  static final char[] CTRL_D = {'\u0004'};
+  //public static final String FIELD_SEPARATOR = new String(CTRL_A);
+  public static final String DEFAULT_FIELD_SEPARATOR = "-#-";
+  public static final String DEFAULT_RECORD_SEPARATOR = "\n";
+  // may want this to be very obscure, e.g.
+  // new String(CTRL_B) + new String(CTRL_C) + new String(CTRL_D)
+  public static final String RECORD_SEPARATOR_ESCAPE_SEQ = new String(CTRL_D);
+	
+  /**
+   * Insert the default chukwa escape sequence in <code>record</code> before all occurrences of
+   * <code>recordSeparator</code> <i>except</i> the final one, if the final record separator
+   * occurs at the end of the <code>record</code>.
+   * Note that <code>replaceAll</code> treats the separator as a regular expression, so
+   * separators containing regex metacharacters should be quoted first (e.g. with Pattern.quote).
+   * @param recordSeparator The record separator that we are escaping. This is chunk source application specific.
+   * @param record The string representing the entire record, including the final record delimiter
+   * @return The string with the appropriate <code>recordSeparator</code>s escaped
+   */
+  public static String escapeAllButLastRecordSeparator(String recordSeparator, String record) {
+    if (record.endsWith(recordSeparator)) {
+      return record.substring(0, record.length() - recordSeparator.length())
+                   .replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator)
+             + recordSeparator;
+    }
+    // no trailing separator: escape every occurrence instead of
+    // returning an empty string
+    return escapeAllRecordSeparators(recordSeparator, record);
+  }
+
+  /**
+   * Insert the default chukwa escape sequence in <code>record</code> before all occurrences of
+   * <code>recordSeparator</code>. This assumes that you are not passing the final record
+   * separator in with the <code>record</code>, because it would be escaped too.
+   * @param recordSeparator The record separator that we are escaping. This is chunk source application specific.
+   * @param record The string representing the entire record, excluding the final record delimiter
+   * @return The string with all <code>recordSeparator</code>s escaped
+   */
+  public static String escapeAllRecordSeparators(String recordSeparator, String record) {
+    return record.replaceAll(recordSeparator, RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator);
+  }
+
+  public static String recoverRecordSeparators(String recordSeparator, String record) {
+    return record.replaceAll(RECORD_SEPARATOR_ESCAPE_SEQ + recordSeparator, recordSeparator);
+  }
+}
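A round-trip sketch of the escaping contract, using a newline separator; the strings
are illustrative:

    String rec = "line1\nline2\n";
    String escaped = RecordConstants.escapeAllButLastRecordSeparator("\n", rec);
    // escaped is "line1" + RECORD_SEPARATOR_ESCAPE_SEQ + "\n" + "line2\n":
    // the embedded separator is escaped, the trailing one is left alone
    String back = RecordConstants.recoverRecordSeparators("\n", escaped);
    // back.equals(rec) holds, so a consumer can split on unescaped separators
    // and recover the original records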

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/TestChunkBuilder.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa;
+
+import junit.framework.TestCase;
+
+public class TestChunkBuilder extends TestCase {
+
+  public void testChunkBuilder()
+  {
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord("foo".getBytes());
+    cb.addRecord("bar".getBytes());
+    cb.addRecord("baz".getBytes());
+    Chunk chunk = cb.getChunk();
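+    // "foo", "bar" and "baz" are 3 bytes each, so the record end offsets
+    // land at bytes 2, 5 and 8, and the seqID equals the total length, 9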
+    assertEquals(3, chunk.getRecordOffsets().length);
+    assertEquals(9, chunk.getSeqID());
+    assertEquals(2, chunk.getRecordOffsets()[0]);
+    assertEquals(5, chunk.getRecordOffsets()[1]);
+    assertEquals(8, chunk.getRecordOffsets()[2]);
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TempFileUtil.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection;
+
+import java.io.*;
+import java.util.Random;
+
+public class TempFileUtil {
+  public static File makeBinary(int length) throws IOException {
+    File tmpOutput = new File("/tmp/chukwaTest");
+    FileOutputStream fos = new FileOutputStream(tmpOutput);
+    Random r = new Random();
+    byte[] randomData = new byte[length];
+    r.nextBytes(randomData);
+    randomData[length-1] = '\n'; // data must end with \n since the default tailer splits on it
+    fos.write(randomData);
+    fos.flush();
+    fos.close();
+    return tmpOutput;
+  }
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TestAgentCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TestAgentCollector.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TestAgentCollector.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/TestAgentCollector.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.agent.*;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+import junit.framework.TestCase;
+
+/**
+ * Starts an agent, connector, and collector in one process,
+ * then starts an adaptor to push a random temp file through the pipeline.
+ */
+public class TestAgentCollector extends TestCase {
+  static Server server;
+  static Context root;
+  static ChukwaAgent agent;
+  static HttpConnector connector;
+  static final int STOPS_AND_STARTS = 10;
+
+  static {
+    try {
+      server = new Server(9990);
+      root = new Context(server, "/", Context.SESSIONS);
+      agent = new ChukwaAgent();
+      connector = new HttpConnector(agent, "http://localhost:9990/chukwa");
+      connector.start();
+
+      ConsoleWriter readInData = new ConsoleWriter(false);
+      ServletCollector.setWriter(readInData);
+      root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+      server.start();
+      server.setStopAtShutdown(false);
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+  /**
+   * Pushes one random 2000-byte file through the pipeline and checks that
+   * the bytes the collector received match the bytes written.
+   */
+  public void testAllOnce()
+  {
+    try {
+
+      InMemoryWriter readInData =  new InMemoryWriter();
+      ServletCollector.setWriter(readInData);
+  
+      Thread.sleep(1000);
+        
+      ChukwaConfiguration cc = new ChukwaConfiguration();
+      int portno = cc.getInt("chukwaAgent.control.port", 9093);
+      ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+      
+      File tmpOutput = new File("/tmp/chukwaTest");
+      FileOutputStream fos = new FileOutputStream(tmpOutput);
+      Random r = new Random();
+      boolean failed = false;
+      byte[] randomData = new byte[2000];
+      r.nextBytes(randomData);
+      randomData[1999] = '\n';//need data to end with \n since default tailer uses that
+      fos.write(randomData);
+      fos.flush();
+      fos.close();
+      
+      cli.addFile("unknown", tmpOutput.getAbsolutePath());
+      assertEquals(1, agent.adaptorCount());
+      cli.removeFile("unknown", tmpOutput.getAbsolutePath());
+      assertEquals(0, agent.adaptorCount());
+      org.apache.hadoop.chukwa.Chunk readIn = readInData.readOutChunk(randomData.length, 5000);
+      byte[] readInBytes = readIn.getData();
+      if(readInBytes.length != randomData.length)
+      {
+        System.err.println("FAIL: input ended at " + readInBytes.length + " bytes");
+        failed = true;
+      } else {
+        for(int i = 0; i < randomData.length ; ++i) {
+          byte next = readInBytes[i];
+          if(next != randomData[i]) {
+            System.err.println("FAIL: saw byte " + next + " at position " + i +
+                ", expected " + randomData[i]);
+            failed = true;
+            break;
+          }
+        }
+      }
+      cli.removeAll();
+      tmpOutput.delete();
+      assertFalse(failed);
+      System.out.println("done");
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+  
+  public void testStopAndStart() {
+
+    try {
+      ConsoleWriter readInData =  new ConsoleWriter(false);
+      ServletCollector.setWriter(readInData);
+  
+      Thread.sleep(1000);
+       
+      ChukwaConfiguration cc = new ChukwaConfiguration();
+      int portno = cc.getInt("chukwaAgent.control.port", 9093);
+      ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+      
+      for(int i=1; i < STOPS_AND_STARTS; ++i) {
+        cli.add("org.apache.hadoop.chukwa.util.ConstRateAdaptor", "oneatatime_raw" + i, "20000", 0);
+        assertEquals("adaptor failed to start", 1, agent.adaptorCount());
+        Thread.sleep(2000);   
+        cli.removeAll();
+        assertTrue("adaptor failed to stop", agent.adaptorCount() == 0);
+      }
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+    
+  }
+
+  public void testManyAdaptors() {
+    try {
+      
+      ConsoleWriter readInData =  new ConsoleWriter(false);
+      ServletCollector.setWriter(readInData);
+  
+      Thread.sleep(1000);
+
+      
+      ChukwaConfiguration cc = new ChukwaConfiguration();
+      int portno = cc.getInt("chukwaAgent.control.port", 9093);
+      ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
+      assertEquals("no adaptors should be running pre-test",0, agent.adaptorCount());
+      for(int i=0; i < 10; ++i) {
+        
+        for(int n = 1; n < 7; ++n) {
+//          cli.add("org.apache.hadoop.chukwa.util.ConstRateAdaptor", "many_raw"+n, "20000", 0);
+          agent.processCommand("add org.apache.hadoop.chukwa.util.ConstRateAdaptor many_raw"+n + " 20000 0");
+          assertEquals(n, agent.adaptorCount());
+        }
+        
+        Thread.sleep(5000);
+   
+        cli.removeAll();
+        assertEquals("remove left some adaptors running", 0, agent.adaptorCount());
+      }
+  } catch(Exception e) {
+    e.printStackTrace();
+  }
+  }
+  
+  public static void main(String[] args)  {
+    new TestAgentCollector().testManyAdaptors();
+  }
+  
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestExecAdaptor.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+
+public class TestExecAdaptor extends TestCase {
+  
+  ChunkCatcherConnector chunks;
+  public TestExecAdaptor() {
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+  }
+  
+  public void testWithPs() throws ChukwaAgent.AlreadyRunningException {
+    try {
+      ChukwaAgent  agent = new ChukwaAgent();
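+      // add-command layout, as used here: adaptor class name, datatype ("ps"),
+      // the command line to exec ("ps aux"), and the initial offset (0)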
+      agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor ps ps aux 0");
+  
+      Chunk c = chunks.waitForAChunk();
+      System.out.println(new String(c.getData()));
+    } catch(InterruptedException e) {
+      // interrupted while waiting for a chunk; nothing to clean up
+    }
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.*;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+
+public class TestFileTailingAdaptors extends TestCase {
+  ChunkCatcherConnector chunks;
+  public TestFileTailingAdaptors() {
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+  }
+  
+  public void testRawAdaptor() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
+    ChukwaAgent  agent = new ChukwaAgent();
+    
+    File testFile = makeTestFile("/tmp/chukwaTest");
+    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor" +
+        " raw " + testFile + " 0");
+    assertTrue(agent.adaptorCount() == 1);
+    Chunk c = chunks.waitForAChunk();
+    assertTrue(c.getDataType().equals("raw"));
+    assertTrue(c.getRecordOffsets().length == 1);
+    assertTrue(c.getSeqID() == testFile.length());     
+    agent.shutdown();
+  }
+
+
+  public void testCrSepAdaptor() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
+    ChukwaAgent  agent = new ChukwaAgent();
+    File testFile = makeTestFile("/tmp/chukwaTest");
+    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8" +
+        " lines " + testFile + " 0");
+    assertTrue(agent.adaptorCount() == 1);
+    System.out.println("getting a chunk...");
+    Chunk c = chunks.waitForAChunk(); 
+    System.out.println("got chunk");
+    assertTrue(c.getSeqID() == testFile.length());    
+    
+    assertTrue(c.getRecordOffsets().length == 80); // 80 lines written by makeTestFile
+    int recStart = 0;
+    for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
+      String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
+      System.out.println("record "+ rec+ " was: " + record);
+      assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
+      recStart = c.getRecordOffsets()[rec] +1;
+    }
+    assertTrue(c.getDataType().equals("lines"));    
+    agent.shutdown();
+  }
+  
+  private File makeTestFile(String name) throws IOException {
+    File tmpOutput = new File(name);
+    FileOutputStream fos = new FileOutputStream(tmpOutput);
+    
+    PrintWriter pw = new PrintWriter(fos);
+    for(int i = 0; i < 80; ++i) {
+      pw.print(i + " ");
+      pw.println("abcdefghijklmnopqrstuvwxyz");
+    }
+    pw.flush();
+    pw.close();
+    return tmpOutput;
+  }
+  
+  public static void main(String[] args) {
+    try {
+      TestFileTailingAdaptors tests = new TestFileTailingAdaptors();
+      tests.testCrSepAdaptor();
+    } catch(Exception e) {
+      e.printStackTrace();
+    }
+  }
+  
+}

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java?rev=685353&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java Tue Aug 12 15:35:16 2008
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+
+import junit.framework.TestCase;
+
+public class TestStartAtOffset extends TestCase {
+  
+  ChunkCatcherConnector chunks;
+  public TestStartAtOffset() {
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+  }
+  
+  public void testStartAtOffset() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
+    ChukwaAgent  agent = new ChukwaAgent();
+    File testFile = makeTestFile();
+    int startOffset = 50;
+    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 " +
+         "lines "+ startOffset+ " " + testFile + " " + startOffset);
+    assertTrue(agent.adaptorCount() == 1);
+    System.out.println("getting a chunk...");
+    Chunk c = chunks.waitForAChunk(); 
+    System.out.println("got chunk");
+    assertTrue(c.getSeqID() == testFile.length() + startOffset);    
+    
+    assertTrue(c.getRecordOffsets().length == 80);//80 lines in test file
+    int recStart = 0;
+    for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
+      String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
+      System.out.println("record "+ rec+ " was: " + record);
+      assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
+      recStart = c.getRecordOffsets()[rec] +1;
+    }
+    assertTrue(c.getDataType().equals("lines"));    
+    agent.shutdown();
+  }
+  
+  public void testStartAfterOffset() throws IOException, InterruptedException, ChukwaAgent.AlreadyRunningException {
+    ChukwaAgent  agent = new ChukwaAgent();
+    File testFile = makeTestFile();
+    int startOffset = 50;
+    agent.processCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 " +
+         "lines "+ startOffset+ " " + testFile + " " + (startOffset + 29) );
+    assertTrue(agent.adaptorCount() == 1);
+    System.out.println("getting a chunk...");
+    Chunk c = chunks.waitForAChunk(); 
+    System.out.println("got chunk");
+    assertTrue(c.getSeqID() == testFile.length() + startOffset);    
+    
+    assertTrue(c.getRecordOffsets().length == 79);//80 lines in test file, minus the one we skipped
+    int recStart = 0;
+    for(int rec = 0 ; rec < c.getRecordOffsets().length; ++rec) {
+      String record = new String(c.getData(), recStart, c.getRecordOffsets()[rec] - recStart+1);
+      System.out.println("record "+ rec+ " was: " + record);
+      assertTrue(record.equals((rec+1) + " abcdefghijklmnopqrstuvwxyz\n"));
+      recStart = c.getRecordOffsets()[rec] +1;
+    }
+    assertTrue(c.getDataType().equals("lines"));    
+    agent.shutdown();
+  }
+  
+  private File makeTestFile() throws IOException {
+    File tmpOutput = new File("/tmp/chukwaTest");
+    FileOutputStream fos = new FileOutputStream(tmpOutput);
+    
+    PrintWriter pw = new PrintWriter(fos);
+    for(int i = 0; i < 80; ++i) {
+      pw.print(i + " ");
+      pw.println("abcdefghijklmnopqrstuvwxyz");
+    }
+    pw.flush();
+    pw.close();
+    return tmpOutput;
+  }
+  
+  
+}


