Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 63E6A11E26 for ; Fri, 27 Jun 2014 03:59:09 +0000 (UTC) Received: (qmail 48466 invoked by uid 500); 27 Jun 2014 03:59:09 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 48433 invoked by uid 500); 27 Jun 2014 03:59:09 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 48424 invoked by uid 99); 27 Jun 2014 03:59:09 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jun 2014 03:59:09 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B530D918EF2; Fri, 27 Jun 2014 03:59:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jarcec@apache.org To: commits@flume.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: FLUME-2416: Use CodecPool in compressed stream to prevent leak of direct buffers Date: Fri, 27 Jun 2014 03:59:08 +0000 (UTC) Repository: flume Updated Branches: refs/heads/flume-1.6 4c5b602c5 -> d56feccfb FLUME-2416: Use CodecPool in compressed stream to prevent leak of direct buffers (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d56feccf Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d56feccf Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d56feccf Branch: refs/heads/flume-1.6 Commit: d56feccfb922165ab35a856f3d2cc65649093571 Parents: 4c5b602 Author: Jarek Jarcec Cecho Authored: Thu Jun 26 20:57:31 2014 -0700 Committer: Jarek Jarcec Cecho Committed: Thu Jun 26 20:59:01 2014 -0700 ---------------------------------------------------------------------- .../flume/sink/hdfs/HDFSCompressedDataStream.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d56feccf/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java index fe857c3..dc93e4f 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java @@ -29,8 +29,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.DefaultCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { private Context serializerContext; private EventSerializer serializer; private boolean useRawLocalFileSystem; + private Compressor compressor; @Override public void configure(Context context) { @@ -83,7 +86,6 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { "is not of type LocalFileSystem: " + hdfs.getClass().getName()); } } - boolean appending = false; if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile (dstPath)) { @@ -92,7 +94,10 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { } else { fsOut = hdfs.create(dstPath); } - cmpOut = codec.createOutputStream(fsOut); + if(compressor == null) { + compressor = CodecPool.getCompressor(codec, conf); + } + cmpOut = codec.createOutputStream(fsOut, compressor); serializer = EventSerializerFactory.getInstance(serializerType, serializerContext, cmpOut); if (appending && !serializer.supportsReopen()) { @@ -148,6 +153,10 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { fsOut.flush(); fsOut.sync(); cmpOut.close(); + if (compressor != null) { + CodecPool.returnCompressor(compressor); + compressor = null; + } unregisterCurrentStream(); }