hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r735223 - in /hadoop/core/branches/branch-0.20: ./ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/
Date Sat, 17 Jan 2009 03:37:56 GMT
Author: cdouglas
Date: Fri Jan 16 19:37:56 2009
New Revision: 735223

URL: http://svn.apache.org/viewvc?rev=735223&view=rev
Log:
HADOOP-4993. Fix Chukwa agent configuration and startup to make it both
more modular and testable. Contributed by Ari Rabkin

Added:
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=735223&r1=735222&r2=735223&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Jan 16 19:37:56 2009
@@ -555,6 +555,9 @@
     HADOOP-4818. Pass user config to instrumentation API. (Eric Yang via
     cdouglas)
 
+    HADOOP-4993. Fix Chukwa agent configuration and startup to make it both
+    more modular and testable. (Ari Rabkin via cdouglas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=735223&r1=735222&r2=735223&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Fri Jan 16 19:37:56 2009
@@ -41,7 +41,6 @@
  */
 public class ChukwaAgent
 {
-  boolean DO_CHECKPOINT_RESTORE = true;
   //boolean WRITE_CHECKPOINTS = true;
 
   static Logger log = Logger.getLogger(ChukwaAgent.class);
@@ -87,9 +86,7 @@
 
   private File checkpointDir; // lock this object to indicate checkpoint in
   // progress
-  private File initialAdaptors;
   private String CHECKPOINT_BASE_NAME; // base filename for checkpoint files
-  private int CHECKPOINT_INTERVAL_MS; // min interval at which to write
   // checkpoints
   private static String tags = "";
 
@@ -122,7 +119,8 @@
             "[default collector URL]");
         System.exit(0);
       }
-      ChukwaAgent localAgent = new ChukwaAgent();
+      Configuration conf = readConfig();
+      ChukwaAgent localAgent = new ChukwaAgent(conf);
 
       if (agent.anotherAgentIsRunning())
       {
@@ -135,15 +133,9 @@
       int uriArgNumber = 0;
       if (args.length > 0)
       {
-        if (args[0].equalsIgnoreCase("-noCheckPoint"))
-        {
-          agent.DO_CHECKPOINT_RESTORE = false;
-          uriArgNumber = 1;
-        }
         if (args[uriArgNumber].equals("local"))
           agent.connector = new ConsoleOutConnector(agent);
-        else
-        {
+        else  {
           if (!args[uriArgNumber].contains("://"))
             args[uriArgNumber] = "http://" + args[uriArgNumber];
           agent.connector = new HttpConnector(agent, args[uriArgNumber]);
@@ -182,33 +174,67 @@
   {
     return adaptorsByNumber.size();
   }
+  
 
-  public ChukwaAgent() throws AlreadyRunningException
-  {
-    ChukwaAgent.agent = this;
+  public ChukwaAgent() throws AlreadyRunningException {
+    this(new Configuration());
+  }
 
-    readConfig();
+  public ChukwaAgent(Configuration conf) throws AlreadyRunningException  {
+    ChukwaAgent.agent = this;
+    this.conf = conf;
 
     // almost always just reading this; so use a ConcurrentHM.
     // since we wrapped the offset, it's not a structural mod.
     adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
     adaptorsByNumber = new HashMap<Long, Adaptor>();
     checkpointNumber = 0;
-    try
-    {
-      if (DO_CHECKPOINT_RESTORE)
+    
+    boolean DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
+        true);
+    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
+        "chukwa_checkpoint_");
+    final int CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
+        5000);
+    
+    if(conf.get("chukwaAgent.checkpoint.dir") != null)
+      checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
+    else
+      DO_CHECKPOINT_RESTORE = false;
+    
+    if (checkpointDir!= null && !checkpointDir.exists())  {
+      checkpointDir.mkdirs();
+    }
+    tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+
+    log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
+    log.info("Config - checkpointDir: [" + checkpointDir + "]");
+    log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
+        + "]");
+    log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
+    log.info("Config - tags: [" + tags + "]");
+
+    if (DO_CHECKPOINT_RESTORE) {
+      needNewCheckpoint = true;
+      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+    }
+    
+    File initialAdaptors = null;
+    if(conf.get("chukwaAgent.initial_adaptors") != null)
+      initialAdaptors= new File( conf.get("chukwaAgent.initial_adaptors"));
+    
+    try {
+      if (DO_CHECKPOINT_RESTORE) {
         restoreFromCheckpoint();
-    } catch (IOException e)
-    {
+      }
+    } catch (IOException e) {
       log.warn("failed to restart from checkpoint: ", e);
     }
 
-    try
-    {
-      if (initialAdaptors != null && initialAdaptors.exists())
-        readAdaptorsFile(initialAdaptors);
-    } catch (IOException e)
-    {
+    try {
+      if (initialAdaptors != null && initialAdaptors.exists() && checkpointNumber ==0)
+        readAdaptorsFile(initialAdaptors); //don't read after checkpoint restore
+    } catch (IOException e) {
       log.warn("couldn't read user-specified file "
           + initialAdaptors.getAbsolutePath());
     }
@@ -221,8 +247,7 @@
       controlSock.start(); // this sets us up as a daemon
       log.info("control socket started on port " + controlSock.portno);
 
-      if (CHECKPOINT_INTERVAL_MS > 0)
-      {
+      if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir!= null)  {
         checkpointer = new Timer();
         checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
       }
@@ -242,7 +267,7 @@
   // but can be arbitrarily many space
   // delimited agent specific params )
   // 4) offset
-  Pattern addCmdPattern = Pattern.compile("add\\s+(\\S+)\\s+(\\S+)\\s+(.*\\S)?\\s*(\\d+)\\s*");
+  Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+(\\S+)\\s+(\\S+)\\s+(.*\\S)?\\s*(\\d+)\\s*");
   // FIXME: should handle bad lines here
   public long processCommand(String cmd)
   {
@@ -293,8 +318,10 @@
         }
       }
     } else
-      log.warn("only 'add' command supported in config files");
-
+      if(cmd.length() > 0)
+        log.warn("only 'add' command supported in config files");
+      //no warning for blank line
+    
     return -1;
   }
 
@@ -351,7 +378,7 @@
         }
       }
 
-      checkpointNumber = lowestIndex;
+      checkpointNumber = lowestIndex+1;
       File checkpoint = new File(checkpointDir, lowestName);
       readAdaptorsFile(checkpoint);
     }
@@ -469,7 +496,10 @@
     synchronized (adaptorsByNumber) {
       toStop = adaptorsByNumber.remove(number);
     }
-    if (toStop == null) {
+    
+    if (toStop != null) {
+      adaptorPositions.remove(toStop); 
+    } else {
       log.warn("trying to stop adaptor " + number + " that isn't running");
       return offset;
     }
@@ -494,60 +524,31 @@
     return connector;
   }
 
-  protected void readConfig() {
-    conf = new Configuration();
-
-    String chukwaHome = System.getenv("CHUKWA_HOME");
-    if (chukwaHome == null) {
-      chukwaHome = ".";
-    }
-
-    if (!chukwaHome.endsWith("/")) {
-      chukwaHome = chukwaHome + File.separator;
-    }
-    log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
-
-    String chukwaConf = System.getProperty("CHUKWA_CONF_DIR");
-    if (chukwaConf == null) {
-      chukwaConf = chukwaHome + "conf" + File.separator;
-    }
-    if (!chukwaHome.endsWith("/")) {
-      chukwaHome = chukwaHome + File.separator;
-    }
-    if (!chukwaConf.endsWith("/")) {
-        chukwaConf = chukwaConf + File.separator;    	
-    }
-    log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
-
-    conf.addResource(new Path(chukwaConf + "chukwa-agent-conf.xml"));
-    DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
-        true);
-    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
-        "chukwa_checkpoint_");
-    checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", chukwaHome
-        + "/var/"));
-    CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
-        5000);
-    if (!checkpointDir.exists())
-    {
-      checkpointDir.mkdirs();
-    }
-    tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+  protected static Configuration readConfig() {
+    Configuration conf = new Configuration();
 
-    log.info("Config - chukwaHome: [" + chukwaHome + "]");
-    log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
-    log.info("Config - checkpointDir: [" + checkpointDir + "]");
-    log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
-        + "]");
-    log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
-    log.info("Config - tags: [" + tags + "]");
-
-    if (DO_CHECKPOINT_RESTORE) {
-      needNewCheckpoint = true;
-      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
-    }
-
-    initialAdaptors = new File(chukwaConf + "initial_adaptors");
+    String chukwaHomeName = System.getenv("CHUKWA_HOME");
+    if (chukwaHomeName == null) {
+      chukwaHomeName = "";
+    }
+    File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
+
+    log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
+
+    String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
+    File chukwaConf;
+    if (chukwaConfName != null)
+      chukwaConf = new File(chukwaConfName).getAbsoluteFile();
+    else
+      chukwaConf = new File(chukwaHome, "conf");
+    
+    log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
+    File agentConf = new File(chukwaConf,"chukwa-agent-conf.xml");
+    conf.addResource(new Path(agentConf.getAbsolutePath()));
+    if(conf.get("chukwaAgent.checkpoint.dir") == null)
+      conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var").getAbsolutePath());
+    conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf, "initial_adaptors").getAbsolutePath());
+    return conf;
   }
 
   public void shutdown() {
@@ -559,15 +560,15 @@
    * explicitly. It probably should.
    */
   public void shutdown(boolean exit) {
-    if (checkpointer != null)
-      checkpointer.cancel();
     controlSock.shutdown(); // make sure we don't get new requests
-    try {
-      if (needNewCheckpoint)
+    if (checkpointer != null) {
+      checkpointer.cancel();
+      try {
         writeCheckpoint(); // write a last checkpoint here, before stopping
-      // adaptors
-    } catch (IOException e) {
+      } catch (IOException e) {
+      }
     }
+      // adaptors
 
     synchronized (adaptorsByNumber) { 
       // shut down each adaptor
@@ -580,6 +581,8 @@
         }
       }
     }
+    adaptorsByNumber.clear();
+    adaptorPositions.clear();
     if (exit)
       System.exit(0);
   }

Added: hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java?rev=735223&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java (added)
+++ hadoop/core/branches/branch-0.20/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java Fri Jan 16 19:37:56 2009
@@ -0,0 +1,76 @@
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.io.*;
+
+import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+public class TestAgentConfig extends TestCase {
+  public void testInitAdaptors_vs_Checkpoint() {
+    try {
+        //create two target files, foo and bar
+      File foo = File.createTempFile("foo", "test");
+      foo.deleteOnExit();
+      PrintStream ps = new PrintStream(new FileOutputStream(foo));
+      ps.println("foo");
+      ps.close();
+
+      File bar = File.createTempFile("bar", "test");
+      bar.deleteOnExit();
+      ps = new PrintStream(new FileOutputStream(bar));
+      ps.println("bar");
+      ps.close();
+      
+        //initially, read foo
+      File initialAdaptors = File.createTempFile("initial", "adaptors");
+      initialAdaptors.deleteOnExit();
+      ps = new PrintStream(new FileOutputStream(initialAdaptors));
+      ps.println("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 raw 0 "
+          + foo.getAbsolutePath() +" 0  ");
+      ps.close();
+      
+      Configuration conf = new Configuration();
+      conf.set("chukwaAgent.initial_adaptors", initialAdaptors.getAbsolutePath());
+      File checkpointDir = File.createTempFile("chukwatest", "checkpoint");
+      checkpointDir.delete();
+      checkpointDir.mkdir();
+      checkpointDir.deleteOnExit();
+      conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getAbsolutePath());
+      
+      ChukwaAgent agent = new ChukwaAgent(conf);
+      ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
+      conn.start();
+      assertEquals(1, agent.adaptorCount());//check that we processed initial adaptors
+      assertNotNull(agent.getAdaptorList().get(1L));
+      assertTrue(agent.getAdaptorList().get(1L).getStreamName().contains("foo"));
+      
+      System.out.println("---------------------done with first run, now stopping");
+      agent.shutdown();
+      assertEquals(0, agent.adaptorCount());
+      //at this point, there should be a checkpoint file with a tailer reading foo.
+      //we're going to rewrite initial adaptors to read bar; but after reboot we should
+      //still only be looking at foo.
+      ps = new PrintStream(new FileOutputStream(initialAdaptors, false));//overwrite
+      ps.println("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 raw 0 "
+          + bar.getAbsolutePath() +" 0  ");
+      ps.close();
+
+      System.out.println("---------------------restarting");
+      agent = new ChukwaAgent(conf);
+      conn = new ConsoleOutConnector(agent, true);
+      conn.start();
+      assertEquals(1, agent.adaptorCount());//check that we processed initial adaptors
+      assertNotNull(agent.getAdaptorList().get(1L));
+      assertTrue(agent.getAdaptorList().get(1L).getStreamName().contains("foo"));
+      agent.shutdown();
+      System.out.println("---------------------done");
+      
+      
+    } catch(Exception e) {
+      e.printStackTrace();
+      fail(e.toString());
+    }
+  }
+}



Mime
View raw message