chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r806419 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop/chukwa/datacollection/sender/ src/test/org/apache/hadoop/chukwa/database/ src/test/org/apache/hadoop/chukwa/datacollection/co...
Date Fri, 21 Aug 2009 04:39:40 GMT
Author: asrabkin
Date: Fri Aug 21 04:39:39 2009
New Revision: 806419

URL: http://svn.apache.org/viewvc?rev=806419&view=rev
Log:
CHUKWA-379.  Refactor sender code.

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestCollector.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestAcksOnFailure.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Aug 21 04:39:39 2009
@@ -46,6 +46,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-379.  Refactor sender code. (asrabkin)
+
     CHUKWA-374.  Adaptor.getStatus() shouldn't throw exceptions. (asrabkin)
 
     CHUKWA-373.  Test code for backpressure. (asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java Fri Aug 21 04:39:39 2009
@@ -67,7 +67,7 @@
    * @return empty list if file does not exist
    * @throws IOException on other error
    */
-  public Iterator<String> getCollectorURLs(Configuration conf) throws IOException {
+  public Iterator<String> getCollectorURLs(Configuration conf, String filename) throws IOException {
     String chukwaHome = System.getenv("CHUKWA_HOME");
     if (chukwaHome == null) {
       chukwaHome = ".";
@@ -87,13 +87,16 @@
 
     log.info("setting up collectors file: " + chukwaConf + File.separator
         + COLLECTORS_FILENAME);
-    File collectors = new File(chukwaConf + File.separator + "collectors");
+    File collectors = new File(chukwaConf + File.separator + filename);
     try {
-      return new RetryListOfCollectors(collectors, 1000 * 15, conf);// time is ms
-                                                              // between tries
+      return new RetryListOfCollectors(collectors, conf);
     } catch (java.io.IOException e) {
       log.error("failed to read collectors file: ", e);
       throw e;
     }
   }
+  public Iterator<String> getCollectorURLs(Configuration conf) throws IOException {
+    return getCollectorURLs(conf, "collectors");
+  }
+
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Fri Aug 21 04:39:39 2009
@@ -32,14 +32,15 @@
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpException;
 import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.HttpMethodBase;
 import org.apache.commons.httpclient.HttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.commons.httpclient.methods.*;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
 import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -48,6 +49,11 @@
 /**
  * Encapsulates all of the http setup and connection details needed for chunks
  * to be delivered to a collector.
+ * 
+ * This class should encapsulate the details of the low level data formatting.
+ * The Connector is responsible for picking what to send and to whom;
+ * retry policy is encoded in the collectors iterator.
+ * 
  * <p>
  * On error, tries the list of available collectors, pauses for a minute, and
  * then repeats.
@@ -67,7 +73,7 @@
   static Logger log = Logger.getLogger(ChukwaHttpSender.class);
   static HttpClient client = null;
   static MultiThreadedHttpConnectionManager connectionManager = null;
-  static String currCollector = null;
+  String currCollector = null;
 
   protected Iterator<String> collectors;
 
@@ -122,8 +128,7 @@
     // setup default collector
     ArrayList<String> tmp = new ArrayList<String>();
     this.collectors = tmp.iterator();
-    log
-        .info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: "
+    log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: "
             + collectors.hasNext());
 
     MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
@@ -133,14 +138,6 @@
   }
 
   /**
-   * Set up a single connector for this client to send {@link Chunk}s to
-   * 
-   * @param collector the url of the collector
-   */
-  public void setCollectors(String collector) {
-  }
-
-  /**
    * Set up a list of connectors for this client to send {@link Chunk}s to
    * 
    * @param collectors
@@ -163,6 +160,7 @@
    * 
    * @return array of chunk id's which were ACKed by collector
    */
+  @Override
   public List<CommitListEntry> send(List<Chunk> toSend)
       throws InterruptedException, IOException {
     List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
@@ -190,26 +188,46 @@
     // collect all serialized chunks into a single buffer to send
     RequestEntity postData = new BuffersRequestEntity(serializedEvents);
 
+    PostMethod method = new PostMethod();
+    method.setRequestEntity(postData);
+    log.info(">>>>>> HTTP post to " + currCollector + " length = " + postData.getContentLength());
+
+    return postAndParseResponse(method, commitResults);
+  }
+  
+  public List<CommitListEntry> postAndParseResponse(PostMethod method, List<CommitListEntry> commitResults)
+  throws IOException, InterruptedException{
+    reliablySend(method, "chukwa"); //FIXME: shouldn't need to hardcode this here
+    return commitResults;
+  }
+
+  /**
+   *  Responsible for executing the supplied method on at least one collector
+   * @param method
+   * @return
+   * @throws InterruptedException
+   * @throws IOException if no collector responds with an OK
+   */
+  protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
     int retries = SENDER_RETRIES;
     while (currCollector != null) {
       // need to pick a destination here
-      PostMethod method = new PostMethod();
       try {
-        doPost(method, postData, currCollector);
+
+        // send it across the network    
+        List<String> responses = doRequest(method, currCollector+ pathSuffix);
 
         retries = SENDER_RETRIES; // reset count on success
-        // if no exception was thrown from doPost, ACK that these chunks were
-        // sent
-        return commitResults;
+
+        return responses;
       } catch (Throwable e) {
         log.error("Http post exception");
         log.debug("Http post exception", e);
         ChukwaHttpSender.metrics.httpThrowable.inc();
-        log.info("Checking list of collectors to see if another collector has been specified for rollover");
         if (collectors.hasNext()) {
           ChukwaHttpSender.metrics.collectorRollover.inc();
+          failedCollector(currCollector);
           currCollector = collectors.next();
-
           log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
                   + currCollector);
         } else {
@@ -229,14 +247,23 @@
         method.releaseConnection();
       }
     } // end retry loop
-    return new ArrayList<CommitListEntry>();
+    return new ArrayList<String>();
+  }
+
+  /**
+   * A hook for taking action when a collector is declared failed.
+   * @param downCollector
+   */
+  protected void failedCollector(String downCollector) {
+    log.debug("declaring "+ downCollector + " down");
   }
 
   /**
-   * Handles the HTTP post. Throws HttpException on failure
+   * Responsible for performing a single operation to a specified collector URL.
+   * 
+   * @param dest the URL being requested. (Including hostname)
    */
-  @SuppressWarnings("deprecation")
-  private void doPost(PostMethod method, RequestEntity data, String dest)
+  protected List<String> doRequest(HttpMethodBase method, String dest)
       throws IOException, HttpException {
 
     HttpMethodParams pars = method.getParams();
@@ -253,11 +280,6 @@
     method.setParams(pars);
     method.setPath(dest);
 
-    // send it across the network
-    method.setRequestEntity(data);
-
-    log.info(">>>>>> HTTP post to " + dest + " length = "
-        + data.getContentLength());
     // Send POST request
     ChukwaHttpSender.metrics.httpPost.inc();
     
@@ -271,13 +293,12 @@
         ChukwaHttpSender.metrics.httpTimeOutException.inc();
       }
       
-      log.error(">>>>>> HTTP post response statusCode: " + statusCode
-          + ", statusLine: " + method.getStatusLine());
+      log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
       // do something aggressive here
       throw new HttpException("got back a failure from server");
     }
     // implicitly "else"
-    log.info(">>>>>> HTTP Got success back from the remote collector; response length "
+    log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
             + method.getResponseContentLength());
 
     // FIXME: should parse acks here
@@ -288,10 +309,13 @@
     rstream = new ByteArrayInputStream(resp_buf);
     BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
     String line;
+    List<String> resp = new ArrayList<String>();
     while ((line = br.readLine()) != null) {
       if (log.isDebugEnabled()) {
         log.debug("response: " + line);
       }
+      resp.add(line);
     }
+    return resp;
   }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Fri Aug 21 04:39:39 2009
@@ -34,7 +34,7 @@
  * 
  * 
  */
-public class RetryListOfCollectors implements Iterator<String> {
+public class RetryListOfCollectors implements Iterator<String>, Cloneable {
 
   int maxRetryRateMs;
   List<String> collectors;
@@ -42,42 +42,29 @@
   int nextCollector = 0;
   private String portNo;
   Configuration conf;
+  public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate";
 
-  public RetryListOfCollectors(File collectorFile, int maxRetryRateMs, Configuration conf)
+  public RetryListOfCollectors(File collectorFile, Configuration conf)
       throws IOException {
-    this.maxRetryRateMs = maxRetryRateMs;
-    lastLookAtFirstNode = 0;
     collectors = new ArrayList<String>();
     this.conf = conf;
     portNo = conf.get("chukwaCollector.http.port", "8080");
-
+    maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
     try {
       BufferedReader br = new BufferedReader(new FileReader(collectorFile));
       String line, parsedline;
       while ((line = br.readLine()) != null) {
-        if (!line.contains("://")) {
-          // no protocol, assume http
-          if (line.matches(".*:\\d+.*")) {
-            parsedline = "http://" + line+"/";
-          } else {
-            parsedline = "http://" + line + ":" + portNo;
-          }
-        } else {
-          if (line.matches(".*:\\d+.*")) {
-            parsedline = line;
-          } else {
-            parsedline = line + ":" + portNo;
-          }
-        }
-        if(!parsedline.matches(".*\\w/.*")) //no resource name
-          parsedline = parsedline+"/";
+        parsedline = canonicalizeLine(line);
         collectors.add(parsedline);
       }
       
       br.close();
     } catch (FileNotFoundException e) {
-      System.err
-          .println("Error in RetryListOfCollectors() opening file: collectors, double check that you have set the CHUKWA_CONF_DIR environment variable. Also, ensure file exists and is in classpath");
+      System.err.println("Error in RetryListOfCollectors() opening file"
+            + collectorFile.getCanonicalPath() + ", double check that you have"
+            + "set the CHUKWA_CONF_DIR environment variable. Also, ensure file"
+            + " exists and is in classpath");
+      throw e;
     } catch (IOException e) {
       System.err
           .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
@@ -86,14 +73,35 @@
     shuffleList();
   }
 
+  private String canonicalizeLine(String line) {
+    String parsedline;
+    if (!line.contains("://")) {
+      // no protocol, assume http
+      if (line.matches(".*:\\d+.*")) {
+        parsedline = "http://" + line+"/";
+      } else {
+        parsedline = "http://" + line + ":" + portNo;
+      }
+    } else {
+      if (line.matches(".*:\\d+.*")) {
+        parsedline = line;
+      } else {
+        parsedline = line + ":" + portNo;
+      }
+    }
+    if(!parsedline.matches(".*\\w/.*")) //no resource name
+      parsedline = parsedline+"/";
+    return parsedline;
+  }
+
   /**
    * This is only used for debugging. Possibly it should sanitize urls the same way the other
    * constructor does.
    * @param collectors
    * @param maxRetryRateMs
    */
-  public RetryListOfCollectors(final List<String> collectors, int maxRetryRateMs) {
-    this.maxRetryRateMs = maxRetryRateMs;
+  public RetryListOfCollectors(final List<String> collectors, Configuration conf) {
+    maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
     lastLookAtFirstNode = 0;
     this.collectors = new ArrayList<String>();
     this.collectors.addAll(collectors);
@@ -101,7 +109,7 @@
   }
 
   // for now, use a simple O(n^2) algorithm.
-  // safe, because we only do this once, and on smalls list
+  // safe, because we only do this once, and on smallish lists
   private void shuffleList() {
     ArrayList<String> newList = new ArrayList<String>();
     Random r = new java.util.Random();
@@ -152,5 +160,14 @@
   int total() {
     return collectors.size();
   }
+  
+  public RetryListOfCollectors clone() {
+    try {
+      RetryListOfCollectors clone = (RetryListOfCollectors) super.clone();
+      return clone;
+    } catch(CloneNotSupportedException e) {
+      return null;
+    }
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/database/TestDatabaseIostat.java Fri Aug 21 04:39:39 2009
@@ -164,7 +164,7 @@
       sender = new ChukwaHttpSender(conf);
       ArrayList<String> collectorList = new ArrayList<String>();
       collectorList.add("http://localhost:"+collectorPort+"/chukwa");
-      sender.setCollectors(new RetryListOfCollectors(collectorList, 50));
+      sender.setCollectors(new RetryListOfCollectors(collectorList, conf));
     } catch (AlreadyRunningException e) {
       fail("Chukwa Agent is already running");
     }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestCollector.java?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestCollector.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestCollector.java Fri Aug 21 04:39:39 2009
@@ -46,7 +46,7 @@
       ChukwaHttpSender sender = new ChukwaHttpSender(conf);
       ArrayList<String> collectorList = new ArrayList<String>();
       collectorList.add("http://localhost:9990/chukwa");
-      sender.setCollectors(new RetryListOfCollectors(collectorList, 50));
+      sender.setCollectors(new RetryListOfCollectors(collectorList, conf));
       Server server = new Server(9990);
       Context root = new Context(server, "/", Context.SESSIONS);
 

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestAcksOnFailure.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestAcksOnFailure.java?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestAcksOnFailure.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestAcksOnFailure.java Fri Aug 21 04:39:39 2009
@@ -37,7 +37,7 @@
     ChukwaHttpSender send = new ChukwaHttpSender(conf);
     ArrayList<String> collectors = new ArrayList<String>();
     collectors.add("http://somehost.invalid/chukwa");
-    send.setCollectors(new RetryListOfCollectors(collectors, 1000));
+    send.setCollectors(new RetryListOfCollectors(collectors, conf));
     
     byte[] data = "sometestdata".getBytes();
     Adaptor a = new FileTailingAdaptor();

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java?rev=806419&r1=806418&r2=806419&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/sender/TestRetryListOfCollectors.java Fri Aug 21 04:39:39 2009
@@ -32,7 +32,8 @@
     hosts.add("host2");
     hosts.add("host3");
     hosts.add("host4");
-    RetryListOfCollectors rloc = new RetryListOfCollectors(hosts, 2000);
+    Configuration conf = new Configuration();
+    RetryListOfCollectors rloc = new RetryListOfCollectors(hosts, conf);
     assertEquals(hosts.size(), rloc.total());
 
     for (int i = 0; i < hosts.size(); ++i) {
@@ -74,7 +75,7 @@
     
     Configuration conf = new Configuration();
     conf.setInt("chukwaCollector.http.port", 5052);
-    RetryListOfCollectors rloc = new RetryListOfCollectors(tmpOutput, 2000, conf);
+    RetryListOfCollectors rloc = new RetryListOfCollectors(tmpOutput, conf);
     for (int i = 0; i < validHosts.size(); ++i) {
       assertTrue(rloc.hasNext());
       String s = rloc.next();



Mime
View raw message