parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject [parquet-mr] branch master updated: PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change (#622)
Date Mon, 25 Feb 2019 12:23:39 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new f799893  PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change (#622)
f799893 is described below

commit f7998934020ea6f4949e347616431219343d8a15
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Mon Feb 25 13:23:32 2019 +0100

    PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change (#622)
---
 .../parquet/hadoop/codec/SnappyCompressor.java     | 26 ++++------------------
 .../parquet/hadoop/codec/SnappyDecompressor.java   | 26 ++++------------------
 2 files changed, 8 insertions(+), 44 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index b2a8e7f..4720c08 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -32,23 +32,16 @@ import org.apache.parquet.Preconditions;
  * entire input in setInput and compresses it as one compressed block.
  */
 public class SnappyCompressor implements Compressor {
-  private static final int initialBufferSize = 64 * 1024 * 1024;
-
   // Buffer for compressed output. This buffer grows as necessary.
-  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
 
   // Buffer for uncompressed input. This buffer grows as necessary.
-  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
 
   private long bytesRead = 0L;
   private long bytesWritten = 0L;
   private boolean finishCalled = false;
 
-  public SnappyCompressor() {
-    inputBuffer.limit(0);
-    outputBuffer.limit(0);
-  }
-
   /**
    * Fills specified buffer with compressed data. Returns actual number
    * of bytes of compressed data. A return value of 0 indicates that
@@ -120,7 +113,8 @@ public class SnappyCompressor implements Compressor {
 
   @Override
   public void end() {
-    // No-op		
+    CleanUtil.clean(inputBuffer);
+    CleanUtil.clean(outputBuffer);
   }
 
   @Override
@@ -157,18 +151,6 @@ public class SnappyCompressor implements Compressor {
 
   @Override
   public synchronized void reset() {
-    if (inputBuffer.capacity() > initialBufferSize) {
-      ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
-      CleanUtil.clean(oldBuffer);
-    }
-
-    if (outputBuffer.capacity() > initialBufferSize) {
-      ByteBuffer oldBuffer = outputBuffer;
-      outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
-      CleanUtil.clean(oldBuffer);
-    }
-
     finishCalled = false;
     bytesRead = bytesWritten = 0;
     inputBuffer.rewind();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 8a7f86d..c3da63f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -27,21 +27,14 @@ import org.xerial.snappy.Snappy;
 import org.apache.parquet.Preconditions;
 
 public class SnappyDecompressor implements Decompressor {
-  private static final int initialBufferSize = 64 * 1024 * 1024;
-
   // Buffer for uncompressed output. This buffer grows as necessary.
-  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
 
   // Buffer for compressed input. This buffer grows as necessary.
-  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
+  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
 
   private boolean finished;
 
-  public SnappyDecompressor() {
-    inputBuffer.limit(0);
-    outputBuffer.limit(0);
-  }
-
   /**
    * Fills specified buffer with uncompressed data. Returns actual number
    * of bytes of uncompressed data. A return value of 0 indicates that
@@ -122,7 +115,8 @@ public class SnappyDecompressor implements Decompressor {
 
   @Override
   public void end() {
-    // No-op		
+    CleanUtil.clean(inputBuffer);
+    CleanUtil.clean(outputBuffer);
   }
 
   @Override
@@ -142,18 +136,6 @@ public class SnappyDecompressor implements Decompressor {
 
   @Override
   public synchronized void reset() {
-    if (inputBuffer.capacity() > initialBufferSize) {
-      ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
-      CleanUtil.clean(oldBuffer);
-    }
-
-    if (outputBuffer.capacity() > initialBufferSize) {
-      ByteBuffer oldBuffer = outputBuffer;
-      outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
-      CleanUtil.clean(oldBuffer);
-    }
-
     finished = false;
     inputBuffer.rewind();
     outputBuffer.rewind();


Mime
View raw message