incubator-chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r1411817 - in /incubator/chukwa/trunk: ./ conf/ src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ src/main/java/org/apache/hadoop/chukwa/datacollection/sender/
Date Tue, 20 Nov 2012 19:44:09 GMT
Author: eyang
Date: Tue Nov 20 19:44:08 2012
New Revision: 1411817

URL: http://svn.apache.org/viewvc?rev=1411817&view=rev
Log:
CHUKWA-664. Added network compression between agent and collector. (Sourygna Luangsay via Eric Yang)

Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/conf/chukwa-common.xml
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Tue Nov 20 19:44:08 2012
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
 
     CHUKWA-635. Collect swap usage. (Eric Yang)
 
+    CHUKWA-664. Added network compression between agent and collector. (Sourygna Luangsay via Eric Yang)
+
   IMPROVEMENTS
 
     CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. (Jie Huang via asrabkin)

Modified: incubator/chukwa/trunk/conf/chukwa-common.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-common.xml?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-common.xml (original)
+++ incubator/chukwa/trunk/conf/chukwa-common.xml Tue Nov 20 19:44:08 2012
@@ -33,4 +33,18 @@
     <description>Location of Chukwa data on HDFS</description>
   </property>
 
+  <!-- uncomment to enable network compression
+  <property>
+    <name>chukwaAgent.output.compress</name>
+    <value>true</value>
+    <description>boolean: true if we want to compress data on the wire between the agent and the collector</description>
+  </property>
+
+  <property>
+    <name>chukwaAgent.output.compression.type</name>
+	<value>org.apache.hadoop.io.compress.GzipCodec</value>
+    <description>compression codec if the network data should be compressed</description>
+  </property>
+  -->
+
 </configuration>

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Nov 20 19:44:08 2012
@@ -740,6 +740,7 @@ public class ChukwaAgent implements Adap
     log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
     File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
     conf.addResource(new Path(agentConf.getAbsolutePath()));
+    conf.addResource(new Path( new File(chukwaConf, "chukwa-common.xml").getAbsolutePath()));
     if (conf.get("chukwaAgent.checkpoint.dir") == null)
       conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
           .getAbsolutePath());

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Tue Nov 20 19:44:08 2012
@@ -21,19 +21,29 @@ package org.apache.hadoop.chukwa.datacol
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.datacollection.writer.*;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 
 public class ServletCollector extends HttpServlet {
@@ -48,6 +58,10 @@ public class ServletCollector extends Ht
 
   private static final long serialVersionUID = 6286162898591407111L;
   Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);
+  
+  static boolean COMPRESS;
+  static String CODEC_NAME;
+  static CompressionCodec codec;
 
   public void setWriter(ChukwaWriter w) {
     writer = w;
@@ -103,6 +117,20 @@ public class ServletCollector extends Ht
       log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
     }
 
+    COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
+    if( COMPRESS) {
+	    CODEC_NAME = conf.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
+	    Class<?> codecClass = null;
+	    try {
+			codecClass = Class.forName( CODEC_NAME);
+			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+			log.info("codec " + CODEC_NAME + " loaded for network compression");
+		} catch (ClassNotFoundException e) {
+			log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
+			COMPRESS = false;
+		}
+    }
+    
     // We default to here if the pipeline construction failed or didn't happen.
     try {
       if (writer == null) {
@@ -132,7 +160,17 @@ public class ServletCollector extends Ht
       java.io.InputStream in = req.getInputStream();
 
       ServletOutputStream l_out = resp.getOutputStream();
-      final DataInputStream di = new DataInputStream(in);
+      
+      DataInputStream di = null;
+      boolean compressNetwork = COMPRESS;
+      if( compressNetwork){
+          InputStream cin = codec.createInputStream( in);
+          di = new DataInputStream(cin);
+      }
+      else {
+    	  di = new DataInputStream(in);
+      }
+
       final int numEvents = di.readInt();
       // log.info("saw " + numEvents+ " in request");
 

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Tue Nov 20 19:44:08 2012
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpException;
 import org.apache.commons.httpclient.HttpMethod;
@@ -36,13 +37,17 @@ import org.apache.commons.httpclient.Htt
 import org.apache.commons.httpclient.HttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.methods.*;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -81,6 +86,10 @@ public class ChukwaHttpSender implements
   int postID = 0;
 
   protected Iterator<String> collectors;
+  
+  static boolean COMPRESS;
+  static String CODEC_NAME;
+  static CompressionCodec codec;
 
   static {
     connectionManager = new MultiThreadedHttpConnectionManager();
@@ -106,12 +115,21 @@ public class ChukwaHttpSender implements
     public BuffersRequestEntity(List<DataOutputBuffer> buf) {
       buffers = buf;
     }
+    
+    private long getUncompressedContentLenght(){
+        long len = 4;// first we send post length, then buffers
+        for (DataOutputBuffer b : buffers)
+          len += b.getLength();
+        return len;
+    }
 
     public long getContentLength() {
-      long len = 4;// first we send post length, then buffers
-      for (DataOutputBuffer b : buffers)
-        len += b.getLength();
-      return len;
+    	if( COMPRESS) {
+    		return -1;
+    	}
+    	else {
+    		return getUncompressedContentLenght();
+    	}
     }
 
     public String getContentType() {
@@ -122,11 +140,23 @@ public class ChukwaHttpSender implements
       return true;
     }
 
+    private void doWriteRequest( DataOutputStream out ) throws IOException {
+        out.writeInt(buffers.size());
+        for (DataOutputBuffer b : buffers)
+      	 out.write(b.getData(), 0, b.getLength());
+    }
+    
     public void writeRequest(OutputStream out) throws IOException {
-      DataOutputStream dos = new DataOutputStream(out);
-      dos.writeInt(buffers.size());
-      for (DataOutputBuffer b : buffers)
-        dos.write(b.getData(), 0, b.getLength());
+      if( COMPRESS) {
+          CompressionOutputStream cos = codec.createOutputStream(out);
+          DataOutputStream dos = new DataOutputStream( cos);
+          doWriteRequest( dos);
+          cos.finish();
+      }
+      else {
+          DataOutputStream dos = new DataOutputStream( out);
+          doWriteRequest( dos);
+      }
     }
   }
 
@@ -140,6 +170,19 @@ public class ChukwaHttpSender implements
     WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
         20 * 1000);
     COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
+    COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
+    if( COMPRESS) {
+	    CODEC_NAME = c.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
+	    Class<?> codecClass = null;
+	    try {
+			codecClass = Class.forName( CODEC_NAME);
+			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
+			log.info("codec " + CODEC_NAME + " loaded for network compression");
+		} catch (ClassNotFoundException e) {
+			log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
+			COMPRESS = false;
+		}
+    }
   }
 
   /**
@@ -199,7 +242,16 @@ public class ChukwaHttpSender implements
 
     PostMethod method = new PostMethod();
     method.setRequestEntity(postData);
-    log.info(">>>>>> HTTP post_"+thisPost + " to " + currCollector + " length = " + postData.getContentLength());
+    StringBuilder sb = new StringBuilder( ">>>>>> HTTP post_");
+    sb.append( thisPost).append( " to ").append( currCollector).append( " length = ");
+	if( COMPRESS) {
+		sb.append( ((BuffersRequestEntity)postData).getUncompressedContentLenght())
+			.append( " of uncompressed data");
+	}
+	else {
+		sb.append( postData.getContentLength());
+	}
+	log.info( sb);
 
     List<CommitListEntry> results =  postAndParseResponse(method, commitResults);
    log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");



Mime
View raw message