chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From billgra...@apache.org
Subject svn commit: r1139108 - in /incubator/chukwa/trunk: ./ conf/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ src/test/org/apache/hadoop/chukwa/datacollection/writer/
Date Thu, 23 Jun 2011 22:45:26 GMT
Author: billgraham
Date: Thu Jun 23 22:45:26 2011
New Revision: 1139108

URL: http://svn.apache.org/viewvc?rev=1139108&view=rev
Log:
CHUKWA-589. Allow collectors to close .chukwa files at fixed offset. (Himanshu Gahlot and
Shweta Shah via Bill Graham)

Modified:
    incubator/chukwa/trunk/build.xml
    incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template
    incubator/chukwa/trunk/default.properties
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
    incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java

Modified: incubator/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/build.xml?rev=1139108&r1=1139107&r2=1139108&view=diff
==============================================================================
--- incubator/chukwa/trunk/build.xml (original)
+++ incubator/chukwa/trunk/build.xml Thu Jun 23 22:45:26 2011
@@ -1118,6 +1118,12 @@
 				<replacetokens>
 					<token key="TODO-COLLECTORS-PORT" value="${TODO-COLLECTORS-PORT}" />
 				</replacetokens>
+				<replacetokens>
+					<token key="TODO-COLLECTORS-ISFIXEDTIMEROTATORSCHEME" value="${TODO-COLLECTORS-ISFIXEDTIMEROTATORSCHEME}" />
+				</replacetokens>
+				<replacetokens>
+					<token key="TODO-COLLECTORS-FIXEDTIMEINTERVALOFFSET" value="${TODO-COLLECTORS-FIXEDTIMEINTERVALOFFSET}" />
+				</replacetokens>
 
 			</filterchain>
 		</copy>

Modified: incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template?rev=1139108&r1=1139107&r2=1139108&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template (original)
+++ incubator/chukwa/trunk/conf/chukwa-collector-conf.xml.template Thu Jun 23 22:45:26 2011
@@ -97,6 +97,27 @@ your hbase.zookeeper.quorum setting to t
   </property>
 
   <property>
+    <name>chukwaCollector.isFixedTimeRotatorScheme</name>
+    <value>@TODO-COLLECTORS-ISFIXEDTIMEROTATORSCHEME@</value>
+    <description>If true, the collector closes its files at a fixed offset
+    past each rotateInterval boundary (boundaries are multiples of
+    rotateInterval counted from the epoch) instead of rotateInterval ms after
+    the previous rotation. The default is false. When true, also set
+    chukwaCollector.fixedTimeIntervalOffset. For example, if
+    isFixedTimeRotatorScheme is true, fixedTimeIntervalOffset is 10000 and
+    rotateInterval is 300000, the collector closes its files 10 seconds past
+    every 5 minute mark; if isFixedTimeRotatorScheme is false, the collector
+    rotates approximately once every 5 minutes.
+    </description>
+  </property>
+
+  <property>
+    <name>chukwaCollector.fixedTimeIntervalOffset</name>
+    <value>@TODO-COLLECTORS-FIXEDTIMEINTERVALOFFSET@</value>
+    <description>Offset (ms) past each rotateInterval boundary at which files are closed; used only when chukwaCollector.isFixedTimeRotatorScheme is true</description>
+  </property>
+
+  <property>
     <name>chukwaCollector.http.port</name>
     <value>@TODO-COLLECTORS-PORT@</value>
     <description>The HTTP port number the collector will listen on</description>

Modified: incubator/chukwa/trunk/default.properties
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/default.properties?rev=1139108&r1=1139107&r2=1139108&view=diff
==============================================================================
--- incubator/chukwa/trunk/default.properties (original)
+++ incubator/chukwa/trunk/default.properties Thu Jun 23 22:45:26 2011
@@ -27,6 +27,8 @@ TODO-COLLECTORS-LOCAL-OUTPUT-DIR=/tmp/ch
 TODO-COLLECTORS-NAMENODE=hdfs://localhost:9000/
 TODO-COLLECTORS-ROTATEINTERVAL=300000
 TODO-COLLECTORS-PORT=8080
+TODO-COLLECTORS-ISFIXEDTIMEROTATORSCHEME=false
+TODO-COLLECTORS-FIXEDTIMEINTERVALOFFSET=30000
 TODO-JAVA-HOME=/usr/lib/j2sdk1.5-sun
 TODO-HADOOP-HOME=/home/user/Development/hadoop-trunk
 TODO-HADOOP-CONF-DIR=/home/user/Development/hadoop-conf

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=1139108&r1=1139107&r2=1139108&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Thu Jun 23 22:45:26 2011
@@ -52,10 +52,14 @@ public class SeqFileWriter extends Pipel
 
   protected int STAT_INTERVAL_SECONDS = 30;
   private int rotateInterval = 1000 * 60 * 5;
+  private int offsetInterval = 1000 * 30;
+  private boolean if_fixed_interval = false;
   static final int ACQ_WAIT_ON_TERM = 500; //ms to wait for lock on a SIGTERM before aborting
   
   public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
   public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
+  public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
+  public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
   public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
   protected static String localHostAddr = null;
   
@@ -102,6 +106,8 @@ public class SeqFileWriter extends Pipel
     this.conf = conf;
 
     rotateInterval = conf.getInt(ROTATE_INTERVAL_OPT,rotateInterval);
+    if_fixed_interval = conf.getBoolean(IF_FIXED_INTERVAL_OPT,if_fixed_interval);
+    offsetInterval = conf.getInt(FIXED_INTERVAL_OFFSET_OPT,offsetInterval);
     
     STAT_INTERVAL_SECONDS = conf.getInt(STAT_PERIOD_OPT, STAT_INTERVAL_SECONDS);
 
@@ -113,6 +119,11 @@ public class SeqFileWriter extends Pipel
     }
 
     log.info("rotateInterval is " + rotateInterval);
+    if(if_fixed_interval)
+      log.info("using fixed time interval scheme, " +
+              "offsetInterval is " + offsetInterval);
+    else
+      log.info("not using fixed time interval scheme");
     log.info("outputDir is " + outputDir);
     log.info("fsname is " + fsname);
     log.info("filesystem type from core-default.xml is "
@@ -239,16 +250,63 @@ public class SeqFileWriter extends Pipel
     }
     
     // Schedule the next timer
+    scheduleNextRotation();
+
+  }
+
+  /**
+   * Schedules the next call to rotate() on a new Timer. If
+   * chukwaCollector.isFixedTimeRotatorScheme is false the delay is simply
+   * rotateInterval ms; if true it comes from getDelayForFixedInterval(), so
+   * the rotation happens chukwaCollector.fixedTimeIntervalOffset ms past the
+   * next multiple of rotateInterval.
+   * NOTE(review): a new Timer is created on every call (presumably
+   * java.util.Timer, one thread each); confirm rotate() cancels the old one.
+   */
+  void scheduleNextRotation(){
+    long delay = rotateInterval;
+    if (if_fixed_interval) {
+      long currentTime = System.currentTimeMillis();
+      delay = getDelayForFixedInterval(currentTime, rotateInterval, offsetInterval);
+    }
     rotateTimer = new Timer();
     rotateTimer.schedule(new TimerTask() {
       public void run() {
         rotate();
       }
-    }, rotateInterval);
+    }, delay);
+  }
 
+  /**
+   * Returns the delay (ms) before the next fixed-scheme rotation, t2 - t1,
+   * where t1 is currentTime and t2 is the next multiple of rotateInterval
+   * (counted from the epoch) after currentTime, plus offsetInterval.
+   * NOTE(review): when offsetInterval >= rotateInterval each rotation lands in
+   * a later window, so the effective period exceeds rotateInterval (e.g. 1 min
+   * interval + 2 min offset rotates every 3 min); a negative offset can give a
+   * negative delay, which Timer.schedule rejects; rotateInterval must be > 0.
+   * @param currentTime - the current timestamp (ms since the epoch)
+   * @param rotateInterval - chukwaCollector.rotateInterval (ms)
+   * @param offsetInterval - chukwaCollector.fixedTimeIntervalOffset (ms)
+   * @return delay for scheduling next rotation
+   */
+  long getDelayForFixedInterval(long currentTime, long rotateInterval, long offsetInterval){
+    // ms elapsed since the start of the current rotateInterval window
+    long remainder = (currentTime % rotateInterval);
+    long prevRoundedInterval = currentTime - remainder;
+    long nextRoundedInterval = prevRoundedInterval + rotateInterval;
+    long delay = nextRoundedInterval - currentTime + offsetInterval;
+
+    if (log.isInfoEnabled()) {
+     log.info("currentTime="+currentTime+" prevRoundedInterval="+
+             prevRoundedInterval+" nextRoundedInterval" +
+            "="+nextRoundedInterval+" delay="+delay);
+    }
+
+    return delay;
   }
 
-  
+
   protected void computeTimePeriod() {
     synchronized (calendar) {
       calendar.setTimeInMillis(System.currentTimeMillis());

Modified: incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java?rev=1139108&r1=1139107&r2=1139108&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java (original)
+++ incubator/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java Thu Jun 23 22:45:26 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.chukwa.datacol
 import java.io.File;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Date;
+import java.text.SimpleDateFormat;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -73,7 +75,7 @@ public class TestChukwaWriters extends T
         tempDir.mkdirs();
       }
       
-      String outputDirectory = tempDir.getPath() + "/testChukwaWriters_JB_" + System.currentTimeMillis() + "/";
+      String outputDirectory = tempDir.getPath() + "/testChukwaWriters_testWriters_" + System.currentTimeMillis() + "/";
       
       
       Configuration confSeqWriter = new Configuration();
@@ -176,4 +178,268 @@ public class TestChukwaWriters extends T
     }
     return null;    
   }
+
+  /**
+   * Test to check if the .chukwa files are closing at the time we expect them
+   * to close. This test sets the rotateInterval and offsetInterval to small
+   * values, reads the filename of the first .chukwa file, extracts the
+   * timestamp from its name, calculates the timestamp when the next .chukwa
+   * file should be closed, sleeps for some time (enough for producing the next
+   * .chukwa file), reads the timestamp on the second .chukwa file, and checks
+   * that it is within a small threshold (in either direction) of the expected
+   * close timestamp.
+   */
+  public void testSeqWriterFixedCloseInterval() {
+    try {
+      long rotateInterval = 10000;
+      long intervalOffset = 3000;
+
+      ChukwaWriter seqWriter = new SeqFileWriter();
+
+      File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
+      if (!tempDir.exists()) {
+        tempDir.mkdirs();
+      }
+
+      String outputDirectory = tempDir.getPath() +
+              "/testChukwaWriters_testSeqWriterFixedCloseInterval_" +
+              System.currentTimeMillis() + "/";
+
+      Configuration confSeqWriter = new Configuration();
+      confSeqWriter.set("chukwaCollector.rotateInterval", String.valueOf(rotateInterval));
+      confSeqWriter.set("writer.hdfs.filesystem", "file:///");
+      String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
+      confSeqWriter.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
+      confSeqWriter.set("chukwaCollector.isFixedTimeRotatorScheme", "true");
+      confSeqWriter.set("chukwaCollector.fixedTimeIntervalOffset", String.valueOf(intervalOffset));
+
+      File directory = new File(seqWriterOutputDir);
+
+      // if some files already exist in this directory then delete them. Files
+      // may exist due to an old test run.
+      File[] files = directory.listFiles();
+      if (files != null) {
+        for(File file: files) {
+          file.delete();
+        }
+      }
+
+      // We do not want the test to fail because init() creates the first
+      // .chukwa file at the very end of one rotateInterval window while
+      // scheduleNextRotation() computes its delay in the next window. So if
+      // we are within the last two seconds of the current window, wait until
+      // the next window has begun. All values here are in milliseconds.
+      long startGuard = 2000;
+      long timeAfterPrevRotateInterval = System.currentTimeMillis() % rotateInterval;
+      if (timeAfterPrevRotateInterval > rotateInterval - startGuard) {
+        Thread.sleep(startGuard);
+      }
+
+      seqWriter.init(confSeqWriter);
+
+      // HH is the hour of day (0-23); hh is the 1-12 am/pm hour and misreads
+      // noon and afternoon hours. NOTE(review): assumes the writer formats
+      // the hour in file names with HH -- confirm against SeqFileWriter.
+      SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
+
+      String firstFileName = findChukwaFileName(directory, null);
+      Assert.assertNotNull("init() did not create a .chukwa file", firstFileName);
+      long initialDateInMillis = parseChukwaFileTimestamp(formatter, firstFileName);
+
+      // calculate the expected close time of the next .chukwa file.
+      long prevRoundedInterval = initialDateInMillis - (initialDateInMillis %
+              rotateInterval);
+      long expectedNextCloseDate = prevRoundedInterval +
+              rotateInterval + intervalOffset;
+
+      // sleep for a time interval equal to (rotateInterval + offsetInterval).
+      // Only one more .chukwa file will be produced in this time interval.
+      long sleepTime = rotateInterval + intervalOffset;
+      Thread.sleep(sleepTime);
+
+      String nextFileName = findChukwaFileName(directory, firstFileName);
+      Assert.assertNotNull("No second .chukwa file was created", nextFileName);
+      long nextDateInMillis = parseChukwaFileTimestamp(formatter, nextFileName);
+
+      long threshold = 500; //milliseconds
+
+      // test will be successful only if the timestamp on the second .chukwa
+      // file is very close (differs by < 500 ms, in either direction) to the
+      // expected closing timestamp we calculated.
+      Assert.assertTrue("File not closed at expected time",
+              Math.abs(nextDateInMillis - expectedNextCloseDate) < threshold);
+      seqWriter.close();
+
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail("Exception in TestChukwaWriters - " +
+              "testSeqWriterFixedCloseInterval()," + e.getMessage());
+    }
+  }
+
+  /**
+   * Returns the name of a file in directory that ends with ".chukwa" and is
+   * not named exclude (pass null to accept any), or null if there is no such
+   * file or the directory cannot be listed.
+   */
+  private static String findChukwaFileName(File directory, String exclude) {
+    String[] names = directory.list();
+    if (names == null) {
+      return null;
+    }
+    for (String name : names) {
+      if (name.endsWith(".chukwa") && !name.equals(exclude)) {
+        return name;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Parses the timestamp prefix of a .chukwa file name and returns it in
+   * epoch ms. For 20110531122600002_<host-name>_5f836ece1302899d9a0727e.chukwa
+   * the prefix is 20110531122600002; its last three (millisecond) digits are
+   * dropped, leaving 20110531122600 for the formatter.
+   */
+  private static long parseChukwaFileTimestamp(SimpleDateFormat formatter,
+          String fileName) throws Exception {
+    String timestamp = fileName.split("_")[0];
+    timestamp = timestamp.substring(0, timestamp.length() - 3);
+    return formatter.parse(timestamp).getTime();
+  }
+
+  /**
+   * Test to check the calculation of the delay interval for rotation in
+   * SeqFileWriter. For each known current timestamp it checks that the
+   * current timestamp plus the delay returned by getDelayForFixedInterval()
+   * equals the expected timestamp of the next rotation. All timestamps are
+   * parsed in UTC, so the expectations do not depend on the default time zone
+   * of the machine running the test.
+   */
+  public void testFixedIntervalOffsetCalculation(){
+    try{
+      SeqFileWriter seqFileWriter = new SeqFileWriter();
+      SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+      formatter.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
+
+      //rotateInterval >> offsetInterval: 5 min interval, 1 min offset
+      assertNextRotations(seqFileWriter, formatter, 300000, 60000, new String[][] {
+          {"2011/06/15 01:05:00", "2011/06/15 01:11:00"},
+          {"2011/06/15 01:06:00", "2011/06/15 01:11:00"},
+          {"2011/06/15 01:02:00", "2011/06/15 01:06:00"},
+          {"2011/06/15 01:04:00", "2011/06/15 01:06:00"},
+          //edge case, when there is a change in the "hour"
+          {"2011/06/15 01:56:00", "2011/06/15 02:01:00"}});
+
+      //rotateInterval > offsetInterval: 1 min interval, 30 sec offset
+      assertNextRotations(seqFileWriter, formatter, 60000, 30000, new String[][] {
+          {"2011/06/15 01:05:00", "2011/06/15 01:06:30"},
+          {"2011/06/15 01:04:30", "2011/06/15 01:05:30"},
+          {"2011/06/15 01:05:30", "2011/06/15 01:06:30"},
+          {"2011/06/15 01:04:00", "2011/06/15 01:05:30"},
+          //edge case, when there is a change in the "hour"
+          {"2011/06/15 01:59:30", "2011/06/15 02:00:30"}});
+
+      //rotateInterval = offsetInterval: 1 min interval, 1 min offset
+      assertNextRotations(seqFileWriter, formatter, 60000, 60000, new String[][] {
+          {"2011/06/15 01:02:00", "2011/06/15 01:04:00"},
+          {"2011/06/15 01:02:30", "2011/06/15 01:04:00"},
+          //edge case, when there is a change in the "hour"
+          {"2011/06/15 01:59:30", "2011/06/15 02:01:00"}});
+
+      //rotateInterval < offsetInterval: 1 min interval, 2 min offset.
+      // These cases pin the current behaviour, in which the delay exceeds
+      // rotateInterval.
+      assertNextRotations(seqFileWriter, formatter, 60000, 120000, new String[][] {
+          {"2011/06/15 01:02:00", "2011/06/15 01:05:00"},
+          {"2011/06/15 01:02:30", "2011/06/15 01:05:00"},
+          //edge case, when there is a change in the "hour"
+          {"2011/06/15 01:59:30", "2011/06/15 02:02:00"}});
+
+    } catch (Throwable e) {
+      e.printStackTrace();
+      Assert.fail("Exception in TestChukwaWriters - " +
+              "testFixedIntervalOffsetCalculation()," + e.getMessage());
+    }
+  }
+
+  /**
+   * Asserts, for each {currentTime, expectedRotateTime} pair (both parsed
+   * with formatter), that currentTime plus getDelayForFixedInterval(
+   * currentTime, rotateInterval, offsetInterval) equals expectedRotateTime.
+   */
+  private static void assertNextRotations(SeqFileWriter writer,
+          SimpleDateFormat formatter, long rotateInterval, long offsetInterval,
+          String[][] cases) throws Exception {
+    for (String[] testCase : cases) {
+      long currentTimestamp = formatter.parse(testCase[0]).getTime();
+      long expectedRotateTimestamp = formatter.parse(testCase[1]).getTime();
+      long delay = writer.getDelayForFixedInterval(currentTimestamp,
+              rotateInterval, offsetInterval);
+      Assert.assertEquals("Incorrect value for delay at " + testCase[0],
+              expectedRotateTimestamp, currentTimestamp + delay);
+    }
+  }
 }



Mime
View raw message