From: blue@apache.org
To: commits@parquet.apache.org
Reply-To: dev@parquet.apache.org
Subject: parquet-mr git commit: PARQUET-353: Release compression resources.
Date: Fri, 11 Dec 2015 21:17:18 +0000 (UTC)

Repository: parquet-mr
Updated Branches:
  refs/heads/master b45c4bdb4 -> 491690335


PARQUET-353: Release compression resources.

This updates the use of CodecFactory in the output format and writer classes
so that its lifecycle is tied to ParquetWriter and ParquetRecordWriter. When
those classes are closed, the resources held by the CodecFactory associated
with the instance are released.

This is an alternative to #282 and closes it.

Author: Ryan Blue <blue@apache.org>

Closes #295 from rdblue/PARQUET-353-release-compressor-resources and squashes
the following commits:

a00f4b7 [Ryan Blue] PARQUET-353: Release compression resources.
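For context, here is a caller-side sketch of what the change means for users (not part of the patch): ParquetWriter implements Closeable, so once a writer is closed, the CodecFactory it now owns and the compression resources that factory holds are released with it. The concrete writer, schema, and output path below are illustrative assumptions (parquet-avro's AvroParquetWriter writing to the local file system), not code from this commit.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CloseReleasesCompressors {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"Point\", \"fields\": ["
        + "{\"name\": \"x\", \"type\": \"int\"}, {\"name\": \"y\", \"type\": \"int\"}]}");

    // Hypothetical output location; any Hadoop Path behaves the same way.
    Path file = new Path("file:///tmp/points.parquet");

    ParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
        file, schema, CompressionCodecName.GZIP,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
    try {
      GenericRecord point = new GenericData.Record(schema);
      point.put("x", 1);
      point.put("y", 2);
      writer.write(point);
    } finally {
      // With this patch, close() also releases the compression resources held
      // by the writer's CodecFactory rather than leaving them allocated.
      writer.close();
    }
  }
}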
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/49169033
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/49169033
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/49169033

Branch: refs/heads/master
Commit: 49169033546d893dae3db903a2fa6af712f125c0
Parents: b45c4bd
Author: Ryan Blue <blue@apache.org>
Authored: Fri Dec 11 13:17:04 2015 -0800
Committer: Ryan Blue <blue@apache.org>
Committed: Fri Dec 11 13:17:04 2015 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetOutputFormat.java     | 11 ++---
 .../parquet/hadoop/ParquetRecordWriter.java     | 51 +++++++++++++-------
 .../apache/parquet/hadoop/ParquetWriter.java    |  6 ++-
 3 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/49169033/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 8979dba..31cc96b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -323,9 +323,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
 
   /**
    * constructor used when this OutputFormat in wrapped in another one (In Pig for example)
-   * @param writeSupportClass the class used to convert the incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to be stored in the footer of the file
+   * @param writeSupport the class used to convert the incoming records
    */
   public <S extends WriteSupport<T>> ParquetOutputFormat(S writeSupport) {
     this.writeSupport = writeSupport;
@@ -387,8 +385,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     if (INFO) LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
     if (INFO) LOG.info("Min row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());
 
-    CodecFactory codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
-
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(
         conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
@@ -411,10 +407,11 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         init.getSchema(),
         init.getExtraMetaData(),
         blockSize,
-        codecFactory.getCompressor(codec),
+        codec,
         validating,
         props,
-        memoryManager);
+        memoryManager,
+        conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/49169033/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
index 6c94fac..a9ade96 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -28,6 +29,7 @@ import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
 
 import static org.apache.parquet.Preconditions.checkNotNull;
@@ -43,8 +45,9 @@ import static org.apache.parquet.Preconditions.checkNotNull;
  */
 public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
 
-  private InternalParquetRecordWriter<T> internalWriter;
-  private MemoryManager memoryManager;
+  private final InternalParquetRecordWriter<T> internalWriter;
+  private final MemoryManager memoryManager;
+  private final CodecFactory codecFactory;
 
   /**
    *
@@ -79,6 +82,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
         extraMetaData, blockSize, compressor, validating, props);
     this.memoryManager = null;
+    this.codecFactory = null;
   }
 
   /**
@@ -106,14 +110,17 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       boolean validating,
       WriterVersion writerVersion,
       MemoryManager memoryManager) {
-    this(w, writeSupport, schema, extraMetaData, blockSize, compressor,
-        validating, ParquetProperties.builder()
-            .withPageSize(pageSize)
-            .withDictionaryPageSize(dictionaryPageSize)
-            .withDictionaryEncoding(enableDictionary)
-            .withWriterVersion(writerVersion)
-            .build(),
-        memoryManager);
+    ParquetProperties props = ParquetProperties.builder()
+        .withPageSize(pageSize)
+        .withDictionaryPageSize(dictionaryPageSize)
+        .withDictionaryEncoding(enableDictionary)
+        .withWriterVersion(writerVersion)
+        .build();
+    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
+        extraMetaData, blockSize, compressor, validating, props);
+    this.memoryManager = checkNotNull(memoryManager, "memoryManager");
+    memoryManager.addWriter(internalWriter, blockSize);
+    this.codecFactory = null;
   }
 
   /**
@@ -123,7 +130,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
    * @param schema the schema of the records
    * @param extraMetaData extra meta data to write in the footer of the file
    * @param blockSize the size of a block in the file (this will be approximate)
-   * @param compressor the compressor used to compress the pages
+   * @param codec the compression codec used to compress the pages
   * @param validating if schema validation should be turned on
   * @param props parquet encoding properties
   */
@@ -133,12 +140,15 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       MessageType schema,
       Map<String, String> extraMetaData,
       long blockSize,
-      BytesCompressor compressor,
+      CompressionCodecName codec,
       boolean validating,
       ParquetProperties props,
-      MemoryManager memoryManager) {
+      MemoryManager memoryManager,
+      Configuration conf) {
+    this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-        extraMetaData, blockSize, compressor, validating, props);
+        extraMetaData, blockSize, codecFactory.getCompressor(codec), validating,
+        props);
     this.memoryManager = checkNotNull(memoryManager, "memoryManager");
     memoryManager.addWriter(internalWriter, blockSize);
   }
@@ -148,9 +158,16 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
    * {@inheritDoc}
    */
  @Override
  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-    internalWriter.close();
-    if (memoryManager != null) {
-      memoryManager.removeWriter(internalWriter);
+    try {
+      internalWriter.close();
+      // release after the writer closes in case it is used for a last flush
+    } finally {
+      if (codecFactory != null) {
+        codecFactory.release();
+      }
+      if (memoryManager != null) {
+        memoryManager.removeWriter(internalWriter);
+      }
     }
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/49169033/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index f58dda4..58cbe95 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -52,6 +52,7 @@ public class ParquetWriter<T> implements Closeable {
   public static final int MAX_PADDING_SIZE_DEFAULT = 0;
 
   private final InternalParquetRecordWriter<T> writer;
+  private final CodecFactory codecFactory;
 
   /**
    * Create a new ParquetWriter.
@@ -273,7 +274,7 @@ public class ParquetWriter<T> implements Closeable {
         conf, schema, file, mode, blockSize, maxPaddingSize);
     fileWriter.start();
 
-    CodecFactory codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
+    this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
     CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName);
     this.writer = new InternalParquetRecordWriter<T>(
         fileWriter,
@@ -300,6 +301,9 @@ public class ParquetWriter<T> implements Closeable {
       writer.close();
     } catch (InterruptedException e) {
       throw new IOException(e);
+    } finally {
+      // release after the writer closes in case it is used for a last flush
+      codecFactory.release();
     }
   }
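A side note on the ordering above (commentary, not part of the commit): both close() implementations release the CodecFactory only after the delegate writer has closed, because closing may trigger a last flush that still needs the compressor, and they do it in a finally block so the resources are returned even if that close fails. Below is a minimal, self-contained sketch of the same pattern, with hypothetical names standing in for the parquet-mr classes.

import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for CodecFactory: hands out pooled resources and
// must be released exactly once.
class CompressorFactory {
  void release() {
    System.out.println("compressors returned to the pool");
  }
}

// Hypothetical stand-in for InternalParquetRecordWriter: closing it may still
// compress buffered pages, so the factory must outlive this call.
class DelegateWriter implements Closeable {
  @Override
  public void close() throws IOException {
    System.out.println("final flush compressed and written");
  }
}

public class ReleaseAfterClose implements Closeable {
  private final CompressorFactory codecFactory = new CompressorFactory();
  private final DelegateWriter writer = new DelegateWriter();

  @Override
  public void close() throws IOException {
    try {
      writer.close();          // may use the compressor for a last flush
    } finally {
      codecFactory.release();  // runs even if the delegate's close() throws
    }
  }

  public static void main(String[] args) throws IOException {
    new ReleaseAfterClose().close();
  }
}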