Subject: svn commit: r1411817 - in /incubator/chukwa/trunk: ./ conf/ src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ src/main/java/org/apache/hadoop/chukwa/datacollection/sender/
Date: Tue, 20 Nov 2012 19:44:09 -0000
To: chukwa-commits@incubator.apache.org
From: eyang@apache.org

Author: eyang
Date: Tue Nov 20 19:44:08 2012
New Revision: 1411817

URL: http://svn.apache.org/viewvc?rev=1411817&view=rev
Log:
CHUKWA-664. Added network compression between agent and collector. (Sourygna Luangsay via Eric Yang)

Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/conf/chukwa-common.xml
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Tue Nov 20 19:44:08 2012
@@ -6,6 +6,8 @@ Trunk (unreleased changes)
 
     CHUKWA-635. Collect swap usage. (Eric Yang)
 
+    CHUKWA-664. Added network compression between agent and collector. (Sourygna Luangsay via Eric Yang)
+
   IMPROVEMENTS
 
     CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format.
    (Jie Huang via asrabkin)

Modified: incubator/chukwa/trunk/conf/chukwa-common.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-common.xml?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-common.xml (original)
+++ incubator/chukwa/trunk/conf/chukwa-common.xml Tue Nov 20 19:44:08 2012
@@ -33,4 +33,18 @@ Location of Chukwa data on HDFS
+
+
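The bodies of the two property blocks added to chukwa-common.xml were stripped by the mail archiver, leaving only the bare "+" markers above. Judging from the keys and defaults read in ServletCollector and ChukwaHttpSender below, the new entries plausibly looked like the following sketch; the description wording is an assumption, not the committed text:

    <property>
      <name>chukwaAgent.output.compress</name>
      <value>false</value>
      <description>Enable compression of the HTTP traffic between agent and collector (assumed wording)</description>
    </property>

    <property>
      <name>chukwaAgent.output.compression.type</name>
      <value>org.apache.hadoop.io.compress.DefaultCodec</value>
      <description>CompressionCodec class to use when chukwaAgent.output.compress is true (assumed wording)</description>
    </property>

Both daemons must agree on these settings, which is why the ChukwaAgent change below adds chukwa-common.xml to the agent's configuration resources.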
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Nov 20 19:44:08 2012
@@ -740,6 +740,7 @@ public class ChukwaAgent implements Adap
     log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
     File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
     conf.addResource(new Path(agentConf.getAbsolutePath()));
+    conf.addResource(new Path(new File(chukwaConf, "chukwa-common.xml").getAbsolutePath()));
     if (conf.get("chukwaAgent.checkpoint.dir") == null)
       conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
           .getAbsolutePath());

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Tue Nov 20 19:44:08 2012
@@ -21,19 +21,29 @@ package org.apache.hadoop.chukwa.datacol
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
-import java.util.*;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.datacollection.writer.*;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 
 public class ServletCollector extends HttpServlet {
@@ -48,6 +58,10 @@ public class ServletCollector extends Ht
   private static final long serialVersionUID = 6286162898591407111L;
 
   Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);
+
+  static boolean COMPRESS;
+  static String CODEC_NAME;
+  static CompressionCodec codec;
 
   public void setWriter(ChukwaWriter w) {
     writer = w;
@@ -103,6 +117,20 @@ public class ServletCollector extends Ht
       log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
     }
 
+    COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
+    if (COMPRESS) {
+      CODEC_NAME = conf.get("chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
+      Class codecClass = null;
+      try {
+        codecClass = Class.forName(CODEC_NAME);
+        codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+        log.info("codec " + CODEC_NAME + " loaded for network compression");
+      } catch (ClassNotFoundException e) {
+        log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
+        COMPRESS = false;
+      }
+    }
+
     // We default to here if the pipeline construction failed or didn't happen.
     try {
       if (writer == null) {
@@ -132,7 +160,17 @@
       java.io.InputStream in = req.getInputStream();
       ServletOutputStream l_out = resp.getOutputStream();
-      final DataInputStream di = new DataInputStream(in);
+
+      DataInputStream di = null;
+      boolean compressNetwork = COMPRESS;
+      if (compressNetwork) {
+        InputStream cin = codec.createInputStream(in);
+        di = new DataInputStream(cin);
+      }
+      else {
+        di = new DataInputStream(in);
+      }
+
       final int numEvents = di.readInt();
       // log.info("saw " + numEvents+ " in request");
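End to end, the wire format after this change is a 4-byte chunk count followed by the serialized chunks, with the whole POST body optionally passed through a Hadoop compression codec on both sides. A minimal standalone round-trip sketch of that framing, assuming hadoop-common on the classpath; the class name, DefaultCodec choice, and byte-array streams are illustrative assumptions, not Chukwa code:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CompressionRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
            Class.forName("org.apache.hadoop.io.compress.DefaultCodec"), conf);

        // Sender side: frame the payload the way BuffersRequestEntity does --
        // an int count followed by the raw chunk bytes -- inside a codec stream.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        CompressionOutputStream cos = codec.createOutputStream(wire);
        DataOutputStream dos = new DataOutputStream(cos);
        dos.writeInt(1);                                  // numEvents
        dos.write("fake chunk bytes".getBytes("UTF-8"));  // stand-in for chunk data
        dos.flush();
        cos.finish();                                     // complete the compressed stream

        // Collector side: wrap the request stream before reading the count,
        // mirroring the doPost() change above.
        DataInputStream di = new DataInputStream(
            codec.createInputStream(new ByteArrayInputStream(wire.toByteArray())));
        System.out.println("numEvents = " + di.readInt());
      }
    }

Note that the sender calls finish() rather than close(): finish() flushes the codec's trailer while leaving the underlying HTTP stream open for HttpClient to manage.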
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=1411817&r1=1411816&r2=1411817&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Tue Nov 20 19:44:08 2012
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpException;
 import org.apache.commons.httpclient.HttpMethod;
@@ -36,13 +37,17 @@ import org.apache.commons.httpclient.Htt
 import org.apache.commons.httpclient.HttpMethodRetryHandler;
 import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.methods.*;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 
 /**
@@ -81,6 +86,10 @@ public class ChukwaHttpSender implements
   int postID = 0;
 
   protected Iterator collectors;
+
+  static boolean COMPRESS;
+  static String CODEC_NAME;
+  static CompressionCodec codec;
 
   static {
     connectionManager = new MultiThreadedHttpConnectionManager();
@@ -106,12 +115,21 @@ public class ChukwaHttpSender implements
     public BuffersRequestEntity(List buf) {
       buffers = buf;
     }
+
+    private long getUncompressedContentLength() {
+      long len = 4;// first we send post length, then buffers
+      for (DataOutputBuffer b : buffers)
+        len += b.getLength();
+      return len;
+    }
 
     public long getContentLength() {
-      long len = 4;// first we send post length, then buffers
-      for (DataOutputBuffer b : buffers)
-        len += b.getLength();
-      return len;
+      if (COMPRESS) {
+        return -1;
+      }
+      else {
+        return getUncompressedContentLength();
+      }
     }
 
     public String getContentType() {
@@ -122,11 +140,23 @@ public class ChukwaHttpSender implements
       return true;
     }
 
+    private void doWriteRequest(DataOutputStream out) throws IOException {
+      out.writeInt(buffers.size());
+      for (DataOutputBuffer b : buffers)
+        out.write(b.getData(), 0, b.getLength());
+    }
+
     public void writeRequest(OutputStream out) throws IOException {
-      DataOutputStream dos = new DataOutputStream(out);
-      dos.writeInt(buffers.size());
-      for (DataOutputBuffer b : buffers)
-        dos.write(b.getData(), 0, b.getLength());
+      if (COMPRESS) {
+        CompressionOutputStream cos = codec.createOutputStream(out);
+        DataOutputStream dos = new DataOutputStream(cos);
+        doWriteRequest(dos);
+        cos.finish();
+      }
+      else {
+        DataOutputStream dos = new DataOutputStream(out);
+        doWriteRequest(dos);
+      }
     }
   }
 
@@ -140,6 +170,19 @@ public class ChukwaHttpSender implements
     WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval", 20 * 1000);
     COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
+    COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
+    if (COMPRESS) {
+      CODEC_NAME = c.get("chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
+      Class codecClass = null;
+      try {
+        codecClass = Class.forName(CODEC_NAME);
+        codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
+        log.info("codec " + CODEC_NAME + " loaded for network compression");
+      } catch (ClassNotFoundException e) {
+        log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
+        COMPRESS = false;
+      }
+    }
   }
 
   /**
@@ -199,7 +242,16 @@ public class ChukwaHttpSender implements
     PostMethod method = new PostMethod();
     method.setRequestEntity(postData);
-    log.info(">>>>>> HTTP post_"+thisPost + " to " + currCollector + " length = " + postData.getContentLength());
+    StringBuilder sb = new StringBuilder(">>>>>> HTTP post_");
+    sb.append(thisPost).append(" to ").append(currCollector).append(" length = ");
+    if (COMPRESS) {
+      sb.append(((BuffersRequestEntity) postData).getUncompressedContentLength())
+        .append(" of uncompressed data");
+    }
+    else {
+      sb.append(postData.getContentLength());
+    }
+    log.info(sb);
     List results = postAndParseResponse(method, commitResults);
     log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
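Returning -1 from getContentLength() when compression is on signals to Commons HttpClient 3.x that the body length is unknown, since the compressed size cannot be computed until the codec stream is finished; if I recall the 3.x behavior correctly, the request then goes out with Transfer-Encoding: chunked instead of a Content-Length header. A minimal sketch of that mechanism, assuming commons-httpclient 3.x on the classpath; the class name and collector URL are hypothetical, and this is an illustration rather than Chukwa code:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.methods.PostMethod;
    import org.apache.commons.httpclient.methods.RequestEntity;

    public class ChunkedPostSketch {
      public static void main(String[] args) throws IOException {
        RequestEntity entity = new RequestEntity() {
          public long getContentLength() { return -1; }  // unknown length, as in the compressed case
          public String getContentType() { return "application/octet-stream"; }
          public boolean isRepeatable() { return true; }
          public void writeRequest(OutputStream out) throws IOException {
            new DataOutputStream(out).writeInt(0);       // empty post: zero chunks
          }
        };
        PostMethod post = new PostMethod("http://localhost:8080/chukwa"); // hypothetical collector URL
        post.setRequestEntity(entity);
        // With a negative content length, the method streams the body chunked
        // rather than sending a Content-Length header.
        new HttpClient().executeMethod(post);
        post.releaseConnection();
      }
    }

One consequence of this design is that the old one-line length log would always print -1 for compressed posts, which is why the StringBuilder change above logs the uncompressed size instead.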