parquet-commits mailing list archives

From b...@apache.org
Subject parquet-mr git commit: PARQUET-353: Release compression resources.
Date Fri, 11 Dec 2015 21:17:18 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master b45c4bdb4 -> 491690335


PARQUET-353: Release compression resources.

This updates the use of CodecFactory in the output format and writer
classes so that its lifecycle is tied to ParquetWriter and
ParquetRecordWriter. When those classes are closed, the resources held
by the CodecFactory associated with the instance are released.

This is an alternative to #282 and closes it.

Author: Ryan Blue <blue@apache.org>

Closes #295 from rdblue/PARQUET-353-release-compressor-resources and squashes the following commits:

a00f4b7 [Ryan Blue] PARQUET-353: Release compression resources.
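
In outline, the writer classes now own their CodecFactory and release it on
close. A condensed sketch of the pattern (abbreviated from the diff below,
not the complete classes):

    public class ParquetWriter<T> implements Closeable {
      private final CodecFactory codecFactory;             // owned by the writer
      private final InternalParquetRecordWriter<T> writer;

      // constructor: create the factory and hand its compressor to the
      // internal writer:
      //   this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
      //   ... codecFactory.getCompressor(codecName) ...

      @Override
      public void close() throws IOException {
        try {
          writer.close();          // the final flush may still compress pages
        } finally {
          codecFactory.release();  // only then release the codec resources
        }
      }
    }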


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/49169033
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/49169033
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/49169033

Branch: refs/heads/master
Commit: 49169033546d893dae3db903a2fa6af712f125c0
Parents: b45c4bd
Author: Ryan Blue <blue@apache.org>
Authored: Fri Dec 11 13:17:04 2015 -0800
Committer: Ryan Blue <blue@apache.org>
Committed: Fri Dec 11 13:17:04 2015 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetOutputFormat.java     | 11 ++---
 .../parquet/hadoop/ParquetRecordWriter.java     | 51 +++++++++++++-------
 .../apache/parquet/hadoop/ParquetWriter.java    |  6 ++-
 3 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/49169033/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 8979dba..31cc96b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -323,9 +323,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
 
   /**
    * constructor used when this OutputFormat is wrapped in another one (in Pig for example)
-   * @param writeSupportClass the class used to convert the incoming records
-   * @param schema the schema of the records
-   * @param extraMetaData extra meta data to be stored in the footer of the file
+   * @param writeSupport the class used to convert the incoming records
    */
   public <S extends WriteSupport<T>> ParquetOutputFormat(S writeSupport) {
     this.writeSupport = writeSupport;
@@ -387,8 +385,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     if (INFO) LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
     if (INFO) LOG.info("Max row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());
 
-    CodecFactory codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
-
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(
         conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
@@ -411,10 +407,11 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         init.getSchema(),
         init.getExtraMetaData(),
         blockSize,
-        codecFactory.getCompressor(codec),
+        codec,
         validating,
         props,
-        memoryManager);
+        memoryManager,
+        conf);
   }
 
   /**
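
With this hunk, getRecordWriter no longer builds a CodecFactory itself; it
hands the codec name and the Configuration to ParquetRecordWriter, which
creates and owns the factory. Closing the record writer is therefore what
releases the codec. A hedged task-side sketch (the TaskAttemptContext, the
Group record type, and the configured write support are assumptions, not
part of this patch):

    // Hypothetical task code: the writer obtained here owns a CodecFactory,
    // so close() must run to release the compressor it created.
    ParquetOutputFormat<Group> format = new ParquetOutputFormat<Group>();
    RecordWriter<Void, Group> writer = format.getRecordWriter(context);
    try {
      writer.write(null, group);   // the key type is Void, so the key is null
    } finally {
      writer.close(context);       // releases the writer's CodecFactory
    }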

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/49169033/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
index 6c94fac..a9ade96 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -28,6 +29,7 @@ import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
 
 import static org.apache.parquet.Preconditions.checkNotNull;
@@ -43,8 +45,9 @@ import static org.apache.parquet.Preconditions.checkNotNull;
  */
 public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
 
-  private InternalParquetRecordWriter<T> internalWriter;
-  private MemoryManager memoryManager;
+  private final InternalParquetRecordWriter<T> internalWriter;
+  private final MemoryManager memoryManager;
+  private final CodecFactory codecFactory;
 
   /**
    *
@@ -79,6 +82,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
         extraMetaData, blockSize, compressor, validating, props);
     this.memoryManager = null;
+    this.codecFactory = null;
   }
 
   /**
@@ -106,14 +110,17 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       boolean validating,
       WriterVersion writerVersion,
       MemoryManager memoryManager) {
-    this(w, writeSupport, schema, extraMetaData, blockSize, compressor,
-        validating, ParquetProperties.builder()
-            .withPageSize(pageSize)
-            .withDictionaryPageSize(dictionaryPageSize)
-            .withDictionaryEncoding(enableDictionary)
-            .withWriterVersion(writerVersion)
-            .build(),
-        memoryManager);
+    ParquetProperties props = ParquetProperties.builder()
+        .withPageSize(pageSize)
+        .withDictionaryPageSize(dictionaryPageSize)
+        .withDictionaryEncoding(enableDictionary)
+        .withWriterVersion(writerVersion)
+        .build();
+    internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
+        extraMetaData, blockSize, compressor, validating, props);
+    this.memoryManager = checkNotNull(memoryManager, "memoryManager");
+    memoryManager.addWriter(internalWriter, blockSize);
+    this.codecFactory = null;
   }
 
   /**
@@ -123,7 +130,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
    * @param schema the schema of the records
    * @param extraMetaData extra meta data to write in the footer of the file
    * @param blockSize the size of a block in the file (this will be approximate)
-   * @param compressor the compressor used to compress the pages
+   * @param codec the compression codec used to compress the pages
    * @param validating if schema validation should be turned on
    * @param props parquet encoding properties
    */
@@ -133,12 +140,15 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       MessageType schema,
       Map<String, String> extraMetaData,
       long blockSize,
-      BytesCompressor compressor,
+      CompressionCodecName codec,
       boolean validating,
       ParquetProperties props,
-      MemoryManager memoryManager) {
+      MemoryManager memoryManager,
+      Configuration conf) {
+    this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-        extraMetaData, blockSize, compressor, validating, props);
+        extraMetaData, blockSize, codecFactory.getCompressor(codec), validating,
+        props);
     this.memoryManager = checkNotNull(memoryManager, "memoryManager");
     memoryManager.addWriter(internalWriter, blockSize);
   }
@@ -148,9 +158,16 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
    */
   @Override
   public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-    internalWriter.close();
-    if (memoryManager != null) {
-      memoryManager.removeWriter(internalWriter);
+    try {
+      internalWriter.close();
+      // release after the writer closes in case it is used for a last flush
+    } finally {
+      if (codecFactory != null) {
+        codecFactory.release();
+      }
+      if (memoryManager != null) {
+        memoryManager.removeWriter(internalWriter);
+      }
     }
   }
 

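Two details of the new close() are worth noting. codecFactory is null for
writers built through the deprecated constructors that take a BytesCompressor
directly (the caller owns the codec in that case), hence the null guard. And
release() must run only after internalWriter.close(), because closing flushes
the last row group and may still use the compressor; the try/finally keeps
that ordering even when close() throws. In sketch form (a restatement of the
constraint, not additional code in the patch):

    internalWriter.close();   // flush: remaining pages still get compressed
    codecFactory.release();   // safe now; no compressor is in use anymore
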
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/49169033/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index f58dda4..58cbe95 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -52,6 +52,7 @@ public class ParquetWriter<T> implements Closeable {
   public static final int MAX_PADDING_SIZE_DEFAULT = 0;
 
   private final InternalParquetRecordWriter<T> writer;
+  private final CodecFactory codecFactory;
 
   /**
    * Create a new ParquetWriter.
@@ -273,7 +274,7 @@ public class ParquetWriter<T> implements Closeable {
         conf, schema, file, mode, blockSize, maxPaddingSize);
     fileWriter.start();
 
-    CodecFactory codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
+    this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
     CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName);
     this.writer = new InternalParquetRecordWriter<T>(
         fileWriter,
@@ -300,6 +301,9 @@ public class ParquetWriter<T> implements Closeable {
       writer.close();
     } catch (InterruptedException e) {
       throw new IOException(e);
+    } finally {
+      // release after the writer closes in case it is used for a last flush
+      codecFactory.release();
     }
   }
 
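Because ParquetWriter implements Closeable, callers that use
try-with-resources get the release automatically. A minimal usage sketch
(MyRecord, MyWriteSupport, path, and records are illustrative assumptions):

    // Hypothetical usage: close() flushes the file, then releases the
    // compression resources in its finally block.
    try (ParquetWriter<MyRecord> writer =
        new ParquetWriter<MyRecord>(path, new MyWriteSupport())) {
      for (MyRecord record : records) {
        writer.write(record);
      }
    }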

