flume-commits mailing list archives

From: jar...@apache.org
Subject: git commit: FLUME-2416: Use CodecPool in compressed stream to prevent leak of direct buffers
Date: Fri, 27 Jun 2014 03:59:08 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.6 4c5b602c5 -> d56feccfb


FLUME-2416: Use CodecPool in compressed stream to prevent leak of direct buffers

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d56feccf
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d56feccf
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d56feccf

Branch: refs/heads/flume-1.6
Commit: d56feccfb922165ab35a856f3d2cc65649093571
Parents: 4c5b602
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Thu Jun 26 20:57:31 2014 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Thu Jun 26 20:59:01 2014 -0700

----------------------------------------------------------------------
 .../flume/sink/hdfs/HDFSCompressedDataStream.java      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d56feccf/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index fe857c3..dc93e4f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +50,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
   private Context serializerContext;
   private EventSerializer serializer;
   private boolean useRawLocalFileSystem;
+  private Compressor compressor;
 
   @Override
   public void configure(Context context) {
@@ -83,7 +86,6 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
             "is not of type LocalFileSystem: " + hdfs.getClass().getName());
       }
     }
-
     boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
     (dstPath)) {
@@ -92,7 +94,10 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
     } else {
       fsOut = hdfs.create(dstPath);
     }
-    cmpOut = codec.createOutputStream(fsOut);
+    if(compressor == null) {
+      compressor = CodecPool.getCompressor(codec, conf);
+    }
+    cmpOut = codec.createOutputStream(fsOut, compressor);
     serializer = EventSerializerFactory.getInstance(serializerType,
         serializerContext, cmpOut);
     if (appending && !serializer.supportsReopen()) {
@@ -148,6 +153,10 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
     fsOut.flush();
     fsOut.sync();
     cmpOut.close();
+    if (compressor != null) {
+      CodecPool.returnCompressor(compressor);
+      compressor = null;
+    }
     unregisterCurrentStream();
   }
 

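For readers unfamiliar with the pattern, the hunks above boil down to the following standalone sketch: borrow a Compressor from Hadoop's shared CodecPool when opening the stream, pass it to createOutputStream(), and hand it back once the stream is closed. This sketch is illustrative only and is not part of the commit; the class name, output file name, and the DefaultCodec choice are assumptions for the example.

----------------------------------------------------------------------
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecPoolSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CompressionCodec codec =
        ReflectionUtils.newInstance(DefaultCodec.class, conf);

    // Borrow a pooled Compressor instead of letting
    // createOutputStream(OutputStream) allocate a fresh one per open.
    // A fresh Compressor owns native/direct buffers that are released
    // only by end() or finalization, so allocating one per file leaks
    // direct memory under heavy roll/reopen churn.
    Compressor compressor = CodecPool.getCompressor(codec, conf);
    try (OutputStream fsOut = new FileOutputStream("example.deflate")) {
      CompressionOutputStream cmpOut =
          codec.createOutputStream(fsOut, compressor);
      cmpOut.write("hello".getBytes(StandardCharsets.UTF_8));
      cmpOut.finish();  // flush the compressed trailer
      cmpOut.close();
    } finally {
      // Return the Compressor so the next open reuses its buffers.
      CodecPool.returnCompressor(compressor);
    }
  }
}
----------------------------------------------------------------------

Nulling the field after the return, as the commit does in close(), prevents handing the same Compressor back to the pool twice if close() is invoked repeatedly, and the null check in the open path ensures at most one Compressor is borrowed per stream.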
