From commits-return-1570-archive-asf-public=cust-asf.ponee.io@parquet.apache.org Mon Feb 25 12:23:41 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id DE995180626 for ; Mon, 25 Feb 2019 13:23:40 +0100 (CET) Received: (qmail 80698 invoked by uid 500); 25 Feb 2019 12:23:40 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 80689 invoked by uid 99); 25 Feb 2019 12:23:39 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Feb 2019 12:23:39 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 5572082E8A; Mon, 25 Feb 2019 12:23:39 +0000 (UTC) Date: Mon, 25 Feb 2019 12:23:39 +0000 To: "commits@parquet.apache.org" Subject: [parquet-mr] branch master updated: PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change (#622) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155109741920.17284.16168574192925442868@gitbox.apache.org> From: gabor@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: parquet-mr X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 4cc22ddac29cf808b7a29d7747951a1bc69d1d09 X-Git-Newrev: f7998934020ea6f4949e347616431219343d8a15 X-Git-Rev: f7998934020ea6f4949e347616431219343d8a15 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. gabor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/parquet-mr.git The following commit(s) were added to refs/heads/master by this push: new f799893 PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change (#622) f799893 is described below commit f7998934020ea6f4949e347616431219343d8a15 Author: Gabor Szadovszky AuthorDate: Mon Feb 25 13:23:32 2019 +0100 PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change (#622) --- .../parquet/hadoop/codec/SnappyCompressor.java | 26 ++++------------------ .../parquet/hadoop/codec/SnappyDecompressor.java | 26 ++++------------------ 2 files changed, 8 insertions(+), 44 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java index b2a8e7f..4720c08 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java @@ -32,23 +32,16 @@ import org.apache.parquet.Preconditions; * entire input in setInput and compresses it as one compressed block. */ public class SnappyCompressor implements Compressor { - private static final int initialBufferSize = 64 * 1024 * 1024; - // Buffer for compressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); // Buffer for uncompressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); private long bytesRead = 0L; private long bytesWritten = 0L; private boolean finishCalled = false; - public SnappyCompressor() { - inputBuffer.limit(0); - outputBuffer.limit(0); - } - /** * Fills specified buffer with compressed data. Returns actual number * of bytes of compressed data. A return value of 0 indicates that @@ -120,7 +113,8 @@ public class SnappyCompressor implements Compressor { @Override public void end() { - // No-op + CleanUtil.clean(inputBuffer); + CleanUtil.clean(outputBuffer); } @Override @@ -157,18 +151,6 @@ public class SnappyCompressor implements Compressor { @Override public synchronized void reset() { - if (inputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = inputBuffer; - inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - - if (outputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = outputBuffer; - outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - finishCalled = false; bytesRead = bytesWritten = 0; inputBuffer.rewind(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java index 8a7f86d..c3da63f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java @@ -27,21 +27,14 @@ import org.xerial.snappy.Snappy; import org.apache.parquet.Preconditions; public class SnappyDecompressor implements Decompressor { - private static final int initialBufferSize = 64 * 1024 * 1024; - // Buffer for uncompressed output. This buffer grows as necessary. - private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0); // Buffer for compressed input. This buffer grows as necessary. - private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); + private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0); private boolean finished; - public SnappyDecompressor() { - inputBuffer.limit(0); - outputBuffer.limit(0); - } - /** * Fills specified buffer with uncompressed data. Returns actual number * of bytes of uncompressed data. A return value of 0 indicates that @@ -122,7 +115,8 @@ public class SnappyDecompressor implements Decompressor { @Override public void end() { - // No-op + CleanUtil.clean(inputBuffer); + CleanUtil.clean(outputBuffer); } @Override @@ -142,18 +136,6 @@ public class SnappyDecompressor implements Decompressor { @Override public synchronized void reset() { - if (inputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = inputBuffer; - inputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - - if (outputBuffer.capacity() > initialBufferSize) { - ByteBuffer oldBuffer = outputBuffer; - outputBuffer = ByteBuffer.allocateDirect(initialBufferSize); - CleanUtil.clean(oldBuffer); - } - finished = false; inputBuffer.rewind(); outputBuffer.rewind();