parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject [parquet-mr] branch master updated: PARQUET-1485: Fix Snappy direct memory leak (#581)
Date Tue, 12 Feb 2019 10:33:22 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dcdcdc  PARQUET-1485: Fix Snappy direct memory leak (#581)
7dcdcdc is described below

commit 7dcdcdcf0eb5e91618c443d4a84973bf7883d79b
Author: PengchengLiu <pengchengliu_bupt@163.com>
AuthorDate: Tue Feb 12 18:33:18 2019 +0800

    PARQUET-1485: Fix Snappy direct memory leak (#581)
---
 .../org/apache/parquet/hadoop/codec/CleanUtil.java | 70 ++++++++++++++++++++++
 .../parquet/hadoop/codec/SnappyCompressor.java     | 27 ++++++++-
 .../parquet/hadoop/codec/SnappyDecompressor.java   | 31 ++++++++--
 3 files changed, 122 insertions(+), 6 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
new file mode 100644
index 0000000..8bf24b2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CleanUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Helper class which use reflections to clean up DirectBuffer. It's implemented for
+ * better compatibility with both java8 and java9+, because the Cleaner class is moved to
+ * another place since java9+.
+ */
+public class CleanUtil {
+  private static final Logger logger = LoggerFactory.getLogger(CleanUtil.class);
+  private static final Field CLEANER_FIELD;
+  private static final Method CLEAN_METHOD;
+
+  static {
+    ByteBuffer buf = null;
+    Field cleanerField = null;
+    Method cleanMethod = null;
+    try {
+      buf = ByteBuffer.allocateDirect(1);
+      cleanerField = buf.getClass().getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+      Object cleaner = cleanerField.get(buf);
+      cleanMethod = cleaner.getClass().getDeclaredMethod("clean");
+    } catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
+      logger.warn("Initialization failed for cleanerField or cleanMethod", e);
+    } finally {
+      clean(buf);
+    }
+    CLEANER_FIELD = cleanerField;
+    CLEAN_METHOD = cleanMethod;
+  }
+
+  public static void clean(ByteBuffer buffer) {
+    if (CLEANER_FIELD == null || CLEAN_METHOD == null) {
+      return;
+    }
+    try {
+      Object cleaner = CLEANER_FIELD.get(buffer);
+      CLEAN_METHOD.invoke(cleaner);
+    } catch (IllegalAccessException | InvocationTargetException | NullPointerException e) {
+      // Ignore clean failure
+      logger.warn("Clean failed for buffer " + buffer.getClass().getSimpleName(), e);
+    }
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index d0270ca..b2a8e7f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -32,16 +32,23 @@ import org.apache.parquet.Preconditions;
  * entire input in setInput and compresses it as one compressed block.
  */
 public class SnappyCompressor implements Compressor {
+  private static final int initialBufferSize = 64 * 1024 * 1024;
+
   // Buffer for compressed output. This buffer grows as necessary.
-  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
+  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
 
   // Buffer for uncompressed input. This buffer grows as necessary.
-  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
+  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
 
   private long bytesRead = 0L;
   private long bytesWritten = 0L;
   private boolean finishCalled = false;
 
+  public SnappyCompressor() {
+    inputBuffer.limit(0);
+    outputBuffer.limit(0);
+  }
+
   /**
    * Fills specified buffer with compressed data. Returns actual number
    * of bytes of compressed data. A return value of 0 indicates that
@@ -66,7 +73,9 @@ public class SnappyCompressor implements Compressor {
       // There is uncompressed input, compress it now
       int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
       if (maxOutputSize > outputBuffer.capacity()) {
+        ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
+        CleanUtil.clean(oldBuffer);
       }
       // Reset the previous outputBuffer
       outputBuffer.clear();
@@ -97,7 +106,9 @@ public class SnappyCompressor implements Compressor {
       ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
       inputBuffer.rewind();
       tmp.put(inputBuffer);
+      ByteBuffer oldBuffer = inputBuffer;
       inputBuffer = tmp;
+      CleanUtil.clean(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -146,6 +157,18 @@ public class SnappyCompressor implements Compressor {
 
   @Override
   public synchronized void reset() {
+    if (inputBuffer.capacity() > initialBufferSize) {
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+      CleanUtil.clean(oldBuffer);
+    }
+
+    if (outputBuffer.capacity() > initialBufferSize) {
+      ByteBuffer oldBuffer = outputBuffer;
+      outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+      CleanUtil.clean(oldBuffer);
+    }
+
     finishCalled = false;
     bytesRead = bytesWritten = 0;
     inputBuffer.rewind();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 190f8d5..8a7f86d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -27,14 +27,21 @@ import org.xerial.snappy.Snappy;
 import org.apache.parquet.Preconditions;
 
 public class SnappyDecompressor implements Decompressor {
+  private static final int initialBufferSize = 64 * 1024 * 1024;
+
   // Buffer for uncompressed output. This buffer grows as necessary.
-  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
+  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
 
   // Buffer for compressed input. This buffer grows as necessary.
-  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
+  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
 
   private boolean finished;
-  
+
+  public SnappyDecompressor() {
+    inputBuffer.limit(0);
+    outputBuffer.limit(0);
+  }
+
   /**
    * Fills specified buffer with uncompressed data. Returns actual number
    * of bytes of uncompressed data. A return value of 0 indicates that
@@ -61,7 +68,9 @@ public class SnappyDecompressor implements Decompressor {
       // There is compressed input, decompress it now.
       int decompressedSize = Snappy.uncompressedLength(inputBuffer);
       if (decompressedSize > outputBuffer.capacity()) {
+        ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
+        CleanUtil.clean(oldBuffer);
       }
 
       // Reset the previous outputBuffer (i.e. set position to 0)
@@ -102,7 +111,9 @@ public class SnappyDecompressor implements Decompressor {
       ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
       inputBuffer.rewind();
       newBuffer.put(inputBuffer);
-      inputBuffer = newBuffer;      
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = newBuffer;
+      CleanUtil.clean(oldBuffer);
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -131,6 +142,18 @@ public class SnappyDecompressor implements Decompressor {
 
   @Override
   public synchronized void reset() {
+    if (inputBuffer.capacity() > initialBufferSize) {
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+      CleanUtil.clean(oldBuffer);
+    }
+
+    if (outputBuffer.capacity() > initialBufferSize) {
+      ByteBuffer oldBuffer = outputBuffer;
+      outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+      CleanUtil.clean(oldBuffer);
+    }
+
     finished = false;
     inputBuffer.rewind();
     outputBuffer.rewind();


Mime
View raw message